adv_parser.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import logging
  4. from collections.abc import Callable
  5. from functools import lru_cache
  6. from typing import Any, TypedDict
  7. from bleak.backends.device import BLEDevice
  8. from bleak.backends.scanner import AdvertisementData
  9. from .adv_parsers.blind_tilt import process_woblindtilt
  10. from .adv_parsers.bot import process_wohand
  11. from .adv_parsers.bulb import process_color_bulb
  12. from .adv_parsers.ceiling_light import process_woceiling
  13. from .adv_parsers.contact import process_wocontact
  14. from .adv_parsers.curtain import process_wocurtain
  15. from .adv_parsers.fan import process_fan
  16. from .adv_parsers.hub2 import process_wohub2
  17. from .adv_parsers.hubmini_matter import process_hubmini_matter
  18. from .adv_parsers.humidifier import process_evaporative_humidifier, process_wohumidifier
  19. from .adv_parsers.keypad import process_wokeypad
  20. from .adv_parsers.leak import process_leak
  21. from .adv_parsers.light_strip import process_wostrip
  22. from .adv_parsers.lock import process_wolock, process_wolock_pro
  23. from .adv_parsers.meter import process_wosensorth, process_wosensorth_c
  24. from .adv_parsers.motion import process_wopresence
  25. from .adv_parsers.plug import process_woplugmini
  26. from .adv_parsers.relay_switch import (
  27. process_worelay_switch_1,
  28. process_worelay_switch_1pm,
  29. )
  30. from .adv_parsers.remote import process_woremote
  31. from .adv_parsers.roller_shade import process_worollershade
  32. from .adv_parsers.vacuum import process_vacuum, process_vacuum_k
  33. from .const import SwitchbotModel
  34. from .models import SwitchBotAdvertisement
  35. _LOGGER = logging.getLogger(__name__)
  36. SERVICE_DATA_ORDER = (
  37. "0000fd3d-0000-1000-8000-00805f9b34fb",
  38. "00000d00-0000-1000-8000-00805f9b34fb",
  39. )
  40. MFR_DATA_ORDER = (2409, 741, 89)
  41. class SwitchbotSupportedType(TypedDict):
  42. """Supported type of Switchbot."""
  43. modelName: SwitchbotModel
  44. modelFriendlyName: str
  45. func: Callable[[bytes, bytes | None], dict[str, bool | int]]
  46. manufacturer_id: int | None
  47. manufacturer_data_length: int | None
  48. SUPPORTED_TYPES: dict[str, SwitchbotSupportedType] = {
  49. "d": {
  50. "modelName": SwitchbotModel.CONTACT_SENSOR,
  51. "modelFriendlyName": "Contact Sensor",
  52. "func": process_wocontact,
  53. "manufacturer_id": 2409,
  54. },
  55. "H": {
  56. "modelName": SwitchbotModel.BOT,
  57. "modelFriendlyName": "Bot",
  58. "func": process_wohand,
  59. "manufacturer_id": 89,
  60. },
  61. "s": {
  62. "modelName": SwitchbotModel.MOTION_SENSOR,
  63. "modelFriendlyName": "Motion Sensor",
  64. "func": process_wopresence,
  65. "manufacturer_id": 2409,
  66. },
  67. "r": {
  68. "modelName": SwitchbotModel.LIGHT_STRIP,
  69. "modelFriendlyName": "Light Strip",
  70. "func": process_wostrip,
  71. "manufacturer_id": 2409,
  72. },
  73. "{": {
  74. "modelName": SwitchbotModel.CURTAIN,
  75. "modelFriendlyName": "Curtain 3",
  76. "func": process_wocurtain,
  77. "manufacturer_id": 2409,
  78. },
  79. "c": {
  80. "modelName": SwitchbotModel.CURTAIN,
  81. "modelFriendlyName": "Curtain",
  82. "func": process_wocurtain,
  83. "manufacturer_id": 2409,
  84. },
  85. "w": {
  86. "modelName": SwitchbotModel.IO_METER,
  87. "modelFriendlyName": "Indoor/Outdoor Meter",
  88. "func": process_wosensorth,
  89. "manufacturer_id": 2409,
  90. },
  91. "i": {
  92. "modelName": SwitchbotModel.METER,
  93. "modelFriendlyName": "Meter Plus",
  94. "func": process_wosensorth,
  95. "manufacturer_id": 2409,
  96. },
  97. "T": {
  98. "modelName": SwitchbotModel.METER,
  99. "modelFriendlyName": "Meter",
  100. "func": process_wosensorth,
  101. "manufacturer_id": 2409,
  102. },
  103. "4": {
  104. "modelName": SwitchbotModel.METER_PRO,
  105. "modelFriendlyName": "Meter",
  106. "func": process_wosensorth,
  107. "manufacturer_id": 2409,
  108. },
  109. "5": {
  110. "modelName": SwitchbotModel.METER_PRO_C,
  111. "modelFriendlyName": "Meter",
  112. "func": process_wosensorth_c,
  113. "manufacturer_id": 2409,
  114. },
  115. "v": {
  116. "modelName": SwitchbotModel.HUB2,
  117. "modelFriendlyName": "Hub 2",
  118. "func": process_wohub2,
  119. "manufacturer_id": 2409,
  120. },
  121. "g": {
  122. "modelName": SwitchbotModel.PLUG_MINI,
  123. "modelFriendlyName": "Plug Mini",
  124. "func": process_woplugmini,
  125. "manufacturer_id": 2409,
  126. },
  127. "j": {
  128. "modelName": SwitchbotModel.PLUG_MINI,
  129. "modelFriendlyName": "Plug Mini (JP)",
  130. "func": process_woplugmini,
  131. "manufacturer_id": 2409,
  132. },
  133. "u": {
  134. "modelName": SwitchbotModel.COLOR_BULB,
  135. "modelFriendlyName": "Color Bulb",
  136. "func": process_color_bulb,
  137. "manufacturer_id": 2409,
  138. },
  139. "q": {
  140. "modelName": SwitchbotModel.CEILING_LIGHT,
  141. "modelFriendlyName": "Ceiling Light",
  142. "func": process_woceiling,
  143. "manufacturer_id": 2409,
  144. },
  145. "n": {
  146. "modelName": SwitchbotModel.CEILING_LIGHT,
  147. "modelFriendlyName": "Ceiling Light Pro",
  148. "func": process_woceiling,
  149. "manufacturer_id": 2409,
  150. },
  151. "e": {
  152. "modelName": SwitchbotModel.HUMIDIFIER,
  153. "modelFriendlyName": "Humidifier",
  154. "func": process_wohumidifier,
  155. "manufacturer_id": 741,
  156. "manufacturer_data_length": 6,
  157. },
  158. "#": {
  159. "modelName": SwitchbotModel.EVAPORATIVE_HUMIDIFIER,
  160. "modelFriendlyName": "Evaporative Humidifier",
  161. "func": process_evaporative_humidifier,
  162. "manufacturer_id": 2409,
  163. },
  164. "o": {
  165. "modelName": SwitchbotModel.LOCK,
  166. "modelFriendlyName": "Lock",
  167. "func": process_wolock,
  168. "manufacturer_id": 2409,
  169. },
  170. "$": {
  171. "modelName": SwitchbotModel.LOCK_PRO,
  172. "modelFriendlyName": "Lock Pro",
  173. "func": process_wolock_pro,
  174. "manufacturer_id": 2409,
  175. },
  176. "x": {
  177. "modelName": SwitchbotModel.BLIND_TILT,
  178. "modelFriendlyName": "Blind Tilt",
  179. "func": process_woblindtilt,
  180. "manufacturer_id": 2409,
  181. },
  182. "&": {
  183. "modelName": SwitchbotModel.LEAK,
  184. "modelFriendlyName": "Leak Detector",
  185. "func": process_leak,
  186. "manufacturer_id": 2409,
  187. },
  188. "y": {
  189. "modelName": SwitchbotModel.KEYPAD,
  190. "modelFriendlyName": "Keypad",
  191. "func": process_wokeypad,
  192. "manufacturer_id": 2409,
  193. },
  194. "<": {
  195. "modelName": SwitchbotModel.RELAY_SWITCH_1PM,
  196. "modelFriendlyName": "Relay Switch 1PM",
  197. "func": process_worelay_switch_1pm,
  198. "manufacturer_id": 2409,
  199. },
  200. ";": {
  201. "modelName": SwitchbotModel.RELAY_SWITCH_1,
  202. "modelFriendlyName": "Relay Switch 1",
  203. "func": process_worelay_switch_1,
  204. "manufacturer_id": 2409,
  205. },
  206. "b": {
  207. "modelName": SwitchbotModel.REMOTE,
  208. "modelFriendlyName": "Remote",
  209. "func": process_woremote,
  210. "manufacturer_id": 89,
  211. },
  212. ",": {
  213. "modelName": SwitchbotModel.ROLLER_SHADE,
  214. "modelFriendlyName": "Roller Shade",
  215. "func": process_worollershade,
  216. "manufacturer_id": 2409,
  217. },
  218. "%": {
  219. "modelName": SwitchbotModel.HUBMINI_MATTER,
  220. "modelFriendlyName": "HubMini Matter",
  221. "func": process_hubmini_matter,
  222. "manufacturer_id": 2409,
  223. },
  224. "~": {
  225. "modelName": SwitchbotModel.CIRCULATOR_FAN,
  226. "modelFriendlyName": "Circulator Fan",
  227. "func": process_fan,
  228. "manufacturer_id": 2409,
  229. },
  230. ".": {
  231. "modelName": SwitchbotModel.K20_VACUUM,
  232. "modelFriendlyName": "K20 Vacuum",
  233. "func": process_vacuum,
  234. "manufacturer_id": 2409,
  235. },
  236. "z": {
  237. "modelName": SwitchbotModel.S10_VACUUM,
  238. "modelFriendlyName": "S10 Vacuum",
  239. "func": process_vacuum,
  240. "manufacturer_id": 2409,
  241. },
  242. "3": {
  243. "modelName": SwitchbotModel.K10_PRO_COMBO_VACUUM,
  244. "modelFriendlyName": "K10+ Pro Combo Vacuum",
  245. "func": process_vacuum,
  246. "manufacturer_id": 2409,
  247. },
  248. "}": {
  249. "modelName": SwitchbotModel.K10_VACUUM,
  250. "modelFriendlyName": "K10+ Vacuum",
  251. "func": process_vacuum_k,
  252. "manufacturer_id": 2409,
  253. },
  254. "(": {
  255. "modelName": SwitchbotModel.K10_PRO_VACUUM,
  256. "modelFriendlyName": "K10+ Pro Vacuum",
  257. "func": process_vacuum_k,
  258. "manufacturer_id": 2409,
  259. },
  260. }
  261. _SWITCHBOT_MODEL_TO_CHAR = {
  262. model_data["modelName"]: model_chr
  263. for model_chr, model_data in SUPPORTED_TYPES.items()
  264. }
  265. MODELS_BY_MANUFACTURER_DATA: dict[int, list[tuple[str, SwitchbotSupportedType]]] = {
  266. mfr_id: [] for mfr_id in MFR_DATA_ORDER
  267. }
  268. for model_chr, model in SUPPORTED_TYPES.items():
  269. if "manufacturer_id" in model:
  270. mfr_id = model["manufacturer_id"]
  271. MODELS_BY_MANUFACTURER_DATA[mfr_id].append((model_chr, model))
  272. def parse_advertisement_data(
  273. device: BLEDevice,
  274. advertisement_data: AdvertisementData,
  275. model: SwitchbotModel | None = None,
  276. ) -> SwitchBotAdvertisement | None:
  277. """Parse advertisement data."""
  278. service_data = advertisement_data.service_data
  279. _service_data = None
  280. for uuid in SERVICE_DATA_ORDER:
  281. if uuid in service_data:
  282. _service_data = service_data[uuid]
  283. break
  284. _mfr_data = None
  285. _mfr_id = None
  286. for mfr_id in MFR_DATA_ORDER:
  287. if mfr_id in advertisement_data.manufacturer_data:
  288. _mfr_id = mfr_id
  289. _mfr_data = advertisement_data.manufacturer_data[mfr_id]
  290. break
  291. if _mfr_data is None and _service_data is None:
  292. return None
  293. try:
  294. data = _parse_data(
  295. _service_data,
  296. _mfr_data,
  297. _mfr_id,
  298. model,
  299. )
  300. except Exception: # pylint: disable=broad-except
  301. _LOGGER.exception("Failed to parse advertisement data: %s", advertisement_data)
  302. return None
  303. if not data:
  304. return None
  305. return SwitchBotAdvertisement(
  306. device.address, data, device, advertisement_data.rssi, bool(_service_data)
  307. )
  308. @lru_cache(maxsize=128)
  309. def _parse_data(
  310. _service_data: bytes | None,
  311. _mfr_data: bytes | None,
  312. _mfr_id: int | None = None,
  313. _switchbot_model: SwitchbotModel | None = None,
  314. ) -> dict[str, Any] | None:
  315. """Parse advertisement data."""
  316. _model = chr(_service_data[0] & 0b01111111) if _service_data else None
  317. if _switchbot_model and _switchbot_model in _SWITCHBOT_MODEL_TO_CHAR:
  318. _model = _SWITCHBOT_MODEL_TO_CHAR[_switchbot_model]
  319. if not _model and _mfr_id and _mfr_id in MODELS_BY_MANUFACTURER_DATA:
  320. for model_chr, model_data in MODELS_BY_MANUFACTURER_DATA[_mfr_id]:
  321. if model_data.get("manufacturer_data_length") == len(_mfr_data):
  322. _model = model_chr
  323. break
  324. if not _model:
  325. return None
  326. _isEncrypted = bool(_service_data[0] & 0b10000000) if _service_data else False
  327. data = {
  328. "rawAdvData": _service_data,
  329. "data": {},
  330. "model": _model,
  331. "isEncrypted": _isEncrypted,
  332. }
  333. type_data = SUPPORTED_TYPES.get(_model)
  334. if type_data:
  335. model_data = type_data["func"](_service_data, _mfr_data)
  336. if model_data:
  337. data.update(
  338. {
  339. "modelFriendlyName": type_data["modelFriendlyName"],
  340. "modelName": type_data["modelName"],
  341. "data": model_data,
  342. }
  343. )
  344. return data