# adv_parser.py (4.8 KB)
"""Library to handle connection with Switchbot."""
from __future__ import annotations

import logging
from collections.abc import Callable
from functools import lru_cache
from typing import Any, TypedDict

from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData

from .adv_parsers.bot import process_wohand
from .adv_parsers.bulb import process_color_bulb
from .adv_parsers.ceiling_light import process_woceiling
from .adv_parsers.contact import process_wocontact
from .adv_parsers.curtain import process_wocurtain
from .adv_parsers.humidifier import process_wohumidifier
from .adv_parsers.light_strip import process_wostrip
from .adv_parsers.lock import process_wolock
from .adv_parsers.meter import process_wosensorth
from .adv_parsers.motion import process_wopresence
from .adv_parsers.plug import process_woplugmini
from .const import SwitchbotModel
from .models import SwitchBotAdvertisement

  22. _LOGGER = logging.getLogger(__name__)
  23. SERVICE_DATA_ORDER = (
  24. "0000fd3d-0000-1000-8000-00805f9b34fb",
  25. "00000d00-0000-1000-8000-00805f9b34fb",
  26. )
  27. class SwitchbotSupportedType(TypedDict):
  28. """Supported type of Switchbot."""
  29. modelName: SwitchbotModel
  30. modelFriendlyName: str
  31. func: Callable[[bytes, bytes | None], dict[str, bool | int]]
  32. SUPPORTED_TYPES: dict[str, SwitchbotSupportedType] = {
  33. "d": {
  34. "modelName": SwitchbotModel.CONTACT_SENSOR,
  35. "modelFriendlyName": "Contact Sensor",
  36. "func": process_wocontact,
  37. },
  38. "H": {
  39. "modelName": SwitchbotModel.BOT,
  40. "modelFriendlyName": "Bot",
  41. "func": process_wohand,
  42. },
  43. "s": {
  44. "modelName": SwitchbotModel.MOTION_SENSOR,
  45. "modelFriendlyName": "Motion Sensor",
  46. "func": process_wopresence,
  47. },
  48. "r": {
  49. "modelName": SwitchbotModel.LIGHT_STRIP,
  50. "modelFriendlyName": "Light Strip",
  51. "func": process_wostrip,
  52. },
  53. "c": {
  54. "modelName": SwitchbotModel.CURTAIN,
  55. "modelFriendlyName": "Curtain",
  56. "func": process_wocurtain,
  57. },
  58. "T": {
  59. "modelName": SwitchbotModel.METER,
  60. "modelFriendlyName": "Meter",
  61. "func": process_wosensorth,
  62. },
  63. "i": {
  64. "modelName": SwitchbotModel.METER,
  65. "modelFriendlyName": "Meter Plus",
  66. "func": process_wosensorth,
  67. },
  68. "g": {
  69. "modelName": SwitchbotModel.PLUG_MINI,
  70. "modelFriendlyName": "Plug Mini",
  71. "func": process_woplugmini,
  72. },
  73. "j": {
  74. "modelName": SwitchbotModel.PLUG_MINI,
  75. "modelFriendlyName": "Plug Mini (JP)",
  76. "func": process_woplugmini,
  77. },
  78. "u": {
  79. "modelName": SwitchbotModel.COLOR_BULB,
  80. "modelFriendlyName": "Color Bulb",
  81. "func": process_color_bulb,
  82. },
  83. "q": {
  84. "modelName": SwitchbotModel.CEILING_LIGHT,
  85. "modelFriendlyName": "Ceiling Light",
  86. "func": process_woceiling,
  87. },
  88. "e": {
  89. "modelName": SwitchbotModel.HUMIDIFIER,
  90. "modelFriendlyName": "Humidifier",
  91. "func": process_wohumidifier,
  92. },
  93. "o": {
  94. "modelName": SwitchbotModel.LOCK,
  95. "modelFriendlyName": "Lock",
  96. "func": process_wolock,
  97. },
  98. }
  99. def parse_advertisement_data(
  100. device: BLEDevice, advertisement_data: AdvertisementData
  101. ) -> SwitchBotAdvertisement | None:
  102. """Parse advertisement data."""
  103. _mgr_datas = list(advertisement_data.manufacturer_data.values())
  104. service_data = advertisement_data.service_data
  105. if not service_data:
  106. return None
  107. _service_data = None
  108. for uuid in SERVICE_DATA_ORDER:
  109. if uuid in service_data:
  110. _service_data = service_data[uuid]
  111. break
  112. if not _service_data:
  113. return None
  114. _mfr_data = _mgr_datas[0] if _mgr_datas else None
  115. try:
  116. data = _parse_data(_service_data, _mfr_data)
  117. except Exception as err: # pylint: disable=broad-except
  118. _LOGGER.exception(
  119. "Failed to parse advertisement data: %s: %s", advertisement_data, err
  120. )
  121. return None
  122. return SwitchBotAdvertisement(device.address, data, device, advertisement_data.rssi)
  123. @lru_cache(maxsize=128)
  124. def _parse_data(
  125. _service_data: bytes, _mfr_data: bytes | None
  126. ) -> SwitchBotAdvertisement | None:
  127. """Parse advertisement data."""
  128. _model = chr(_service_data[0] & 0b01111111)
  129. data = {
  130. "rawAdvData": _service_data,
  131. "data": {},
  132. "model": _model,
  133. "isEncrypted": bool(_service_data[0] & 0b10000000),
  134. }
  135. type_data = SUPPORTED_TYPES.get(_model)
  136. if type_data:
  137. model_data = type_data["func"](_service_data, _mfr_data)
  138. if model_data:
  139. data.update(
  140. {
  141. "modelFriendlyName": type_data["modelFriendlyName"],
  142. "modelName": type_data["modelName"],
  143. "data": model_data,
  144. }
  145. )
  146. return data