adv_parser.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import logging
  4. from collections.abc import Callable
  5. from functools import lru_cache
  6. from typing import Any, TypedDict
  7. from bleak.backends.device import BLEDevice
  8. from bleak.backends.scanner import AdvertisementData
  9. from .adv_parsers.air_purifier import process_air_purifier
  10. from .adv_parsers.blind_tilt import process_woblindtilt
  11. from .adv_parsers.bot import process_wohand
  12. from .adv_parsers.bulb import process_color_bulb
  13. from .adv_parsers.ceiling_light import process_woceiling
  14. from .adv_parsers.climate_panel import process_climate_panel
  15. from .adv_parsers.contact import process_wocontact
  16. from .adv_parsers.curtain import process_wocurtain
  17. from .adv_parsers.fan import process_fan
  18. from .adv_parsers.hub2 import process_wohub2
  19. from .adv_parsers.hub3 import process_hub3
  20. from .adv_parsers.hubmini_matter import process_hubmini_matter
  21. from .adv_parsers.humidifier import process_evaporative_humidifier, process_wohumidifier
  22. from .adv_parsers.keypad import process_wokeypad
  23. from .adv_parsers.leak import process_leak
  24. from .adv_parsers.light_strip import process_light, process_rgbic_light, process_wostrip
  25. from .adv_parsers.lock import (
  26. process_lock2,
  27. process_locklite,
  28. process_wolock,
  29. process_wolock_pro,
  30. )
  31. from .adv_parsers.meter import process_wosensorth, process_wosensorth_c
  32. from .adv_parsers.motion import process_wopresence
  33. from .adv_parsers.plug import process_woplugmini
  34. from .adv_parsers.relay_switch import (
  35. process_garage_door_opener,
  36. process_relay_switch_1pm,
  37. process_relay_switch_2pm,
  38. process_relay_switch_common_data,
  39. )
  40. from .adv_parsers.remote import process_woremote
  41. from .adv_parsers.roller_shade import process_worollershade
  42. from .adv_parsers.smart_thermostat_radiator import process_smart_thermostat_radiator
  43. from .adv_parsers.vacuum import process_vacuum, process_vacuum_k
  44. from .const import SwitchbotModel
  45. from .models import SwitchBotAdvertisement
  46. from .utils import format_mac_upper
  47. _LOGGER = logging.getLogger(__name__)
  48. SERVICE_DATA_ORDER = (
  49. "0000fd3d-0000-1000-8000-00805f9b34fb",
  50. "00000d00-0000-1000-8000-00805f9b34fb",
  51. )
  52. MFR_DATA_ORDER = (2409, 741, 89)
  53. _MODEL_TO_MAC_CACHE: dict[str, SwitchbotModel] = {}
  54. class SwitchbotSupportedType(TypedDict):
  55. """Supported type of Switchbot."""
  56. modelName: SwitchbotModel
  57. modelFriendlyName: str
  58. func: Callable[[bytes, bytes | None], dict[str, bool | int]]
  59. manufacturer_id: int | None
  60. manufacturer_data_length: int | None
  61. SUPPORTED_TYPES: dict[str | bytes, SwitchbotSupportedType] = {
  62. "d": {
  63. "modelName": SwitchbotModel.CONTACT_SENSOR,
  64. "modelFriendlyName": "Contact Sensor",
  65. "func": process_wocontact,
  66. "manufacturer_id": 2409,
  67. },
  68. "H": {
  69. "modelName": SwitchbotModel.BOT,
  70. "modelFriendlyName": "Bot",
  71. "func": process_wohand,
  72. "manufacturer_id": 89,
  73. },
  74. "s": {
  75. "modelName": SwitchbotModel.MOTION_SENSOR,
  76. "modelFriendlyName": "Motion Sensor",
  77. "func": process_wopresence,
  78. "manufacturer_id": 2409,
  79. },
  80. "r": {
  81. "modelName": SwitchbotModel.LIGHT_STRIP,
  82. "modelFriendlyName": "Light Strip",
  83. "func": process_wostrip,
  84. "manufacturer_id": 2409,
  85. },
  86. "{": {
  87. "modelName": SwitchbotModel.CURTAIN,
  88. "modelFriendlyName": "Curtain 3",
  89. "func": process_wocurtain,
  90. "manufacturer_id": 2409,
  91. },
  92. "c": {
  93. "modelName": SwitchbotModel.CURTAIN,
  94. "modelFriendlyName": "Curtain",
  95. "func": process_wocurtain,
  96. "manufacturer_id": 2409,
  97. },
  98. "w": {
  99. "modelName": SwitchbotModel.IO_METER,
  100. "modelFriendlyName": "Indoor/Outdoor Meter",
  101. "func": process_wosensorth,
  102. "manufacturer_id": 2409,
  103. },
  104. "i": {
  105. "modelName": SwitchbotModel.METER,
  106. "modelFriendlyName": "Meter Plus",
  107. "func": process_wosensorth,
  108. "manufacturer_id": 2409,
  109. },
  110. "T": {
  111. "modelName": SwitchbotModel.METER,
  112. "modelFriendlyName": "Meter",
  113. "func": process_wosensorth,
  114. "manufacturer_id": 2409,
  115. },
  116. "4": {
  117. "modelName": SwitchbotModel.METER_PRO,
  118. "modelFriendlyName": "Meter Pro",
  119. "func": process_wosensorth,
  120. "manufacturer_id": 2409,
  121. },
  122. "5": {
  123. "modelName": SwitchbotModel.METER_PRO_C,
  124. "modelFriendlyName": "Meter Pro CO2",
  125. "func": process_wosensorth_c,
  126. "manufacturer_id": 2409,
  127. },
  128. "v": {
  129. "modelName": SwitchbotModel.HUB2,
  130. "modelFriendlyName": "Hub 2",
  131. "func": process_wohub2,
  132. "manufacturer_id": 2409,
  133. },
  134. "g": {
  135. "modelName": SwitchbotModel.PLUG_MINI,
  136. "modelFriendlyName": "Plug Mini",
  137. "func": process_woplugmini,
  138. "manufacturer_id": 2409,
  139. },
  140. "j": {
  141. "modelName": SwitchbotModel.PLUG_MINI,
  142. "modelFriendlyName": "Plug Mini (JP)",
  143. "func": process_woplugmini,
  144. "manufacturer_id": 2409,
  145. },
  146. "u": {
  147. "modelName": SwitchbotModel.COLOR_BULB,
  148. "modelFriendlyName": "Color Bulb",
  149. "func": process_color_bulb,
  150. "manufacturer_id": 2409,
  151. },
  152. "q": {
  153. "modelName": SwitchbotModel.CEILING_LIGHT,
  154. "modelFriendlyName": "Ceiling Light",
  155. "func": process_woceiling,
  156. "manufacturer_id": 2409,
  157. },
  158. "n": {
  159. "modelName": SwitchbotModel.CEILING_LIGHT,
  160. "modelFriendlyName": "Ceiling Light Pro",
  161. "func": process_woceiling,
  162. "manufacturer_id": 2409,
  163. },
  164. "e": {
  165. "modelName": SwitchbotModel.HUMIDIFIER,
  166. "modelFriendlyName": "Humidifier",
  167. "func": process_wohumidifier,
  168. "manufacturer_id": 741,
  169. "manufacturer_data_length": 6,
  170. },
  171. "#": {
  172. "modelName": SwitchbotModel.EVAPORATIVE_HUMIDIFIER,
  173. "modelFriendlyName": "Evaporative Humidifier",
  174. "func": process_evaporative_humidifier,
  175. "manufacturer_id": 2409,
  176. },
  177. "o": {
  178. "modelName": SwitchbotModel.LOCK,
  179. "modelFriendlyName": "Lock",
  180. "func": process_wolock,
  181. "manufacturer_id": 2409,
  182. },
  183. "$": {
  184. "modelName": SwitchbotModel.LOCK_PRO,
  185. "modelFriendlyName": "Lock Pro",
  186. "func": process_wolock_pro,
  187. "manufacturer_id": 2409,
  188. },
  189. "x": {
  190. "modelName": SwitchbotModel.BLIND_TILT,
  191. "modelFriendlyName": "Blind Tilt",
  192. "func": process_woblindtilt,
  193. "manufacturer_id": 2409,
  194. },
  195. "&": {
  196. "modelName": SwitchbotModel.LEAK,
  197. "modelFriendlyName": "Leak Detector",
  198. "func": process_leak,
  199. "manufacturer_id": 2409,
  200. },
  201. "y": {
  202. "modelName": SwitchbotModel.KEYPAD,
  203. "modelFriendlyName": "Keypad",
  204. "func": process_wokeypad,
  205. "manufacturer_id": 2409,
  206. },
  207. "<": {
  208. "modelName": SwitchbotModel.RELAY_SWITCH_1PM,
  209. "modelFriendlyName": "Relay Switch 1PM",
  210. "func": process_relay_switch_1pm,
  211. "manufacturer_id": 2409,
  212. },
  213. ";": {
  214. "modelName": SwitchbotModel.RELAY_SWITCH_1,
  215. "modelFriendlyName": "Relay Switch 1",
  216. "func": process_relay_switch_common_data,
  217. "manufacturer_id": 2409,
  218. },
  219. "b": {
  220. "modelName": SwitchbotModel.REMOTE,
  221. "modelFriendlyName": "Remote",
  222. "func": process_woremote,
  223. "manufacturer_id": 89,
  224. },
  225. ",": {
  226. "modelName": SwitchbotModel.ROLLER_SHADE,
  227. "modelFriendlyName": "Roller Shade",
  228. "func": process_worollershade,
  229. "manufacturer_id": 2409,
  230. },
  231. "%": {
  232. "modelName": SwitchbotModel.HUBMINI_MATTER,
  233. "modelFriendlyName": "HubMini Matter",
  234. "func": process_hubmini_matter,
  235. "manufacturer_id": 2409,
  236. },
  237. "~": {
  238. "modelName": SwitchbotModel.CIRCULATOR_FAN,
  239. "modelFriendlyName": "Circulator Fan",
  240. "func": process_fan,
  241. "manufacturer_id": 2409,
  242. },
  243. ".": {
  244. "modelName": SwitchbotModel.K20_VACUUM,
  245. "modelFriendlyName": "K20 Vacuum",
  246. "func": process_vacuum,
  247. "manufacturer_id": 2409,
  248. },
  249. "z": {
  250. "modelName": SwitchbotModel.S10_VACUUM,
  251. "modelFriendlyName": "S10 Vacuum",
  252. "func": process_vacuum,
  253. "manufacturer_id": 2409,
  254. },
  255. "3": {
  256. "modelName": SwitchbotModel.K10_PRO_COMBO_VACUUM,
  257. "modelFriendlyName": "K10+ Pro Combo Vacuum",
  258. "func": process_vacuum,
  259. "manufacturer_id": 2409,
  260. },
  261. "}": {
  262. "modelName": SwitchbotModel.K10_VACUUM,
  263. "modelFriendlyName": "K10+ Vacuum",
  264. "func": process_vacuum_k,
  265. "manufacturer_id": 2409,
  266. },
  267. "(": {
  268. "modelName": SwitchbotModel.K10_PRO_VACUUM,
  269. "modelFriendlyName": "K10+ Pro Vacuum",
  270. "func": process_vacuum_k,
  271. "manufacturer_id": 2409,
  272. },
  273. "*": {
  274. "modelName": SwitchbotModel.AIR_PURIFIER,
  275. "modelFriendlyName": "Air Purifier",
  276. "func": process_air_purifier,
  277. "manufacturer_id": 2409,
  278. },
  279. "+": {
  280. "modelName": SwitchbotModel.AIR_PURIFIER,
  281. "modelFriendlyName": "Air Purifier",
  282. "func": process_air_purifier,
  283. "manufacturer_id": 2409,
  284. },
  285. "7": {
  286. "modelName": SwitchbotModel.AIR_PURIFIER_TABLE,
  287. "modelFriendlyName": "Air Purifier Table",
  288. "func": process_air_purifier,
  289. "manufacturer_id": 2409,
  290. },
  291. "8": {
  292. "modelName": SwitchbotModel.AIR_PURIFIER_TABLE,
  293. "modelFriendlyName": "Air Purifier Table",
  294. "func": process_air_purifier,
  295. "manufacturer_id": 2409,
  296. },
  297. b"\x00\x10\xb9\x40": {
  298. "modelName": SwitchbotModel.HUB3,
  299. "modelFriendlyName": "Hub3",
  300. "func": process_hub3,
  301. "manufacturer_id": 2409,
  302. },
  303. "-": {
  304. "modelName": SwitchbotModel.LOCK_LITE,
  305. "modelFriendlyName": "Lock Lite",
  306. "func": process_locklite,
  307. "manufacturer_id": 2409,
  308. },
  309. b"\x00\x10\xa5\xb8": {
  310. "modelName": SwitchbotModel.LOCK_ULTRA,
  311. "modelFriendlyName": "Lock Ultra",
  312. "func": process_lock2,
  313. "manufacturer_id": 2409,
  314. },
  315. ">": {
  316. "modelName": SwitchbotModel.GARAGE_DOOR_OPENER,
  317. "modelFriendlyName": "Garage Door Opener",
  318. "func": process_garage_door_opener,
  319. "manufacturer_id": 2409,
  320. },
  321. "=": {
  322. "modelName": SwitchbotModel.RELAY_SWITCH_2PM,
  323. "modelFriendlyName": "Relay Switch 2PM",
  324. "func": process_relay_switch_2pm,
  325. "manufacturer_id": 2409,
  326. },
  327. b"\x00\x10\xd0\xb0": {
  328. "modelName": SwitchbotModel.FLOOR_LAMP,
  329. "modelFriendlyName": "Floor Lamp",
  330. "func": process_light,
  331. "manufacturer_id": 2409,
  332. },
  333. b"\x00\x10\xd0\xb1": {
  334. "modelName": SwitchbotModel.STRIP_LIGHT_3,
  335. "modelFriendlyName": "Strip Light 3",
  336. "func": process_light,
  337. "manufacturer_id": 2409,
  338. },
  339. "?": {
  340. "modelName": SwitchbotModel.PLUG_MINI_EU,
  341. "modelFriendlyName": "Plug Mini (EU)",
  342. "func": process_relay_switch_1pm,
  343. "manufacturer_id": 2409,
  344. },
  345. b"\x00\x10\xd0\xb3": {
  346. "modelName": SwitchbotModel.RGBICWW_STRIP_LIGHT,
  347. "modelFriendlyName": "RGBICWW Strip Light",
  348. "func": process_rgbic_light,
  349. "manufacturer_id": 2409,
  350. },
  351. b"\x00\x10\xd0\xb4": {
  352. "modelName": SwitchbotModel.RGBICWW_FLOOR_LAMP,
  353. "modelFriendlyName": "RGBICWW Floor Lamp",
  354. "func": process_rgbic_light,
  355. "manufacturer_id": 2409,
  356. },
  357. b"\x00\x10\xfb\xa8": {
  358. "modelName": SwitchbotModel.K11_VACUUM,
  359. "modelFriendlyName": "K11+ Vacuum",
  360. "func": process_vacuum,
  361. "manufacturer_id": 2409,
  362. },
  363. b"\x00\x10\xf3\xd8": {
  364. "modelName": SwitchbotModel.CLIMATE_PANEL,
  365. "modelFriendlyName": "Climate Panel",
  366. "func": process_climate_panel,
  367. "manufacturer_id": 2409,
  368. },
  369. b"\x00\x116@": {
  370. "modelName": SwitchbotModel.SMART_THERMOSTAT_RADIATOR,
  371. "modelFriendlyName": "Smart Thermostat Radiator",
  372. "func": process_smart_thermostat_radiator,
  373. "manufacturer_id": 2409,
  374. },
  375. }
  376. _SWITCHBOT_MODEL_TO_CHAR = {
  377. model_data["modelName"]: model_chr
  378. for model_chr, model_data in SUPPORTED_TYPES.items()
  379. }
  380. MODELS_BY_MANUFACTURER_DATA: dict[int, list[tuple[str, SwitchbotSupportedType]]] = {
  381. mfr_id: [] for mfr_id in MFR_DATA_ORDER
  382. }
  383. for model_chr, model in SUPPORTED_TYPES.items():
  384. if "manufacturer_id" in model:
  385. mfr_id = model["manufacturer_id"]
  386. MODELS_BY_MANUFACTURER_DATA[mfr_id].append((model_chr, model))
  387. def parse_advertisement_data(
  388. device: BLEDevice,
  389. advertisement_data: AdvertisementData,
  390. model: SwitchbotModel | None = None,
  391. ) -> SwitchBotAdvertisement | None:
  392. """Parse advertisement data."""
  393. upper_mac = format_mac_upper(device.address)
  394. if model is None and upper_mac in _MODEL_TO_MAC_CACHE:
  395. model = _MODEL_TO_MAC_CACHE[upper_mac]
  396. service_data = advertisement_data.service_data
  397. _service_data = None
  398. for uuid in SERVICE_DATA_ORDER:
  399. if uuid in service_data:
  400. _service_data = service_data[uuid]
  401. break
  402. _mfr_data = None
  403. _mfr_id = None
  404. for mfr_id in MFR_DATA_ORDER:
  405. if mfr_id in advertisement_data.manufacturer_data:
  406. _mfr_id = mfr_id
  407. _mfr_data = advertisement_data.manufacturer_data[mfr_id]
  408. break
  409. if _mfr_data is None and _service_data is None:
  410. return None
  411. try:
  412. data = _parse_data(
  413. _service_data,
  414. _mfr_data,
  415. _mfr_id,
  416. model,
  417. )
  418. except Exception: # pylint: disable=broad-except
  419. _LOGGER.exception("Failed to parse advertisement data: %s", advertisement_data)
  420. return None
  421. if not data:
  422. return None
  423. return SwitchBotAdvertisement(
  424. device.address, data, device, advertisement_data.rssi, bool(_service_data)
  425. )
  426. @lru_cache(maxsize=128)
  427. def _parse_data(
  428. _service_data: bytes | None,
  429. _mfr_data: bytes | None,
  430. _mfr_id: int | None = None,
  431. _switchbot_model: SwitchbotModel | None = None,
  432. ) -> dict[str, Any] | None:
  433. """Parse advertisement data."""
  434. _model = chr(_service_data[0] & 0b01111111) if _service_data else None
  435. if _switchbot_model and _switchbot_model in _SWITCHBOT_MODEL_TO_CHAR:
  436. _model = _SWITCHBOT_MODEL_TO_CHAR[_switchbot_model]
  437. if not _model and _mfr_id and _mfr_id in MODELS_BY_MANUFACTURER_DATA:
  438. for model_chr, model_data in MODELS_BY_MANUFACTURER_DATA[_mfr_id]:
  439. if model_data.get("manufacturer_data_length") == len(_mfr_data):
  440. _model = model_chr
  441. break
  442. if _service_data and len(_service_data) > 5:
  443. for s in (_service_data[-4:], _service_data[-5:-1]):
  444. if s in SUPPORTED_TYPES:
  445. _model = s
  446. break
  447. if not _model:
  448. return None
  449. _isEncrypted = bool(_service_data[0] & 0b10000000) if _service_data else False
  450. data = {
  451. "rawAdvData": _service_data,
  452. "data": {},
  453. "model": _model,
  454. "isEncrypted": _isEncrypted,
  455. }
  456. type_data = SUPPORTED_TYPES.get(_model)
  457. if type_data:
  458. model_data = type_data["func"](_service_data, _mfr_data)
  459. if model_data:
  460. data.update(
  461. {
  462. "modelFriendlyName": type_data["modelFriendlyName"],
  463. "modelName": type_data["modelName"],
  464. "data": model_data,
  465. }
  466. )
  467. return data
  468. def populate_model_to_mac_cache(mac: str, model: SwitchbotModel) -> None:
  469. """Populate the model to MAC address cache."""
  470. _MODEL_TO_MAC_CACHE[mac] = model