adv_parser.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import logging
  4. from collections.abc import Callable
  5. from functools import lru_cache
  6. from typing import Any, TypedDict
  7. from bleak.backends.device import BLEDevice
  8. from bleak.backends.scanner import AdvertisementData
  9. from .adv_parsers.air_purifier import process_air_purifier
  10. from .adv_parsers.blind_tilt import process_woblindtilt
  11. from .adv_parsers.bot import process_wohand
  12. from .adv_parsers.bulb import process_color_bulb
  13. from .adv_parsers.ceiling_light import process_woceiling
  14. from .adv_parsers.contact import process_wocontact
  15. from .adv_parsers.curtain import process_wocurtain
  16. from .adv_parsers.fan import process_fan
  17. from .adv_parsers.hub2 import process_wohub2
  18. from .adv_parsers.hub3 import process_hub3
  19. from .adv_parsers.hubmini_matter import process_hubmini_matter
  20. from .adv_parsers.humidifier import process_evaporative_humidifier, process_wohumidifier
  21. from .adv_parsers.keypad import process_wokeypad
  22. from .adv_parsers.leak import process_leak
  23. from .adv_parsers.light_strip import process_wostrip
  24. from .adv_parsers.lock import process_lock2, process_wolock, process_wolock_pro
  25. from .adv_parsers.meter import process_wosensorth, process_wosensorth_c
  26. from .adv_parsers.motion import process_wopresence
  27. from .adv_parsers.plug import process_woplugmini
  28. from .adv_parsers.relay_switch import (
  29. process_garage_door_opener,
  30. process_relay_switch_2pm,
  31. process_relay_switch_common_data,
  32. )
  33. from .adv_parsers.remote import process_woremote
  34. from .adv_parsers.roller_shade import process_worollershade
  35. from .adv_parsers.vacuum import process_vacuum, process_vacuum_k
  36. from .const import SwitchbotModel
  37. from .models import SwitchBotAdvertisement
  38. _LOGGER = logging.getLogger(__name__)
  39. SERVICE_DATA_ORDER = (
  40. "0000fd3d-0000-1000-8000-00805f9b34fb",
  41. "00000d00-0000-1000-8000-00805f9b34fb",
  42. )
  43. MFR_DATA_ORDER = (2409, 741, 89)
  44. class SwitchbotSupportedType(TypedDict):
  45. """Supported type of Switchbot."""
  46. modelName: SwitchbotModel
  47. modelFriendlyName: str
  48. func: Callable[[bytes, bytes | None], dict[str, bool | int]]
  49. manufacturer_id: int | None
  50. manufacturer_data_length: int | None
  51. SUPPORTED_TYPES: dict[str | bytes, SwitchbotSupportedType] = {
  52. "d": {
  53. "modelName": SwitchbotModel.CONTACT_SENSOR,
  54. "modelFriendlyName": "Contact Sensor",
  55. "func": process_wocontact,
  56. "manufacturer_id": 2409,
  57. },
  58. "H": {
  59. "modelName": SwitchbotModel.BOT,
  60. "modelFriendlyName": "Bot",
  61. "func": process_wohand,
  62. "manufacturer_id": 89,
  63. },
  64. "s": {
  65. "modelName": SwitchbotModel.MOTION_SENSOR,
  66. "modelFriendlyName": "Motion Sensor",
  67. "func": process_wopresence,
  68. "manufacturer_id": 2409,
  69. },
  70. "r": {
  71. "modelName": SwitchbotModel.LIGHT_STRIP,
  72. "modelFriendlyName": "Light Strip",
  73. "func": process_wostrip,
  74. "manufacturer_id": 2409,
  75. },
  76. "{": {
  77. "modelName": SwitchbotModel.CURTAIN,
  78. "modelFriendlyName": "Curtain 3",
  79. "func": process_wocurtain,
  80. "manufacturer_id": 2409,
  81. },
  82. "c": {
  83. "modelName": SwitchbotModel.CURTAIN,
  84. "modelFriendlyName": "Curtain",
  85. "func": process_wocurtain,
  86. "manufacturer_id": 2409,
  87. },
  88. "w": {
  89. "modelName": SwitchbotModel.IO_METER,
  90. "modelFriendlyName": "Indoor/Outdoor Meter",
  91. "func": process_wosensorth,
  92. "manufacturer_id": 2409,
  93. },
  94. "i": {
  95. "modelName": SwitchbotModel.METER,
  96. "modelFriendlyName": "Meter Plus",
  97. "func": process_wosensorth,
  98. "manufacturer_id": 2409,
  99. },
  100. "T": {
  101. "modelName": SwitchbotModel.METER,
  102. "modelFriendlyName": "Meter",
  103. "func": process_wosensorth,
  104. "manufacturer_id": 2409,
  105. },
  106. "4": {
  107. "modelName": SwitchbotModel.METER_PRO,
  108. "modelFriendlyName": "Meter Pro",
  109. "func": process_wosensorth,
  110. "manufacturer_id": 2409,
  111. },
  112. "5": {
  113. "modelName": SwitchbotModel.METER_PRO_C,
  114. "modelFriendlyName": "Meter Pro CO2",
  115. "func": process_wosensorth_c,
  116. "manufacturer_id": 2409,
  117. },
  118. "v": {
  119. "modelName": SwitchbotModel.HUB2,
  120. "modelFriendlyName": "Hub 2",
  121. "func": process_wohub2,
  122. "manufacturer_id": 2409,
  123. },
  124. "g": {
  125. "modelName": SwitchbotModel.PLUG_MINI,
  126. "modelFriendlyName": "Plug Mini",
  127. "func": process_woplugmini,
  128. "manufacturer_id": 2409,
  129. },
  130. "j": {
  131. "modelName": SwitchbotModel.PLUG_MINI,
  132. "modelFriendlyName": "Plug Mini (JP)",
  133. "func": process_woplugmini,
  134. "manufacturer_id": 2409,
  135. },
  136. "u": {
  137. "modelName": SwitchbotModel.COLOR_BULB,
  138. "modelFriendlyName": "Color Bulb",
  139. "func": process_color_bulb,
  140. "manufacturer_id": 2409,
  141. },
  142. "q": {
  143. "modelName": SwitchbotModel.CEILING_LIGHT,
  144. "modelFriendlyName": "Ceiling Light",
  145. "func": process_woceiling,
  146. "manufacturer_id": 2409,
  147. },
  148. "n": {
  149. "modelName": SwitchbotModel.CEILING_LIGHT,
  150. "modelFriendlyName": "Ceiling Light Pro",
  151. "func": process_woceiling,
  152. "manufacturer_id": 2409,
  153. },
  154. "e": {
  155. "modelName": SwitchbotModel.HUMIDIFIER,
  156. "modelFriendlyName": "Humidifier",
  157. "func": process_wohumidifier,
  158. "manufacturer_id": 741,
  159. "manufacturer_data_length": 6,
  160. },
  161. "#": {
  162. "modelName": SwitchbotModel.EVAPORATIVE_HUMIDIFIER,
  163. "modelFriendlyName": "Evaporative Humidifier",
  164. "func": process_evaporative_humidifier,
  165. "manufacturer_id": 2409,
  166. },
  167. "o": {
  168. "modelName": SwitchbotModel.LOCK,
  169. "modelFriendlyName": "Lock",
  170. "func": process_wolock,
  171. "manufacturer_id": 2409,
  172. },
  173. "$": {
  174. "modelName": SwitchbotModel.LOCK_PRO,
  175. "modelFriendlyName": "Lock Pro",
  176. "func": process_wolock_pro,
  177. "manufacturer_id": 2409,
  178. },
  179. "x": {
  180. "modelName": SwitchbotModel.BLIND_TILT,
  181. "modelFriendlyName": "Blind Tilt",
  182. "func": process_woblindtilt,
  183. "manufacturer_id": 2409,
  184. },
  185. "&": {
  186. "modelName": SwitchbotModel.LEAK,
  187. "modelFriendlyName": "Leak Detector",
  188. "func": process_leak,
  189. "manufacturer_id": 2409,
  190. },
  191. "y": {
  192. "modelName": SwitchbotModel.KEYPAD,
  193. "modelFriendlyName": "Keypad",
  194. "func": process_wokeypad,
  195. "manufacturer_id": 2409,
  196. },
  197. "<": {
  198. "modelName": SwitchbotModel.RELAY_SWITCH_1PM,
  199. "modelFriendlyName": "Relay Switch 1PM",
  200. "func": process_relay_switch_common_data,
  201. "manufacturer_id": 2409,
  202. },
  203. ";": {
  204. "modelName": SwitchbotModel.RELAY_SWITCH_1,
  205. "modelFriendlyName": "Relay Switch 1",
  206. "func": process_relay_switch_common_data,
  207. "manufacturer_id": 2409,
  208. },
  209. "b": {
  210. "modelName": SwitchbotModel.REMOTE,
  211. "modelFriendlyName": "Remote",
  212. "func": process_woremote,
  213. "manufacturer_id": 89,
  214. },
  215. ",": {
  216. "modelName": SwitchbotModel.ROLLER_SHADE,
  217. "modelFriendlyName": "Roller Shade",
  218. "func": process_worollershade,
  219. "manufacturer_id": 2409,
  220. },
  221. "%": {
  222. "modelName": SwitchbotModel.HUBMINI_MATTER,
  223. "modelFriendlyName": "HubMini Matter",
  224. "func": process_hubmini_matter,
  225. "manufacturer_id": 2409,
  226. },
  227. "~": {
  228. "modelName": SwitchbotModel.CIRCULATOR_FAN,
  229. "modelFriendlyName": "Circulator Fan",
  230. "func": process_fan,
  231. "manufacturer_id": 2409,
  232. },
  233. ".": {
  234. "modelName": SwitchbotModel.K20_VACUUM,
  235. "modelFriendlyName": "K20 Vacuum",
  236. "func": process_vacuum,
  237. "manufacturer_id": 2409,
  238. },
  239. "z": {
  240. "modelName": SwitchbotModel.S10_VACUUM,
  241. "modelFriendlyName": "S10 Vacuum",
  242. "func": process_vacuum,
  243. "manufacturer_id": 2409,
  244. },
  245. "3": {
  246. "modelName": SwitchbotModel.K10_PRO_COMBO_VACUUM,
  247. "modelFriendlyName": "K10+ Pro Combo Vacuum",
  248. "func": process_vacuum,
  249. "manufacturer_id": 2409,
  250. },
  251. "}": {
  252. "modelName": SwitchbotModel.K10_VACUUM,
  253. "modelFriendlyName": "K10+ Vacuum",
  254. "func": process_vacuum_k,
  255. "manufacturer_id": 2409,
  256. },
  257. "(": {
  258. "modelName": SwitchbotModel.K10_PRO_VACUUM,
  259. "modelFriendlyName": "K10+ Pro Vacuum",
  260. "func": process_vacuum_k,
  261. "manufacturer_id": 2409,
  262. },
  263. "*": {
  264. "modelName": SwitchbotModel.AIR_PURIFIER,
  265. "modelFriendlyName": "Air Purifier",
  266. "func": process_air_purifier,
  267. "manufacturer_id": 2409,
  268. },
  269. "+": {
  270. "modelName": SwitchbotModel.AIR_PURIFIER,
  271. "modelFriendlyName": "Air Purifier",
  272. "func": process_air_purifier,
  273. "manufacturer_id": 2409,
  274. },
  275. "7": {
  276. "modelName": SwitchbotModel.AIR_PURIFIER_TABLE,
  277. "modelFriendlyName": "Air Purifier Table",
  278. "func": process_air_purifier,
  279. "manufacturer_id": 2409,
  280. },
  281. "8": {
  282. "modelName": SwitchbotModel.AIR_PURIFIER_TABLE,
  283. "modelFriendlyName": "Air Purifier Table",
  284. "func": process_air_purifier,
  285. "manufacturer_id": 2409,
  286. },
  287. b"\x00\x10\xb9\x40": {
  288. "modelName": SwitchbotModel.HUB3,
  289. "modelFriendlyName": "Hub3",
  290. "func": process_hub3,
  291. "manufacturer_id": 2409,
  292. },
  293. "-": {
  294. "modelName": SwitchbotModel.LOCK_LITE,
  295. "modelFriendlyName": "Lock Lite",
  296. "func": process_wolock,
  297. "manufacturer_id": 2409,
  298. },
  299. b"\x00\x10\xa5\xb8": {
  300. "modelName": SwitchbotModel.LOCK_ULTRA,
  301. "modelFriendlyName": "Lock Ultra",
  302. "func": process_lock2,
  303. "manufacturer_id": 2409,
  304. },
  305. ">": {
  306. "modelName": SwitchbotModel.GARAGE_DOOR_OPENER,
  307. "modelFriendlyName": "Garage Door Opener",
  308. "func": process_garage_door_opener,
  309. "manufacturer_id": 2409,
  310. },
  311. "=": {
  312. "modelName": SwitchbotModel.RELAY_SWITCH_2PM,
  313. "modelFriendlyName": "Relay Switch 2PM",
  314. "func": process_relay_switch_2pm,
  315. "manufacturer_id": 2409,
  316. },
  317. }
  318. _SWITCHBOT_MODEL_TO_CHAR = {
  319. model_data["modelName"]: model_chr
  320. for model_chr, model_data in SUPPORTED_TYPES.items()
  321. }
  322. MODELS_BY_MANUFACTURER_DATA: dict[int, list[tuple[str, SwitchbotSupportedType]]] = {
  323. mfr_id: [] for mfr_id in MFR_DATA_ORDER
  324. }
  325. for model_chr, model in SUPPORTED_TYPES.items():
  326. if "manufacturer_id" in model:
  327. mfr_id = model["manufacturer_id"]
  328. MODELS_BY_MANUFACTURER_DATA[mfr_id].append((model_chr, model))
  329. def parse_advertisement_data(
  330. device: BLEDevice,
  331. advertisement_data: AdvertisementData,
  332. model: SwitchbotModel | None = None,
  333. ) -> SwitchBotAdvertisement | None:
  334. """Parse advertisement data."""
  335. service_data = advertisement_data.service_data
  336. _service_data = None
  337. for uuid in SERVICE_DATA_ORDER:
  338. if uuid in service_data:
  339. _service_data = service_data[uuid]
  340. break
  341. _mfr_data = None
  342. _mfr_id = None
  343. for mfr_id in MFR_DATA_ORDER:
  344. if mfr_id in advertisement_data.manufacturer_data:
  345. _mfr_id = mfr_id
  346. _mfr_data = advertisement_data.manufacturer_data[mfr_id]
  347. break
  348. if _mfr_data is None and _service_data is None:
  349. return None
  350. try:
  351. data = _parse_data(
  352. _service_data,
  353. _mfr_data,
  354. _mfr_id,
  355. model,
  356. )
  357. except Exception: # pylint: disable=broad-except
  358. _LOGGER.exception("Failed to parse advertisement data: %s", advertisement_data)
  359. return None
  360. if not data:
  361. return None
  362. return SwitchBotAdvertisement(
  363. device.address, data, device, advertisement_data.rssi, bool(_service_data)
  364. )
  365. @lru_cache(maxsize=128)
  366. def _parse_data(
  367. _service_data: bytes | None,
  368. _mfr_data: bytes | None,
  369. _mfr_id: int | None = None,
  370. _switchbot_model: SwitchbotModel | None = None,
  371. ) -> dict[str, Any] | None:
  372. """Parse advertisement data."""
  373. _model = chr(_service_data[0] & 0b01111111) if _service_data else None
  374. if _switchbot_model and _switchbot_model in _SWITCHBOT_MODEL_TO_CHAR:
  375. _model = _SWITCHBOT_MODEL_TO_CHAR[_switchbot_model]
  376. if not _model and _mfr_id and _mfr_id in MODELS_BY_MANUFACTURER_DATA:
  377. for model_chr, model_data in MODELS_BY_MANUFACTURER_DATA[_mfr_id]:
  378. if model_data.get("manufacturer_data_length") == len(_mfr_data):
  379. _model = model_chr
  380. break
  381. if (
  382. _service_data
  383. and len(_service_data) > 5
  384. and _service_data[-4:] in SUPPORTED_TYPES
  385. ):
  386. _model = _service_data[-4:]
  387. if not _model:
  388. return None
  389. _isEncrypted = bool(_service_data[0] & 0b10000000) if _service_data else False
  390. data = {
  391. "rawAdvData": _service_data,
  392. "data": {},
  393. "model": _model,
  394. "isEncrypted": _isEncrypted,
  395. }
  396. type_data = SUPPORTED_TYPES.get(_model)
  397. if type_data:
  398. model_data = type_data["func"](_service_data, _mfr_data)
  399. if model_data:
  400. data.update(
  401. {
  402. "modelFriendlyName": type_data["modelFriendlyName"],
  403. "modelName": type_data["modelName"],
  404. "data": model_data,
  405. }
  406. )
  407. return data