adv_parser.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import logging
  4. from collections.abc import Callable
  5. from functools import lru_cache
  6. from typing import Any, TypedDict
  7. from bleak.backends.device import BLEDevice
  8. from bleak.backends.scanner import AdvertisementData
  9. from .adv_parsers.blind_tilt import process_woblindtilt
  10. from .adv_parsers.bot import process_wohand
  11. from .adv_parsers.bulb import process_color_bulb
  12. from .adv_parsers.ceiling_light import process_woceiling
  13. from .adv_parsers.contact import process_wocontact
  14. from .adv_parsers.curtain import process_wocurtain
  15. from .adv_parsers.humidifier import process_wohumidifier
  16. from .adv_parsers.light_strip import process_wostrip
  17. from .adv_parsers.lock import process_wolock
  18. from .adv_parsers.meter import process_wosensorth
  19. from .adv_parsers.motion import process_wopresence
  20. from .adv_parsers.plug import process_woplugmini
  21. from .const import SwitchbotModel
  22. from .models import SwitchBotAdvertisement
  23. _LOGGER = logging.getLogger(__name__)
  24. SERVICE_DATA_ORDER = (
  25. "0000fd3d-0000-1000-8000-00805f9b34fb",
  26. "00000d00-0000-1000-8000-00805f9b34fb",
  27. )
  28. MFR_DATA_ORDER = (2409, 741, 89)
  29. class SwitchbotSupportedType(TypedDict):
  30. """Supported type of Switchbot."""
  31. modelName: SwitchbotModel
  32. modelFriendlyName: str
  33. func: Callable[[bytes, bytes | None], dict[str, bool | int]]
  34. manufacturer_id: int | None
  35. manufacturer_data_length: int | None
  36. SUPPORTED_TYPES: dict[str, SwitchbotSupportedType] = {
  37. "d": {
  38. "modelName": SwitchbotModel.CONTACT_SENSOR,
  39. "modelFriendlyName": "Contact Sensor",
  40. "func": process_wocontact,
  41. "manufacturer_id": 2409,
  42. },
  43. "H": {
  44. "modelName": SwitchbotModel.BOT,
  45. "modelFriendlyName": "Bot",
  46. "func": process_wohand,
  47. "service_uuids": {"cba20d00-224d-11e6-9fb8-0002a5d5c51b"},
  48. "manufacturer_id": 89,
  49. },
  50. "s": {
  51. "modelName": SwitchbotModel.MOTION_SENSOR,
  52. "modelFriendlyName": "Motion Sensor",
  53. "func": process_wopresence,
  54. "manufacturer_id": 2409,
  55. },
  56. "r": {
  57. "modelName": SwitchbotModel.LIGHT_STRIP,
  58. "modelFriendlyName": "Light Strip",
  59. "func": process_wostrip,
  60. "manufacturer_id": 2409,
  61. "manufacturer_data_length": 16,
  62. },
  63. "{": {
  64. "modelName": SwitchbotModel.CURTAIN,
  65. "modelFriendlyName": "Curtain 3",
  66. "func": process_wocurtain,
  67. "manufacturer_id": 2409,
  68. },
  69. "c": {
  70. "modelName": SwitchbotModel.CURTAIN,
  71. "modelFriendlyName": "Curtain",
  72. "func": process_wocurtain,
  73. "manufacturer_id": 2409,
  74. },
  75. "w": {
  76. "modelName": SwitchbotModel.IO_METER,
  77. "modelFriendlyName": "Indoor/Outdoor Meter",
  78. "func": process_wosensorth,
  79. "manufacturer_id": 2409,
  80. },
  81. "i": {
  82. "modelName": SwitchbotModel.METER,
  83. "modelFriendlyName": "Meter Plus",
  84. "func": process_wosensorth,
  85. "manufacturer_id": 2409,
  86. },
  87. "T": {
  88. "modelName": SwitchbotModel.METER,
  89. "modelFriendlyName": "Meter",
  90. "func": process_wosensorth,
  91. "manufacturer_id": 2409,
  92. },
  93. "g": {
  94. "modelName": SwitchbotModel.PLUG_MINI,
  95. "modelFriendlyName": "Plug Mini",
  96. "func": process_woplugmini,
  97. "manufacturer_id": 2409,
  98. },
  99. "j": {
  100. "modelName": SwitchbotModel.PLUG_MINI,
  101. "modelFriendlyName": "Plug Mini (JP)",
  102. "func": process_woplugmini,
  103. "manufacturer_id": 2409,
  104. },
  105. "u": {
  106. "modelName": SwitchbotModel.COLOR_BULB,
  107. "modelFriendlyName": "Color Bulb",
  108. "func": process_color_bulb,
  109. "manufacturer_id": 2409,
  110. },
  111. "q": {
  112. "modelName": SwitchbotModel.CEILING_LIGHT,
  113. "modelFriendlyName": "Ceiling Light",
  114. "func": process_woceiling,
  115. "manufacturer_id": 2409,
  116. },
  117. "n": {
  118. "modelName": SwitchbotModel.CEILING_LIGHT,
  119. "modelFriendlyName": "Ceiling Light Pro",
  120. "func": process_woceiling,
  121. "manufacturer_id": 2409,
  122. },
  123. "e": {
  124. "modelName": SwitchbotModel.HUMIDIFIER,
  125. "modelFriendlyName": "Humidifier",
  126. "func": process_wohumidifier,
  127. "manufacturer_id": 741,
  128. "manufacturer_data_length": 6,
  129. },
  130. "o": {
  131. "modelName": SwitchbotModel.LOCK,
  132. "modelFriendlyName": "Lock",
  133. "func": process_wolock,
  134. "manufacturer_id": 2409,
  135. },
  136. "x": {
  137. "modelName": SwitchbotModel.BLIND_TILT,
  138. "modelFriendlyName": "Blind Tilt",
  139. "func": process_woblindtilt,
  140. "manufacturer_id": 2409,
  141. },
  142. }
  143. _SWITCHBOT_MODEL_TO_CHAR = {
  144. model_data["modelName"]: model_chr
  145. for model_chr, model_data in SUPPORTED_TYPES.items()
  146. }
  147. MODELS_BY_MANUFACTURER_DATA: dict[int, list[tuple[str, SwitchbotSupportedType]]] = {
  148. mfr_id: [] for mfr_id in MFR_DATA_ORDER
  149. }
  150. for model_chr, model in SUPPORTED_TYPES.items():
  151. if "manufacturer_id" in model:
  152. mfr_id = model["manufacturer_id"]
  153. MODELS_BY_MANUFACTURER_DATA[mfr_id].append((model_chr, model))
  154. def parse_advertisement_data(
  155. device: BLEDevice,
  156. advertisement_data: AdvertisementData,
  157. model: SwitchbotModel | None = None,
  158. ) -> SwitchBotAdvertisement | None:
  159. """Parse advertisement data."""
  160. service_data = advertisement_data.service_data
  161. _service_data = None
  162. for uuid in SERVICE_DATA_ORDER:
  163. if uuid in service_data:
  164. _service_data = service_data[uuid]
  165. break
  166. _mfr_data = None
  167. _mfr_id = None
  168. for mfr_id in MFR_DATA_ORDER:
  169. if mfr_id in advertisement_data.manufacturer_data:
  170. _mfr_id = mfr_id
  171. _mfr_data = advertisement_data.manufacturer_data[mfr_id]
  172. break
  173. if _mfr_data is None and _service_data is None:
  174. return None
  175. try:
  176. data = _parse_data(
  177. _service_data,
  178. _mfr_data,
  179. _mfr_id,
  180. model,
  181. )
  182. except Exception as err: # pylint: disable=broad-except
  183. _LOGGER.exception(
  184. "Failed to parse advertisement data: %s: %s", advertisement_data, err
  185. )
  186. return None
  187. if not data:
  188. return None
  189. return SwitchBotAdvertisement(
  190. device.address, data, device, advertisement_data.rssi, bool(_service_data)
  191. )
  192. @lru_cache(maxsize=128)
  193. def _parse_data(
  194. _service_data: bytes | None,
  195. _mfr_data: bytes | None,
  196. _mfr_id: int | None = None,
  197. _switchbot_model: SwitchbotModel | None = None,
  198. ) -> dict[str, Any] | None:
  199. """Parse advertisement data."""
  200. _model = chr(_service_data[0] & 0b01111111) if _service_data else None
  201. if _switchbot_model and _switchbot_model in _SWITCHBOT_MODEL_TO_CHAR:
  202. _model = _SWITCHBOT_MODEL_TO_CHAR[_switchbot_model]
  203. if not _model and _mfr_id and _mfr_id in MODELS_BY_MANUFACTURER_DATA:
  204. for model_chr, model_data in MODELS_BY_MANUFACTURER_DATA[_mfr_id]:
  205. if model_data.get("manufacturer_data_length") == len(_mfr_data):
  206. _model = model_chr
  207. break
  208. if not _model:
  209. return None
  210. _isEncrypted = bool(_service_data[0] & 0b10000000) if _service_data else False
  211. data = {
  212. "rawAdvData": _service_data,
  213. "data": {},
  214. "model": _model,
  215. "isEncrypted": _isEncrypted,
  216. }
  217. type_data = SUPPORTED_TYPES.get(_model)
  218. if type_data:
  219. model_data = type_data["func"](_service_data, _mfr_data)
  220. if model_data:
  221. data.update(
  222. {
  223. "modelFriendlyName": type_data["modelFriendlyName"],
  224. "modelName": type_data["modelName"],
  225. "data": model_data,
  226. }
  227. )
  228. return data