adv_parser.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import logging
  4. from collections.abc import Callable
  5. from functools import lru_cache
  6. from typing import Any, TypedDict
  7. from bleak.backends.device import BLEDevice
  8. from bleak.backends.scanner import AdvertisementData
  9. from .adv_parsers.blind_tilt import process_woblindtilt
  10. from .adv_parsers.bot import process_wohand
  11. from .adv_parsers.bulb import process_color_bulb
  12. from .adv_parsers.ceiling_light import process_woceiling
  13. from .adv_parsers.contact import process_wocontact
  14. from .adv_parsers.curtain import process_wocurtain
  15. from .adv_parsers.hub2 import process_wohub2
  16. from .adv_parsers.humidifier import process_wohumidifier
  17. from .adv_parsers.light_strip import process_wostrip
  18. from .adv_parsers.lock import process_wolock, process_wolock_pro
  19. from .adv_parsers.meter import process_wosensorth, process_wosensorth_c
  20. from .adv_parsers.motion import process_wopresence
  21. from .adv_parsers.plug import process_woplugmini
  22. from .adv_parsers.relay_switch import (
  23. process_worelay_switch_1plus,
  24. process_worelay_switch_1pm,
  25. )
  26. from .const import SwitchbotModel
  27. from .models import SwitchBotAdvertisement
  28. _LOGGER = logging.getLogger(__name__)
  29. SERVICE_DATA_ORDER = (
  30. "0000fd3d-0000-1000-8000-00805f9b34fb",
  31. "00000d00-0000-1000-8000-00805f9b34fb",
  32. )
  33. MFR_DATA_ORDER = (2409, 741, 89)
  34. class SwitchbotSupportedType(TypedDict):
  35. """Supported type of Switchbot."""
  36. modelName: SwitchbotModel
  37. modelFriendlyName: str
  38. func: Callable[[bytes, bytes | None], dict[str, bool | int]]
  39. manufacturer_id: int | None
  40. manufacturer_data_length: int | None
  41. SUPPORTED_TYPES: dict[str, SwitchbotSupportedType] = {
  42. "d": {
  43. "modelName": SwitchbotModel.CONTACT_SENSOR,
  44. "modelFriendlyName": "Contact Sensor",
  45. "func": process_wocontact,
  46. "manufacturer_id": 2409,
  47. },
  48. "H": {
  49. "modelName": SwitchbotModel.BOT,
  50. "modelFriendlyName": "Bot",
  51. "func": process_wohand,
  52. "manufacturer_id": 89,
  53. },
  54. "s": {
  55. "modelName": SwitchbotModel.MOTION_SENSOR,
  56. "modelFriendlyName": "Motion Sensor",
  57. "func": process_wopresence,
  58. "manufacturer_id": 2409,
  59. },
  60. "r": {
  61. "modelName": SwitchbotModel.LIGHT_STRIP,
  62. "modelFriendlyName": "Light Strip",
  63. "func": process_wostrip,
  64. "manufacturer_id": 2409,
  65. },
  66. "{": {
  67. "modelName": SwitchbotModel.CURTAIN,
  68. "modelFriendlyName": "Curtain 3",
  69. "func": process_wocurtain,
  70. "manufacturer_id": 2409,
  71. },
  72. "c": {
  73. "modelName": SwitchbotModel.CURTAIN,
  74. "modelFriendlyName": "Curtain",
  75. "func": process_wocurtain,
  76. "manufacturer_id": 2409,
  77. },
  78. "w": {
  79. "modelName": SwitchbotModel.IO_METER,
  80. "modelFriendlyName": "Indoor/Outdoor Meter",
  81. "func": process_wosensorth,
  82. "manufacturer_id": 2409,
  83. },
  84. "i": {
  85. "modelName": SwitchbotModel.METER,
  86. "modelFriendlyName": "Meter Plus",
  87. "func": process_wosensorth,
  88. "manufacturer_id": 2409,
  89. },
  90. "T": {
  91. "modelName": SwitchbotModel.METER,
  92. "modelFriendlyName": "Meter",
  93. "func": process_wosensorth,
  94. "manufacturer_id": 2409,
  95. },
  96. "4": {
  97. "modelName": SwitchbotModel.METER_PRO,
  98. "modelFriendlyName": "Meter",
  99. "func": process_wosensorth,
  100. "manufacturer_id": 2409,
  101. },
  102. "5": {
  103. "modelName": SwitchbotModel.METER_PRO_C,
  104. "modelFriendlyName": "Meter",
  105. "func": process_wosensorth_c,
  106. "manufacturer_id": 2409,
  107. },
  108. "v": {
  109. "modelName": SwitchbotModel.HUB2,
  110. "modelFriendlyName": "Hub 2",
  111. "func": process_wohub2,
  112. "manufacturer_id": 2409,
  113. },
  114. "g": {
  115. "modelName": SwitchbotModel.PLUG_MINI,
  116. "modelFriendlyName": "Plug Mini",
  117. "func": process_woplugmini,
  118. "manufacturer_id": 2409,
  119. },
  120. "j": {
  121. "modelName": SwitchbotModel.PLUG_MINI,
  122. "modelFriendlyName": "Plug Mini (JP)",
  123. "func": process_woplugmini,
  124. "manufacturer_id": 2409,
  125. },
  126. "u": {
  127. "modelName": SwitchbotModel.COLOR_BULB,
  128. "modelFriendlyName": "Color Bulb",
  129. "func": process_color_bulb,
  130. "manufacturer_id": 2409,
  131. },
  132. "q": {
  133. "modelName": SwitchbotModel.CEILING_LIGHT,
  134. "modelFriendlyName": "Ceiling Light",
  135. "func": process_woceiling,
  136. "manufacturer_id": 2409,
  137. },
  138. "n": {
  139. "modelName": SwitchbotModel.CEILING_LIGHT,
  140. "modelFriendlyName": "Ceiling Light Pro",
  141. "func": process_woceiling,
  142. "manufacturer_id": 2409,
  143. },
  144. "e": {
  145. "modelName": SwitchbotModel.HUMIDIFIER,
  146. "modelFriendlyName": "Humidifier",
  147. "func": process_wohumidifier,
  148. "manufacturer_id": 741,
  149. "manufacturer_data_length": 6,
  150. },
  151. "o": {
  152. "modelName": SwitchbotModel.LOCK,
  153. "modelFriendlyName": "Lock",
  154. "func": process_wolock,
  155. "manufacturer_id": 2409,
  156. },
  157. "$": {
  158. "modelName": SwitchbotModel.LOCK_PRO,
  159. "modelFriendlyName": "Lock Pro",
  160. "func": process_wolock_pro,
  161. "manufacturer_id": 2409,
  162. },
  163. "x": {
  164. "modelName": SwitchbotModel.BLIND_TILT,
  165. "modelFriendlyName": "Blind Tilt",
  166. "func": process_woblindtilt,
  167. "manufacturer_id": 2409,
  168. },
  169. "<": {
  170. "modelName": SwitchbotModel.RelaySwitch1PM,
  171. "modelFriendlyName": "Relay Switch 1PM",
  172. "func": process_worelay_switch_1pm,
  173. "manufacturer_id": 2409,
  174. },
  175. ";": {
  176. "modelName": SwitchbotModel.RelaySwitch1Plus,
  177. "modelFriendlyName": "Relay Switch 1",
  178. "func": process_worelay_switch_1plus,
  179. "manufacturer_id": 2409,
  180. },
  181. }
  182. _SWITCHBOT_MODEL_TO_CHAR = {
  183. model_data["modelName"]: model_chr
  184. for model_chr, model_data in SUPPORTED_TYPES.items()
  185. }
  186. MODELS_BY_MANUFACTURER_DATA: dict[int, list[tuple[str, SwitchbotSupportedType]]] = {
  187. mfr_id: [] for mfr_id in MFR_DATA_ORDER
  188. }
  189. for model_chr, model in SUPPORTED_TYPES.items():
  190. if "manufacturer_id" in model:
  191. mfr_id = model["manufacturer_id"]
  192. MODELS_BY_MANUFACTURER_DATA[mfr_id].append((model_chr, model))
  193. def parse_advertisement_data(
  194. device: BLEDevice,
  195. advertisement_data: AdvertisementData,
  196. model: SwitchbotModel | None = None,
  197. ) -> SwitchBotAdvertisement | None:
  198. """Parse advertisement data."""
  199. service_data = advertisement_data.service_data
  200. _service_data = None
  201. for uuid in SERVICE_DATA_ORDER:
  202. if uuid in service_data:
  203. _service_data = service_data[uuid]
  204. break
  205. _mfr_data = None
  206. _mfr_id = None
  207. for mfr_id in MFR_DATA_ORDER:
  208. if mfr_id in advertisement_data.manufacturer_data:
  209. _mfr_id = mfr_id
  210. _mfr_data = advertisement_data.manufacturer_data[mfr_id]
  211. break
  212. if _mfr_data is None and _service_data is None:
  213. return None
  214. try:
  215. data = _parse_data(
  216. _service_data,
  217. _mfr_data,
  218. _mfr_id,
  219. model,
  220. )
  221. except Exception as err: # pylint: disable=broad-except
  222. _LOGGER.exception(
  223. "Failed to parse advertisement data: %s: %s", advertisement_data, err
  224. )
  225. return None
  226. if not data:
  227. return None
  228. return SwitchBotAdvertisement(
  229. device.address, data, device, advertisement_data.rssi, bool(_service_data)
  230. )
  231. @lru_cache(maxsize=128)
  232. def _parse_data(
  233. _service_data: bytes | None,
  234. _mfr_data: bytes | None,
  235. _mfr_id: int | None = None,
  236. _switchbot_model: SwitchbotModel | None = None,
  237. ) -> dict[str, Any] | None:
  238. """Parse advertisement data."""
  239. _model = chr(_service_data[0] & 0b01111111) if _service_data else None
  240. if _switchbot_model and _switchbot_model in _SWITCHBOT_MODEL_TO_CHAR:
  241. _model = _SWITCHBOT_MODEL_TO_CHAR[_switchbot_model]
  242. if not _model and _mfr_id and _mfr_id in MODELS_BY_MANUFACTURER_DATA:
  243. for model_chr, model_data in MODELS_BY_MANUFACTURER_DATA[_mfr_id]:
  244. if model_data.get("manufacturer_data_length") == len(_mfr_data):
  245. _model = model_chr
  246. break
  247. if not _model:
  248. return None
  249. _isEncrypted = bool(_service_data[0] & 0b10000000) if _service_data else False
  250. data = {
  251. "rawAdvData": _service_data,
  252. "data": {},
  253. "model": _model,
  254. "isEncrypted": _isEncrypted,
  255. }
  256. type_data = SUPPORTED_TYPES.get(_model)
  257. if type_data:
  258. model_data = type_data["func"](_service_data, _mfr_data)
  259. if model_data:
  260. data.update(
  261. {
  262. "modelFriendlyName": type_data["modelFriendlyName"],
  263. "modelName": type_data["modelName"],
  264. "data": model_data,
  265. }
  266. )
  267. return data