adv_parser.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import logging
  4. from collections.abc import Callable
  5. from functools import lru_cache
  6. from typing import Any, TypedDict
  7. from bleak.backends.device import BLEDevice
  8. from bleak.backends.scanner import AdvertisementData
  9. from .adv_parsers.blind_tilt import process_woblindtilt
  10. from .adv_parsers.bot import process_wohand
  11. from .adv_parsers.bulb import process_color_bulb
  12. from .adv_parsers.ceiling_light import process_woceiling
  13. from .adv_parsers.contact import process_wocontact
  14. from .adv_parsers.curtain import process_wocurtain
  15. from .adv_parsers.hub2 import process_wohub2
  16. from .adv_parsers.hubmini_matter import process_hubmini_matter
  17. from .adv_parsers.humidifier import process_evaporative_humidifier, process_wohumidifier
  18. from .adv_parsers.keypad import process_wokeypad
  19. from .adv_parsers.leak import process_leak
  20. from .adv_parsers.light_strip import process_wostrip
  21. from .adv_parsers.lock import process_wolock, process_wolock_pro
  22. from .adv_parsers.meter import process_wosensorth, process_wosensorth_c
  23. from .adv_parsers.motion import process_wopresence
  24. from .adv_parsers.plug import process_woplugmini
  25. from .adv_parsers.relay_switch import (
  26. process_worelay_switch_1,
  27. process_worelay_switch_1pm,
  28. )
  29. from .adv_parsers.remote import process_woremote
  30. from .adv_parsers.roller_shade import process_worollershade
  31. from .const import SwitchbotModel
  32. from .models import SwitchBotAdvertisement
  33. _LOGGER = logging.getLogger(__name__)
  34. SERVICE_DATA_ORDER = (
  35. "0000fd3d-0000-1000-8000-00805f9b34fb",
  36. "00000d00-0000-1000-8000-00805f9b34fb",
  37. )
  38. MFR_DATA_ORDER = (2409, 741, 89)
  39. class SwitchbotSupportedType(TypedDict):
  40. """Supported type of Switchbot."""
  41. modelName: SwitchbotModel
  42. modelFriendlyName: str
  43. func: Callable[[bytes, bytes | None], dict[str, bool | int]]
  44. manufacturer_id: int | None
  45. manufacturer_data_length: int | None
  46. SUPPORTED_TYPES: dict[str, SwitchbotSupportedType] = {
  47. "d": {
  48. "modelName": SwitchbotModel.CONTACT_SENSOR,
  49. "modelFriendlyName": "Contact Sensor",
  50. "func": process_wocontact,
  51. "manufacturer_id": 2409,
  52. },
  53. "H": {
  54. "modelName": SwitchbotModel.BOT,
  55. "modelFriendlyName": "Bot",
  56. "func": process_wohand,
  57. "manufacturer_id": 89,
  58. },
  59. "s": {
  60. "modelName": SwitchbotModel.MOTION_SENSOR,
  61. "modelFriendlyName": "Motion Sensor",
  62. "func": process_wopresence,
  63. "manufacturer_id": 2409,
  64. },
  65. "r": {
  66. "modelName": SwitchbotModel.LIGHT_STRIP,
  67. "modelFriendlyName": "Light Strip",
  68. "func": process_wostrip,
  69. "manufacturer_id": 2409,
  70. },
  71. "{": {
  72. "modelName": SwitchbotModel.CURTAIN,
  73. "modelFriendlyName": "Curtain 3",
  74. "func": process_wocurtain,
  75. "manufacturer_id": 2409,
  76. },
  77. "c": {
  78. "modelName": SwitchbotModel.CURTAIN,
  79. "modelFriendlyName": "Curtain",
  80. "func": process_wocurtain,
  81. "manufacturer_id": 2409,
  82. },
  83. "w": {
  84. "modelName": SwitchbotModel.IO_METER,
  85. "modelFriendlyName": "Indoor/Outdoor Meter",
  86. "func": process_wosensorth,
  87. "manufacturer_id": 2409,
  88. },
  89. "i": {
  90. "modelName": SwitchbotModel.METER,
  91. "modelFriendlyName": "Meter Plus",
  92. "func": process_wosensorth,
  93. "manufacturer_id": 2409,
  94. },
  95. "T": {
  96. "modelName": SwitchbotModel.METER,
  97. "modelFriendlyName": "Meter",
  98. "func": process_wosensorth,
  99. "manufacturer_id": 2409,
  100. },
  101. "4": {
  102. "modelName": SwitchbotModel.METER_PRO,
  103. "modelFriendlyName": "Meter",
  104. "func": process_wosensorth,
  105. "manufacturer_id": 2409,
  106. },
  107. "5": {
  108. "modelName": SwitchbotModel.METER_PRO_C,
  109. "modelFriendlyName": "Meter",
  110. "func": process_wosensorth_c,
  111. "manufacturer_id": 2409,
  112. },
  113. "v": {
  114. "modelName": SwitchbotModel.HUB2,
  115. "modelFriendlyName": "Hub 2",
  116. "func": process_wohub2,
  117. "manufacturer_id": 2409,
  118. },
  119. "g": {
  120. "modelName": SwitchbotModel.PLUG_MINI,
  121. "modelFriendlyName": "Plug Mini",
  122. "func": process_woplugmini,
  123. "manufacturer_id": 2409,
  124. },
  125. "j": {
  126. "modelName": SwitchbotModel.PLUG_MINI,
  127. "modelFriendlyName": "Plug Mini (JP)",
  128. "func": process_woplugmini,
  129. "manufacturer_id": 2409,
  130. },
  131. "u": {
  132. "modelName": SwitchbotModel.COLOR_BULB,
  133. "modelFriendlyName": "Color Bulb",
  134. "func": process_color_bulb,
  135. "manufacturer_id": 2409,
  136. },
  137. "q": {
  138. "modelName": SwitchbotModel.CEILING_LIGHT,
  139. "modelFriendlyName": "Ceiling Light",
  140. "func": process_woceiling,
  141. "manufacturer_id": 2409,
  142. },
  143. "n": {
  144. "modelName": SwitchbotModel.CEILING_LIGHT,
  145. "modelFriendlyName": "Ceiling Light Pro",
  146. "func": process_woceiling,
  147. "manufacturer_id": 2409,
  148. },
  149. "e": {
  150. "modelName": SwitchbotModel.HUMIDIFIER,
  151. "modelFriendlyName": "Humidifier",
  152. "func": process_wohumidifier,
  153. "manufacturer_id": 741,
  154. "manufacturer_data_length": 6,
  155. },
  156. "#": {
  157. "modelName": SwitchbotModel.EVAPORATIVE_HUMIDIFIER,
  158. "modelFriendlyName": "Evaporative Humidifier",
  159. "func": process_evaporative_humidifier,
  160. "manufacturer_id": 2409,
  161. },
  162. "o": {
  163. "modelName": SwitchbotModel.LOCK,
  164. "modelFriendlyName": "Lock",
  165. "func": process_wolock,
  166. "manufacturer_id": 2409,
  167. },
  168. "$": {
  169. "modelName": SwitchbotModel.LOCK_PRO,
  170. "modelFriendlyName": "Lock Pro",
  171. "func": process_wolock_pro,
  172. "manufacturer_id": 2409,
  173. },
  174. "x": {
  175. "modelName": SwitchbotModel.BLIND_TILT,
  176. "modelFriendlyName": "Blind Tilt",
  177. "func": process_woblindtilt,
  178. "manufacturer_id": 2409,
  179. },
  180. "&": {
  181. "modelName": SwitchbotModel.LEAK,
  182. "modelFriendlyName": "Leak Detector",
  183. "func": process_leak,
  184. "manufacturer_id": 2409,
  185. },
  186. "y": {
  187. "modelName": SwitchbotModel.KEYPAD,
  188. "modelFriendlyName": "Keypad",
  189. "func": process_wokeypad,
  190. "manufacturer_id": 2409,
  191. },
  192. "<": {
  193. "modelName": SwitchbotModel.RELAY_SWITCH_1PM,
  194. "modelFriendlyName": "Relay Switch 1PM",
  195. "func": process_worelay_switch_1pm,
  196. "manufacturer_id": 2409,
  197. },
  198. ";": {
  199. "modelName": SwitchbotModel.RELAY_SWITCH_1,
  200. "modelFriendlyName": "Relay Switch 1",
  201. "func": process_worelay_switch_1,
  202. "manufacturer_id": 2409,
  203. },
  204. "b": {
  205. "modelName": SwitchbotModel.REMOTE,
  206. "modelFriendlyName": "Remote",
  207. "func": process_woremote,
  208. "manufacturer_id": 89,
  209. },
  210. ",": {
  211. "modelName": SwitchbotModel.ROLLER_SHADE,
  212. "modelFriendlyName": "Roller Shade",
  213. "func": process_worollershade,
  214. "manufacturer_id": 2409,
  215. },
  216. "%": {
  217. "modelName": SwitchbotModel.HUBMINI_MATTER,
  218. "modelFriendlyName": "HubMini Matter",
  219. "func": process_hubmini_matter,
  220. "manufacturer_id": 2409,
  221. },
  222. }
  223. _SWITCHBOT_MODEL_TO_CHAR = {
  224. model_data["modelName"]: model_chr
  225. for model_chr, model_data in SUPPORTED_TYPES.items()
  226. }
  227. MODELS_BY_MANUFACTURER_DATA: dict[int, list[tuple[str, SwitchbotSupportedType]]] = {
  228. mfr_id: [] for mfr_id in MFR_DATA_ORDER
  229. }
  230. for model_chr, model in SUPPORTED_TYPES.items():
  231. if "manufacturer_id" in model:
  232. mfr_id = model["manufacturer_id"]
  233. MODELS_BY_MANUFACTURER_DATA[mfr_id].append((model_chr, model))
  234. def parse_advertisement_data(
  235. device: BLEDevice,
  236. advertisement_data: AdvertisementData,
  237. model: SwitchbotModel | None = None,
  238. ) -> SwitchBotAdvertisement | None:
  239. """Parse advertisement data."""
  240. service_data = advertisement_data.service_data
  241. _service_data = None
  242. for uuid in SERVICE_DATA_ORDER:
  243. if uuid in service_data:
  244. _service_data = service_data[uuid]
  245. break
  246. _mfr_data = None
  247. _mfr_id = None
  248. for mfr_id in MFR_DATA_ORDER:
  249. if mfr_id in advertisement_data.manufacturer_data:
  250. _mfr_id = mfr_id
  251. _mfr_data = advertisement_data.manufacturer_data[mfr_id]
  252. break
  253. if _mfr_data is None and _service_data is None:
  254. return None
  255. try:
  256. data = _parse_data(
  257. _service_data,
  258. _mfr_data,
  259. _mfr_id,
  260. model,
  261. )
  262. except Exception as err: # pylint: disable=broad-except
  263. _LOGGER.exception(
  264. "Failed to parse advertisement data: %s: %s", advertisement_data, err
  265. )
  266. return None
  267. if not data:
  268. return None
  269. return SwitchBotAdvertisement(
  270. device.address, data, device, advertisement_data.rssi, bool(_service_data)
  271. )
  272. @lru_cache(maxsize=128)
  273. def _parse_data(
  274. _service_data: bytes | None,
  275. _mfr_data: bytes | None,
  276. _mfr_id: int | None = None,
  277. _switchbot_model: SwitchbotModel | None = None,
  278. ) -> dict[str, Any] | None:
  279. """Parse advertisement data."""
  280. _model = chr(_service_data[0] & 0b01111111) if _service_data else None
  281. if _switchbot_model and _switchbot_model in _SWITCHBOT_MODEL_TO_CHAR:
  282. _model = _SWITCHBOT_MODEL_TO_CHAR[_switchbot_model]
  283. if not _model and _mfr_id and _mfr_id in MODELS_BY_MANUFACTURER_DATA:
  284. for model_chr, model_data in MODELS_BY_MANUFACTURER_DATA[_mfr_id]:
  285. if model_data.get("manufacturer_data_length") == len(_mfr_data):
  286. _model = model_chr
  287. break
  288. if not _model:
  289. return None
  290. _isEncrypted = bool(_service_data[0] & 0b10000000) if _service_data else False
  291. data = {
  292. "rawAdvData": _service_data,
  293. "data": {},
  294. "model": _model,
  295. "isEncrypted": _isEncrypted,
  296. }
  297. type_data = SUPPORTED_TYPES.get(_model)
  298. if type_data:
  299. model_data = type_data["func"](_service_data, _mfr_data)
  300. if model_data:
  301. data.update(
  302. {
  303. "modelFriendlyName": type_data["modelFriendlyName"],
  304. "modelName": type_data["modelName"],
  305. "data": model_data,
  306. }
  307. )
  308. return data