adv_parser.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import logging
  4. from collections.abc import Callable
  5. from functools import lru_cache
  6. from typing import TypedDict
  7. from bleak.backends.device import BLEDevice
  8. from bleak.backends.scanner import AdvertisementData
  9. from .adv_parsers.bot import process_wohand
  10. from .adv_parsers.bulb import process_color_bulb
  11. from .adv_parsers.ceiling_light import process_woceiling
  12. from .adv_parsers.contact import process_wocontact
  13. from .adv_parsers.curtain import process_wocurtain
  14. from .adv_parsers.humidifier import process_wohumidifier
  15. from .adv_parsers.light_strip import process_wostrip
  16. from .adv_parsers.lock import process_wolock
  17. from .adv_parsers.meter import process_wosensorth
  18. from .adv_parsers.motion import process_wopresence
  19. from .adv_parsers.plug import process_woplugmini
  20. from .const import SwitchbotModel
  21. from .models import SwitchBotAdvertisement
  22. _LOGGER = logging.getLogger(__name__)
  23. SERVICE_DATA_ORDER = (
  24. "0000fd3d-0000-1000-8000-00805f9b34fb",
  25. "00000d00-0000-1000-8000-00805f9b34fb",
  26. )
  27. MFR_DATA_ORDER = (2409, 89)
  28. class SwitchbotSupportedType(TypedDict):
  29. """Supported type of Switchbot."""
  30. modelName: SwitchbotModel
  31. modelFriendlyName: str
  32. func: Callable[[bytes, bytes | None], dict[str, bool | int]]
  33. SUPPORTED_TYPES: dict[str, SwitchbotSupportedType] = {
  34. "d": {
  35. "modelName": SwitchbotModel.CONTACT_SENSOR,
  36. "modelFriendlyName": "Contact Sensor",
  37. "func": process_wocontact,
  38. "manufacturer_id": 2409,
  39. "manufacturer_data_length": 13,
  40. },
  41. "H": {
  42. "modelName": SwitchbotModel.BOT,
  43. "modelFriendlyName": "Bot",
  44. "func": process_wohand,
  45. "service_uuids": {"cba20d00-224d-11e6-9fb8-0002a5d5c51b"},
  46. "manufacturer_id": 89,
  47. },
  48. "s": {
  49. "modelName": SwitchbotModel.MOTION_SENSOR,
  50. "modelFriendlyName": "Motion Sensor",
  51. "func": process_wopresence,
  52. },
  53. "r": {
  54. "modelName": SwitchbotModel.LIGHT_STRIP,
  55. "modelFriendlyName": "Light Strip",
  56. "func": process_wostrip,
  57. "manufacturer_id": 2409,
  58. "manufacturer_data_length": 16,
  59. },
  60. "c": {
  61. "modelName": SwitchbotModel.CURTAIN,
  62. "modelFriendlyName": "Curtain",
  63. "func": process_wocurtain,
  64. "service_uuids": {
  65. "00001800-0000-1000-8000-00805f9b34fb",
  66. "00001801-0000-1000-8000-00805f9b34fb",
  67. "cba20d00-224d-11e6-9fb8-0002a5d5c51b",
  68. },
  69. "manufacturer_id": 89,
  70. },
  71. "T": {
  72. "modelName": SwitchbotModel.METER,
  73. "modelFriendlyName": "Meter",
  74. "func": process_wosensorth,
  75. },
  76. "i": {
  77. "modelName": SwitchbotModel.METER,
  78. "modelFriendlyName": "Meter Plus",
  79. "func": process_wosensorth,
  80. },
  81. "g": {
  82. "modelName": SwitchbotModel.PLUG_MINI,
  83. "modelFriendlyName": "Plug Mini",
  84. "func": process_woplugmini,
  85. },
  86. "j": {
  87. "modelName": SwitchbotModel.PLUG_MINI,
  88. "modelFriendlyName": "Plug Mini (JP)",
  89. "func": process_woplugmini,
  90. },
  91. "u": {
  92. "modelName": SwitchbotModel.COLOR_BULB,
  93. "modelFriendlyName": "Color Bulb",
  94. "func": process_color_bulb,
  95. "manufacturer_id": 2409,
  96. "manufacturer_data_length": 11,
  97. },
  98. "q": {
  99. "modelName": SwitchbotModel.CEILING_LIGHT,
  100. "modelFriendlyName": "Ceiling Light",
  101. "func": process_woceiling,
  102. },
  103. "e": {
  104. "modelName": SwitchbotModel.HUMIDIFIER,
  105. "modelFriendlyName": "Humidifier",
  106. "func": process_wohumidifier,
  107. },
  108. "o": {
  109. "modelName": SwitchbotModel.LOCK,
  110. "modelFriendlyName": "Lock",
  111. "func": process_wolock,
  112. },
  113. }
  114. _SWITCHBOT_MODEL_TO_CHAR = {
  115. model_data["modelName"]: model_chr
  116. for model_chr, model_data in SUPPORTED_TYPES.items()
  117. }
  118. MODELS_BY_MANUFACTURER_DATA: dict[int, list[tuple[str, SwitchbotSupportedType]]] = {
  119. mfr_id: [] for mfr_id in MFR_DATA_ORDER
  120. }
  121. for model_chr, model in SUPPORTED_TYPES.items():
  122. if "manufacturer_id" in model:
  123. mfr_id = model["manufacturer_id"]
  124. MODELS_BY_MANUFACTURER_DATA[mfr_id].append((model_chr, model))
  125. def parse_advertisement_data(
  126. device: BLEDevice,
  127. advertisement_data: AdvertisementData,
  128. model: SwitchbotModel | None = None,
  129. ) -> SwitchBotAdvertisement | None:
  130. """Parse advertisement data."""
  131. service_data = advertisement_data.service_data
  132. _service_data = None
  133. for uuid in SERVICE_DATA_ORDER:
  134. if uuid in service_data:
  135. _service_data = service_data[uuid]
  136. break
  137. _mfr_data = None
  138. _mfr_id = None
  139. for mfr_id in MFR_DATA_ORDER:
  140. if mfr_id in advertisement_data.manufacturer_data:
  141. _mfr_id = mfr_id
  142. _mfr_data = advertisement_data.manufacturer_data[mfr_id]
  143. break
  144. if _mfr_data is None and _service_data is None:
  145. return None
  146. try:
  147. data = _parse_data(
  148. "".join(sorted(advertisement_data.service_uuids)),
  149. _service_data,
  150. _mfr_data,
  151. _mfr_id,
  152. model,
  153. )
  154. except Exception as err: # pylint: disable=broad-except
  155. _LOGGER.exception(
  156. "Failed to parse advertisement data: %s: %s", advertisement_data, err
  157. )
  158. return None
  159. if not data:
  160. return None
  161. return SwitchBotAdvertisement(device.address, data, device, advertisement_data.rssi)
  162. @lru_cache(maxsize=128)
  163. def _parse_data(
  164. _service_uuids_str: str,
  165. _service_data: bytes | None,
  166. _mfr_data: bytes | None,
  167. _mfr_id: int | None = None,
  168. _switchbot_model: SwitchbotModel | None = None,
  169. ) -> SwitchBotAdvertisement | None:
  170. """Parse advertisement data."""
  171. _model = chr(_service_data[0] & 0b01111111) if _service_data else None
  172. if not _model and _mfr_id and _mfr_id in MODELS_BY_MANUFACTURER_DATA:
  173. _service_uuids = set(_service_uuids_str.split(","))
  174. for model_chr, model_data in MODELS_BY_MANUFACTURER_DATA[_mfr_id]:
  175. if model_data.get("manufacturer_data_length") == len(_mfr_data):
  176. _model = model_chr
  177. break
  178. service_uuids = model_data.get("service_uuids", set())
  179. if service_uuids and service_uuids.intersection(_service_uuids):
  180. _model = model_chr
  181. break
  182. if not _model and _switchbot_model and _switchbot_model in _SWITCHBOT_MODEL_TO_CHAR:
  183. _model = _SWITCHBOT_MODEL_TO_CHAR[_switchbot_model]
  184. if not _model:
  185. return None
  186. _isEncrypted = bool(_service_data[0] & 0b10000000) if _service_data else False
  187. data = {
  188. "rawAdvData": _service_data,
  189. "data": {},
  190. "model": _model,
  191. "isEncrypted": _isEncrypted,
  192. }
  193. type_data = SUPPORTED_TYPES.get(_model)
  194. if type_data:
  195. model_data = type_data["func"](_service_data, _mfr_data)
  196. if model_data:
  197. data.update(
  198. {
  199. "modelFriendlyName": type_data["modelFriendlyName"],
  200. "modelName": type_data["modelName"],
  201. "data": model_data,
  202. }
  203. )
  204. return data