# adv_parser.py (12 KB)
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import logging
  4. from collections.abc import Callable
  5. from functools import lru_cache
  6. from typing import Any, TypedDict
  7. from bleak.backends.device import BLEDevice
  8. from bleak.backends.scanner import AdvertisementData
  9. from .adv_parsers.air_purifier import process_air_purifier
  10. from .adv_parsers.blind_tilt import process_woblindtilt
  11. from .adv_parsers.bot import process_wohand
  12. from .adv_parsers.bulb import process_color_bulb
  13. from .adv_parsers.ceiling_light import process_woceiling
  14. from .adv_parsers.contact import process_wocontact
  15. from .adv_parsers.curtain import process_wocurtain
  16. from .adv_parsers.fan import process_fan
  17. from .adv_parsers.hub2 import process_wohub2
  18. from .adv_parsers.hub3 import process_hub3
  19. from .adv_parsers.hubmini_matter import process_hubmini_matter
  20. from .adv_parsers.humidifier import process_evaporative_humidifier, process_wohumidifier
  21. from .adv_parsers.keypad import process_wokeypad
  22. from .adv_parsers.leak import process_leak
  23. from .adv_parsers.light_strip import process_wostrip
  24. from .adv_parsers.lock import process_wolock, process_wolock_pro
  25. from .adv_parsers.meter import process_wosensorth, process_wosensorth_c
  26. from .adv_parsers.motion import process_wopresence
  27. from .adv_parsers.plug import process_woplugmini
  28. from .adv_parsers.relay_switch import (
  29. process_worelay_switch_1,
  30. process_worelay_switch_1pm,
  31. )
  32. from .adv_parsers.remote import process_woremote
  33. from .adv_parsers.roller_shade import process_worollershade
  34. from .adv_parsers.vacuum import process_vacuum, process_vacuum_k
  35. from .const import SwitchbotModel
  36. from .models import SwitchBotAdvertisement
_LOGGER = logging.getLogger(__name__)

# Service-data UUIDs in priority order: parse_advertisement_data() walks this
# tuple and uses the payload of the first UUID present in the advertisement.
SERVICE_DATA_ORDER = (
    "0000fd3d-0000-1000-8000-00805f9b34fb",
    "00000d00-0000-1000-8000-00805f9b34fb",
)

# Manufacturer IDs in priority order: parse_advertisement_data() uses the
# manufacturer data of the first ID present in the advertisement. These IDs
# are also the initial keys of MODELS_BY_MANUFACTURER_DATA.
MFR_DATA_ORDER = (2409, 741, 89)
class SwitchbotSupportedType(TypedDict):
    """Supported type of Switchbot.

    Describes one entry of SUPPORTED_TYPES: which model it is and which
    parser decodes its advertisement payload.
    """

    # Enum member identifying the device model.
    modelName: SwitchbotModel
    # Human-readable model name copied into the parsed result.
    modelFriendlyName: str
    # Payload parser, invoked by _parse_data() as func(service_data, mfr_data).
    func: Callable[[bytes, bytes | None], dict[str, bool | int]]
    # Key of the advertisement's manufacturer_data mapping under which this
    # model advertises; used to group models in MODELS_BY_MANUFACTURER_DATA.
    manufacturer_id: int | None
    # Manufacturer-data length that _parse_data() compares against
    # len(mfr_data) when no model could be derived from the service data.
    # NOTE(review): most SUPPORTED_TYPES entries omit this key.
    manufacturer_data_length: int | None
# Registry of every supported device type, keyed by model identifier.
#
# A str key is the character _parse_data() derives from the low 7 bits of the
# first service-data byte. A bytes key is matched against the last four bytes
# of the service data.
#
# Entry order matters: _SWITCHBOT_MODEL_TO_CHAR is built from this mapping and
# keeps the LAST key for a model that is listed more than once (for example
# CURTAIN resolves to "c"), and MODELS_BY_MANUFACTURER_DATA lists are filled in
# this order.
SUPPORTED_TYPES: dict[str | bytes, SwitchbotSupportedType] = {
    "d": {
        "modelName": SwitchbotModel.CONTACT_SENSOR,
        "modelFriendlyName": "Contact Sensor",
        "func": process_wocontact,
        "manufacturer_id": 2409,
    },
    "H": {
        "modelName": SwitchbotModel.BOT,
        "modelFriendlyName": "Bot",
        "func": process_wohand,
        "manufacturer_id": 89,
    },
    "s": {
        "modelName": SwitchbotModel.MOTION_SENSOR,
        "modelFriendlyName": "Motion Sensor",
        "func": process_wopresence,
        "manufacturer_id": 2409,
    },
    "r": {
        "modelName": SwitchbotModel.LIGHT_STRIP,
        "modelFriendlyName": "Light Strip",
        "func": process_wostrip,
        "manufacturer_id": 2409,
    },
    "{": {
        "modelName": SwitchbotModel.CURTAIN,
        "modelFriendlyName": "Curtain 3",
        "func": process_wocurtain,
        "manufacturer_id": 2409,
    },
    "c": {
        "modelName": SwitchbotModel.CURTAIN,
        "modelFriendlyName": "Curtain",
        "func": process_wocurtain,
        "manufacturer_id": 2409,
    },
    "w": {
        "modelName": SwitchbotModel.IO_METER,
        "modelFriendlyName": "Indoor/Outdoor Meter",
        "func": process_wosensorth,
        "manufacturer_id": 2409,
    },
    "i": {
        "modelName": SwitchbotModel.METER,
        "modelFriendlyName": "Meter Plus",
        "func": process_wosensorth,
        "manufacturer_id": 2409,
    },
    "T": {
        "modelName": SwitchbotModel.METER,
        "modelFriendlyName": "Meter",
        "func": process_wosensorth,
        "manufacturer_id": 2409,
    },
    "4": {
        "modelName": SwitchbotModel.METER_PRO,
        "modelFriendlyName": "Meter",
        "func": process_wosensorth,
        "manufacturer_id": 2409,
    },
    "5": {
        "modelName": SwitchbotModel.METER_PRO_C,
        "modelFriendlyName": "Meter",
        "func": process_wosensorth_c,
        "manufacturer_id": 2409,
    },
    "v": {
        "modelName": SwitchbotModel.HUB2,
        "modelFriendlyName": "Hub 2",
        "func": process_wohub2,
        "manufacturer_id": 2409,
    },
    "g": {
        "modelName": SwitchbotModel.PLUG_MINI,
        "modelFriendlyName": "Plug Mini",
        "func": process_woplugmini,
        "manufacturer_id": 2409,
    },
    "j": {
        "modelName": SwitchbotModel.PLUG_MINI,
        "modelFriendlyName": "Plug Mini (JP)",
        "func": process_woplugmini,
        "manufacturer_id": 2409,
    },
    "u": {
        "modelName": SwitchbotModel.COLOR_BULB,
        "modelFriendlyName": "Color Bulb",
        "func": process_color_bulb,
        "manufacturer_id": 2409,
    },
    "q": {
        "modelName": SwitchbotModel.CEILING_LIGHT,
        "modelFriendlyName": "Ceiling Light",
        "func": process_woceiling,
        "manufacturer_id": 2409,
    },
    "n": {
        "modelName": SwitchbotModel.CEILING_LIGHT,
        "modelFriendlyName": "Ceiling Light Pro",
        "func": process_woceiling,
        "manufacturer_id": 2409,
    },
    "e": {
        "modelName": SwitchbotModel.HUMIDIFIER,
        "modelFriendlyName": "Humidifier",
        "func": process_wohumidifier,
        "manufacturer_id": 741,
        "manufacturer_data_length": 6,
    },
    "#": {
        "modelName": SwitchbotModel.EVAPORATIVE_HUMIDIFIER,
        "modelFriendlyName": "Evaporative Humidifier",
        "func": process_evaporative_humidifier,
        "manufacturer_id": 2409,
    },
    "o": {
        "modelName": SwitchbotModel.LOCK,
        "modelFriendlyName": "Lock",
        "func": process_wolock,
        "manufacturer_id": 2409,
    },
    "$": {
        "modelName": SwitchbotModel.LOCK_PRO,
        "modelFriendlyName": "Lock Pro",
        "func": process_wolock_pro,
        "manufacturer_id": 2409,
    },
    "x": {
        "modelName": SwitchbotModel.BLIND_TILT,
        "modelFriendlyName": "Blind Tilt",
        "func": process_woblindtilt,
        "manufacturer_id": 2409,
    },
    "&": {
        "modelName": SwitchbotModel.LEAK,
        "modelFriendlyName": "Leak Detector",
        "func": process_leak,
        "manufacturer_id": 2409,
    },
    "y": {
        "modelName": SwitchbotModel.KEYPAD,
        "modelFriendlyName": "Keypad",
        "func": process_wokeypad,
        "manufacturer_id": 2409,
    },
    "<": {
        "modelName": SwitchbotModel.RELAY_SWITCH_1PM,
        "modelFriendlyName": "Relay Switch 1PM",
        "func": process_worelay_switch_1pm,
        "manufacturer_id": 2409,
    },
    ";": {
        "modelName": SwitchbotModel.RELAY_SWITCH_1,
        "modelFriendlyName": "Relay Switch 1",
        "func": process_worelay_switch_1,
        "manufacturer_id": 2409,
    },
    "b": {
        "modelName": SwitchbotModel.REMOTE,
        "modelFriendlyName": "Remote",
        "func": process_woremote,
        "manufacturer_id": 89,
    },
    ",": {
        "modelName": SwitchbotModel.ROLLER_SHADE,
        "modelFriendlyName": "Roller Shade",
        "func": process_worollershade,
        "manufacturer_id": 2409,
    },
    "%": {
        "modelName": SwitchbotModel.HUBMINI_MATTER,
        "modelFriendlyName": "HubMini Matter",
        "func": process_hubmini_matter,
        "manufacturer_id": 2409,
    },
    "~": {
        "modelName": SwitchbotModel.CIRCULATOR_FAN,
        "modelFriendlyName": "Circulator Fan",
        "func": process_fan,
        "manufacturer_id": 2409,
    },
    ".": {
        "modelName": SwitchbotModel.K20_VACUUM,
        "modelFriendlyName": "K20 Vacuum",
        "func": process_vacuum,
        "manufacturer_id": 2409,
    },
    "z": {
        "modelName": SwitchbotModel.S10_VACUUM,
        "modelFriendlyName": "S10 Vacuum",
        "func": process_vacuum,
        "manufacturer_id": 2409,
    },
    "3": {
        "modelName": SwitchbotModel.K10_PRO_COMBO_VACUUM,
        "modelFriendlyName": "K10+ Pro Combo Vacuum",
        "func": process_vacuum,
        "manufacturer_id": 2409,
    },
    "}": {
        "modelName": SwitchbotModel.K10_VACUUM,
        "modelFriendlyName": "K10+ Vacuum",
        "func": process_vacuum_k,
        "manufacturer_id": 2409,
    },
    "(": {
        "modelName": SwitchbotModel.K10_PRO_VACUUM,
        "modelFriendlyName": "K10+ Pro Vacuum",
        "func": process_vacuum_k,
        "manufacturer_id": 2409,
    },
    "*": {
        "modelName": SwitchbotModel.AIR_PURIFIER,
        "modelFriendlyName": "Air Purifier",
        "func": process_air_purifier,
        "manufacturer_id": 2409,
    },
    "+": {
        "modelName": SwitchbotModel.AIR_PURIFIER,
        "modelFriendlyName": "Air Purifier",
        "func": process_air_purifier,
        "manufacturer_id": 2409,
    },
    "7": {
        "modelName": SwitchbotModel.AIR_PURIFIER_TABLE,
        "modelFriendlyName": "Air Purifier Table",
        "func": process_air_purifier,
        "manufacturer_id": 2409,
    },
    "8": {
        "modelName": SwitchbotModel.AIR_PURIFIER_TABLE,
        "modelFriendlyName": "Air Purifier Table",
        "func": process_air_purifier,
        "manufacturer_id": 2409,
    },
    b"\x00\x10\xb9\x40": {
        "modelName": SwitchbotModel.HUB3,
        "modelFriendlyName": "Hub3",
        "func": process_hub3,
        "manufacturer_id": 2409,
    },
}
  293. _SWITCHBOT_MODEL_TO_CHAR = {
  294. model_data["modelName"]: model_chr
  295. for model_chr, model_data in SUPPORTED_TYPES.items()
  296. }
  297. MODELS_BY_MANUFACTURER_DATA: dict[int, list[tuple[str, SwitchbotSupportedType]]] = {
  298. mfr_id: [] for mfr_id in MFR_DATA_ORDER
  299. }
  300. for model_chr, model in SUPPORTED_TYPES.items():
  301. if "manufacturer_id" in model:
  302. mfr_id = model["manufacturer_id"]
  303. MODELS_BY_MANUFACTURER_DATA[mfr_id].append((model_chr, model))
  304. def parse_advertisement_data(
  305. device: BLEDevice,
  306. advertisement_data: AdvertisementData,
  307. model: SwitchbotModel | None = None,
  308. ) -> SwitchBotAdvertisement | None:
  309. """Parse advertisement data."""
  310. service_data = advertisement_data.service_data
  311. _service_data = None
  312. for uuid in SERVICE_DATA_ORDER:
  313. if uuid in service_data:
  314. _service_data = service_data[uuid]
  315. break
  316. _mfr_data = None
  317. _mfr_id = None
  318. for mfr_id in MFR_DATA_ORDER:
  319. if mfr_id in advertisement_data.manufacturer_data:
  320. _mfr_id = mfr_id
  321. _mfr_data = advertisement_data.manufacturer_data[mfr_id]
  322. break
  323. if _mfr_data is None and _service_data is None:
  324. return None
  325. try:
  326. data = _parse_data(
  327. _service_data,
  328. _mfr_data,
  329. _mfr_id,
  330. model,
  331. )
  332. except Exception: # pylint: disable=broad-except
  333. _LOGGER.exception("Failed to parse advertisement data: %s", advertisement_data)
  334. return None
  335. if not data:
  336. return None
  337. return SwitchBotAdvertisement(
  338. device.address, data, device, advertisement_data.rssi, bool(_service_data)
  339. )
  340. @lru_cache(maxsize=128)
  341. def _parse_data(
  342. _service_data: bytes | None,
  343. _mfr_data: bytes | None,
  344. _mfr_id: int | None = None,
  345. _switchbot_model: SwitchbotModel | None = None,
  346. ) -> dict[str, Any] | None:
  347. """Parse advertisement data."""
  348. _model = chr(_service_data[0] & 0b01111111) if _service_data else None
  349. if _switchbot_model and _switchbot_model in _SWITCHBOT_MODEL_TO_CHAR:
  350. _model = _SWITCHBOT_MODEL_TO_CHAR[_switchbot_model]
  351. if not _model and _mfr_id and _mfr_id in MODELS_BY_MANUFACTURER_DATA:
  352. for model_chr, model_data in MODELS_BY_MANUFACTURER_DATA[_mfr_id]:
  353. if model_data.get("manufacturer_data_length") == len(_mfr_data):
  354. _model = model_chr
  355. break
  356. if (
  357. _service_data
  358. and len(_service_data) > 5
  359. and _service_data[-4:] in SUPPORTED_TYPES
  360. ):
  361. _model = _service_data[-4:]
  362. if not _model:
  363. return None
  364. _isEncrypted = bool(_service_data[0] & 0b10000000) if _service_data else False
  365. data = {
  366. "rawAdvData": _service_data,
  367. "data": {},
  368. "model": _model,
  369. "isEncrypted": _isEncrypted,
  370. }
  371. type_data = SUPPORTED_TYPES.get(_model)
  372. if type_data:
  373. model_data = type_data["func"](_service_data, _mfr_data)
  374. if model_data:
  375. data.update(
  376. {
  377. "modelFriendlyName": type_data["modelFriendlyName"],
  378. "modelName": type_data["modelName"],
  379. "data": model_data,
  380. }
  381. )
  382. return data