adv_parser.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import logging
  4. from collections.abc import Callable
  5. from functools import lru_cache
  6. from typing import Any, TypedDict
  7. from bleak.backends.device import BLEDevice
  8. from bleak.backends.scanner import AdvertisementData
  9. from .adv_parsers.air_purifier import process_air_purifier
  10. from .adv_parsers.blind_tilt import process_woblindtilt
  11. from .adv_parsers.bot import process_wohand
  12. from .adv_parsers.bulb import process_color_bulb
  13. from .adv_parsers.ceiling_light import process_woceiling
  14. from .adv_parsers.contact import process_wocontact
  15. from .adv_parsers.curtain import process_wocurtain
  16. from .adv_parsers.fan import process_fan
  17. from .adv_parsers.hub2 import process_wohub2
  18. from .adv_parsers.hub3 import process_hub3
  19. from .adv_parsers.hubmini_matter import process_hubmini_matter
  20. from .adv_parsers.humidifier import process_evaporative_humidifier, process_wohumidifier
  21. from .adv_parsers.keypad import process_wokeypad
  22. from .adv_parsers.leak import process_leak
  23. from .adv_parsers.light_strip import process_light, process_rgbic_light, process_wostrip
  24. from .adv_parsers.lock import (
  25. process_lock2,
  26. process_locklite,
  27. process_wolock,
  28. process_wolock_pro,
  29. )
  30. from .adv_parsers.meter import process_wosensorth, process_wosensorth_c
  31. from .adv_parsers.motion import process_wopresence
  32. from .adv_parsers.plug import process_woplugmini
  33. from .adv_parsers.relay_switch import (
  34. process_garage_door_opener,
  35. process_relay_switch_1pm,
  36. process_relay_switch_2pm,
  37. process_relay_switch_common_data,
  38. )
  39. from .adv_parsers.remote import process_woremote
  40. from .adv_parsers.roller_shade import process_worollershade
  41. from .adv_parsers.vacuum import process_vacuum, process_vacuum_k
  42. from .const import SwitchbotModel
  43. from .models import SwitchBotAdvertisement
  44. from .utils import format_mac_upper
  45. _LOGGER = logging.getLogger(__name__)
  46. SERVICE_DATA_ORDER = (
  47. "0000fd3d-0000-1000-8000-00805f9b34fb",
  48. "00000d00-0000-1000-8000-00805f9b34fb",
  49. )
  50. MFR_DATA_ORDER = (2409, 741, 89)
  51. _MODEL_TO_MAC_CACHE: dict[str, SwitchbotModel] = {}
  52. class SwitchbotSupportedType(TypedDict):
  53. """Supported type of Switchbot."""
  54. modelName: SwitchbotModel
  55. modelFriendlyName: str
  56. func: Callable[[bytes, bytes | None], dict[str, bool | int]]
  57. manufacturer_id: int | None
  58. manufacturer_data_length: int | None
  59. SUPPORTED_TYPES: dict[str | bytes, SwitchbotSupportedType] = {
  60. "d": {
  61. "modelName": SwitchbotModel.CONTACT_SENSOR,
  62. "modelFriendlyName": "Contact Sensor",
  63. "func": process_wocontact,
  64. "manufacturer_id": 2409,
  65. },
  66. "H": {
  67. "modelName": SwitchbotModel.BOT,
  68. "modelFriendlyName": "Bot",
  69. "func": process_wohand,
  70. "manufacturer_id": 89,
  71. },
  72. "s": {
  73. "modelName": SwitchbotModel.MOTION_SENSOR,
  74. "modelFriendlyName": "Motion Sensor",
  75. "func": process_wopresence,
  76. "manufacturer_id": 2409,
  77. },
  78. "r": {
  79. "modelName": SwitchbotModel.LIGHT_STRIP,
  80. "modelFriendlyName": "Light Strip",
  81. "func": process_wostrip,
  82. "manufacturer_id": 2409,
  83. },
  84. "{": {
  85. "modelName": SwitchbotModel.CURTAIN,
  86. "modelFriendlyName": "Curtain 3",
  87. "func": process_wocurtain,
  88. "manufacturer_id": 2409,
  89. },
  90. "c": {
  91. "modelName": SwitchbotModel.CURTAIN,
  92. "modelFriendlyName": "Curtain",
  93. "func": process_wocurtain,
  94. "manufacturer_id": 2409,
  95. },
  96. "w": {
  97. "modelName": SwitchbotModel.IO_METER,
  98. "modelFriendlyName": "Indoor/Outdoor Meter",
  99. "func": process_wosensorth,
  100. "manufacturer_id": 2409,
  101. },
  102. "i": {
  103. "modelName": SwitchbotModel.METER,
  104. "modelFriendlyName": "Meter Plus",
  105. "func": process_wosensorth,
  106. "manufacturer_id": 2409,
  107. },
  108. "T": {
  109. "modelName": SwitchbotModel.METER,
  110. "modelFriendlyName": "Meter",
  111. "func": process_wosensorth,
  112. "manufacturer_id": 2409,
  113. },
  114. "4": {
  115. "modelName": SwitchbotModel.METER_PRO,
  116. "modelFriendlyName": "Meter Pro",
  117. "func": process_wosensorth,
  118. "manufacturer_id": 2409,
  119. },
  120. "5": {
  121. "modelName": SwitchbotModel.METER_PRO_C,
  122. "modelFriendlyName": "Meter Pro CO2",
  123. "func": process_wosensorth_c,
  124. "manufacturer_id": 2409,
  125. },
  126. "v": {
  127. "modelName": SwitchbotModel.HUB2,
  128. "modelFriendlyName": "Hub 2",
  129. "func": process_wohub2,
  130. "manufacturer_id": 2409,
  131. },
  132. "g": {
  133. "modelName": SwitchbotModel.PLUG_MINI,
  134. "modelFriendlyName": "Plug Mini",
  135. "func": process_woplugmini,
  136. "manufacturer_id": 2409,
  137. },
  138. "j": {
  139. "modelName": SwitchbotModel.PLUG_MINI,
  140. "modelFriendlyName": "Plug Mini (JP)",
  141. "func": process_woplugmini,
  142. "manufacturer_id": 2409,
  143. },
  144. "u": {
  145. "modelName": SwitchbotModel.COLOR_BULB,
  146. "modelFriendlyName": "Color Bulb",
  147. "func": process_color_bulb,
  148. "manufacturer_id": 2409,
  149. },
  150. "q": {
  151. "modelName": SwitchbotModel.CEILING_LIGHT,
  152. "modelFriendlyName": "Ceiling Light",
  153. "func": process_woceiling,
  154. "manufacturer_id": 2409,
  155. },
  156. "n": {
  157. "modelName": SwitchbotModel.CEILING_LIGHT,
  158. "modelFriendlyName": "Ceiling Light Pro",
  159. "func": process_woceiling,
  160. "manufacturer_id": 2409,
  161. },
  162. "e": {
  163. "modelName": SwitchbotModel.HUMIDIFIER,
  164. "modelFriendlyName": "Humidifier",
  165. "func": process_wohumidifier,
  166. "manufacturer_id": 741,
  167. "manufacturer_data_length": 6,
  168. },
  169. "#": {
  170. "modelName": SwitchbotModel.EVAPORATIVE_HUMIDIFIER,
  171. "modelFriendlyName": "Evaporative Humidifier",
  172. "func": process_evaporative_humidifier,
  173. "manufacturer_id": 2409,
  174. },
  175. "o": {
  176. "modelName": SwitchbotModel.LOCK,
  177. "modelFriendlyName": "Lock",
  178. "func": process_wolock,
  179. "manufacturer_id": 2409,
  180. },
  181. "$": {
  182. "modelName": SwitchbotModel.LOCK_PRO,
  183. "modelFriendlyName": "Lock Pro",
  184. "func": process_wolock_pro,
  185. "manufacturer_id": 2409,
  186. },
  187. "x": {
  188. "modelName": SwitchbotModel.BLIND_TILT,
  189. "modelFriendlyName": "Blind Tilt",
  190. "func": process_woblindtilt,
  191. "manufacturer_id": 2409,
  192. },
  193. "&": {
  194. "modelName": SwitchbotModel.LEAK,
  195. "modelFriendlyName": "Leak Detector",
  196. "func": process_leak,
  197. "manufacturer_id": 2409,
  198. },
  199. "y": {
  200. "modelName": SwitchbotModel.KEYPAD,
  201. "modelFriendlyName": "Keypad",
  202. "func": process_wokeypad,
  203. "manufacturer_id": 2409,
  204. },
  205. "<": {
  206. "modelName": SwitchbotModel.RELAY_SWITCH_1PM,
  207. "modelFriendlyName": "Relay Switch 1PM",
  208. "func": process_relay_switch_1pm,
  209. "manufacturer_id": 2409,
  210. },
  211. ";": {
  212. "modelName": SwitchbotModel.RELAY_SWITCH_1,
  213. "modelFriendlyName": "Relay Switch 1",
  214. "func": process_relay_switch_common_data,
  215. "manufacturer_id": 2409,
  216. },
  217. "b": {
  218. "modelName": SwitchbotModel.REMOTE,
  219. "modelFriendlyName": "Remote",
  220. "func": process_woremote,
  221. "manufacturer_id": 89,
  222. },
  223. ",": {
  224. "modelName": SwitchbotModel.ROLLER_SHADE,
  225. "modelFriendlyName": "Roller Shade",
  226. "func": process_worollershade,
  227. "manufacturer_id": 2409,
  228. },
  229. "%": {
  230. "modelName": SwitchbotModel.HUBMINI_MATTER,
  231. "modelFriendlyName": "HubMini Matter",
  232. "func": process_hubmini_matter,
  233. "manufacturer_id": 2409,
  234. },
  235. "~": {
  236. "modelName": SwitchbotModel.CIRCULATOR_FAN,
  237. "modelFriendlyName": "Circulator Fan",
  238. "func": process_fan,
  239. "manufacturer_id": 2409,
  240. },
  241. ".": {
  242. "modelName": SwitchbotModel.K20_VACUUM,
  243. "modelFriendlyName": "K20 Vacuum",
  244. "func": process_vacuum,
  245. "manufacturer_id": 2409,
  246. },
  247. "z": {
  248. "modelName": SwitchbotModel.S10_VACUUM,
  249. "modelFriendlyName": "S10 Vacuum",
  250. "func": process_vacuum,
  251. "manufacturer_id": 2409,
  252. },
  253. "3": {
  254. "modelName": SwitchbotModel.K10_PRO_COMBO_VACUUM,
  255. "modelFriendlyName": "K10+ Pro Combo Vacuum",
  256. "func": process_vacuum,
  257. "manufacturer_id": 2409,
  258. },
  259. "}": {
  260. "modelName": SwitchbotModel.K10_VACUUM,
  261. "modelFriendlyName": "K10+ Vacuum",
  262. "func": process_vacuum_k,
  263. "manufacturer_id": 2409,
  264. },
  265. "(": {
  266. "modelName": SwitchbotModel.K10_PRO_VACUUM,
  267. "modelFriendlyName": "K10+ Pro Vacuum",
  268. "func": process_vacuum_k,
  269. "manufacturer_id": 2409,
  270. },
  271. "*": {
  272. "modelName": SwitchbotModel.AIR_PURIFIER,
  273. "modelFriendlyName": "Air Purifier",
  274. "func": process_air_purifier,
  275. "manufacturer_id": 2409,
  276. },
  277. "+": {
  278. "modelName": SwitchbotModel.AIR_PURIFIER,
  279. "modelFriendlyName": "Air Purifier",
  280. "func": process_air_purifier,
  281. "manufacturer_id": 2409,
  282. },
  283. "7": {
  284. "modelName": SwitchbotModel.AIR_PURIFIER_TABLE,
  285. "modelFriendlyName": "Air Purifier Table",
  286. "func": process_air_purifier,
  287. "manufacturer_id": 2409,
  288. },
  289. "8": {
  290. "modelName": SwitchbotModel.AIR_PURIFIER_TABLE,
  291. "modelFriendlyName": "Air Purifier Table",
  292. "func": process_air_purifier,
  293. "manufacturer_id": 2409,
  294. },
  295. b"\x00\x10\xb9\x40": {
  296. "modelName": SwitchbotModel.HUB3,
  297. "modelFriendlyName": "Hub3",
  298. "func": process_hub3,
  299. "manufacturer_id": 2409,
  300. },
  301. "-": {
  302. "modelName": SwitchbotModel.LOCK_LITE,
  303. "modelFriendlyName": "Lock Lite",
  304. "func": process_locklite,
  305. "manufacturer_id": 2409,
  306. },
  307. b"\x00\x10\xa5\xb8": {
  308. "modelName": SwitchbotModel.LOCK_ULTRA,
  309. "modelFriendlyName": "Lock Ultra",
  310. "func": process_lock2,
  311. "manufacturer_id": 2409,
  312. },
  313. ">": {
  314. "modelName": SwitchbotModel.GARAGE_DOOR_OPENER,
  315. "modelFriendlyName": "Garage Door Opener",
  316. "func": process_garage_door_opener,
  317. "manufacturer_id": 2409,
  318. },
  319. "=": {
  320. "modelName": SwitchbotModel.RELAY_SWITCH_2PM,
  321. "modelFriendlyName": "Relay Switch 2PM",
  322. "func": process_relay_switch_2pm,
  323. "manufacturer_id": 2409,
  324. },
  325. b"\x00\x10\xd0\xb0": {
  326. "modelName": SwitchbotModel.FLOOR_LAMP,
  327. "modelFriendlyName": "Floor Lamp",
  328. "func": process_light,
  329. "manufacturer_id": 2409,
  330. },
  331. b"\x00\x10\xd0\xb1": {
  332. "modelName": SwitchbotModel.STRIP_LIGHT_3,
  333. "modelFriendlyName": "Strip Light 3",
  334. "func": process_light,
  335. "manufacturer_id": 2409,
  336. },
  337. "?": {
  338. "modelName": SwitchbotModel.PLUG_MINI_EU,
  339. "modelFriendlyName": "Plug Mini (EU)",
  340. "func": process_relay_switch_1pm,
  341. "manufacturer_id": 2409,
  342. },
  343. b"\x00\x10\xd0\xb3": {
  344. "modelName": SwitchbotModel.RGBICWW_STRIP_LIGHT,
  345. "modelFriendlyName": "RGBICWW Strip Light",
  346. "func": process_rgbic_light,
  347. "manufacturer_id": 2409,
  348. },
  349. b"\x00\x10\xd0\xb4": {
  350. "modelName": SwitchbotModel.RGBICWW_FLOOR_LAMP,
  351. "modelFriendlyName": "RGBICWW Floor Lamp",
  352. "func": process_rgbic_light,
  353. "manufacturer_id": 2409,
  354. },
  355. b"\x00\x10\xfb\xa8": {
  356. "modelName": SwitchbotModel.K11_VACUUM,
  357. "modelFriendlyName": "K11+ Vacuum",
  358. "func": process_vacuum,
  359. "manufacturer_id": 2409,
  360. },
  361. }
  362. _SWITCHBOT_MODEL_TO_CHAR = {
  363. model_data["modelName"]: model_chr
  364. for model_chr, model_data in SUPPORTED_TYPES.items()
  365. }
  366. MODELS_BY_MANUFACTURER_DATA: dict[int, list[tuple[str, SwitchbotSupportedType]]] = {
  367. mfr_id: [] for mfr_id in MFR_DATA_ORDER
  368. }
  369. for model_chr, model in SUPPORTED_TYPES.items():
  370. if "manufacturer_id" in model:
  371. mfr_id = model["manufacturer_id"]
  372. MODELS_BY_MANUFACTURER_DATA[mfr_id].append((model_chr, model))
  373. def parse_advertisement_data(
  374. device: BLEDevice,
  375. advertisement_data: AdvertisementData,
  376. model: SwitchbotModel | None = None,
  377. ) -> SwitchBotAdvertisement | None:
  378. """Parse advertisement data."""
  379. upper_mac = format_mac_upper(device.address)
  380. if model is None and upper_mac in _MODEL_TO_MAC_CACHE:
  381. model = _MODEL_TO_MAC_CACHE[upper_mac]
  382. service_data = advertisement_data.service_data
  383. _service_data = None
  384. for uuid in SERVICE_DATA_ORDER:
  385. if uuid in service_data:
  386. _service_data = service_data[uuid]
  387. break
  388. _mfr_data = None
  389. _mfr_id = None
  390. for mfr_id in MFR_DATA_ORDER:
  391. if mfr_id in advertisement_data.manufacturer_data:
  392. _mfr_id = mfr_id
  393. _mfr_data = advertisement_data.manufacturer_data[mfr_id]
  394. break
  395. if _mfr_data is None and _service_data is None:
  396. return None
  397. try:
  398. data = _parse_data(
  399. _service_data,
  400. _mfr_data,
  401. _mfr_id,
  402. model,
  403. )
  404. except Exception: # pylint: disable=broad-except
  405. _LOGGER.exception("Failed to parse advertisement data: %s", advertisement_data)
  406. return None
  407. if not data:
  408. return None
  409. return SwitchBotAdvertisement(
  410. device.address, data, device, advertisement_data.rssi, bool(_service_data)
  411. )
  412. @lru_cache(maxsize=128)
  413. def _parse_data(
  414. _service_data: bytes | None,
  415. _mfr_data: bytes | None,
  416. _mfr_id: int | None = None,
  417. _switchbot_model: SwitchbotModel | None = None,
  418. ) -> dict[str, Any] | None:
  419. """Parse advertisement data."""
  420. _model = chr(_service_data[0] & 0b01111111) if _service_data else None
  421. if _switchbot_model and _switchbot_model in _SWITCHBOT_MODEL_TO_CHAR:
  422. _model = _SWITCHBOT_MODEL_TO_CHAR[_switchbot_model]
  423. if not _model and _mfr_id and _mfr_id in MODELS_BY_MANUFACTURER_DATA:
  424. for model_chr, model_data in MODELS_BY_MANUFACTURER_DATA[_mfr_id]:
  425. if model_data.get("manufacturer_data_length") == len(_mfr_data):
  426. _model = model_chr
  427. break
  428. if (
  429. _service_data
  430. and len(_service_data) > 5
  431. and _service_data[-4:] in SUPPORTED_TYPES
  432. ):
  433. _model = _service_data[-4:]
  434. if not _model:
  435. return None
  436. _isEncrypted = bool(_service_data[0] & 0b10000000) if _service_data else False
  437. data = {
  438. "rawAdvData": _service_data,
  439. "data": {},
  440. "model": _model,
  441. "isEncrypted": _isEncrypted,
  442. }
  443. type_data = SUPPORTED_TYPES.get(_model)
  444. if type_data:
  445. model_data = type_data["func"](_service_data, _mfr_data)
  446. if model_data:
  447. data.update(
  448. {
  449. "modelFriendlyName": type_data["modelFriendlyName"],
  450. "modelName": type_data["modelName"],
  451. "data": model_data,
  452. }
  453. )
  454. return data
  455. def populate_model_to_mac_cache(mac: str, model: SwitchbotModel) -> None:
  456. """Populate the model to MAC address cache."""
  457. _MODEL_TO_MAC_CACHE[mac] = model