adv_parser.py 4.5 KB

  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import logging
  4. from collections.abc import Callable
  5. from functools import lru_cache
  6. from typing import TypedDict
  7. from bleak.backends.device import BLEDevice
  8. from bleak.backends.scanner import AdvertisementData
  9. from .adv_parsers.bot import process_wohand
  10. from .adv_parsers.bulb import process_color_bulb
  11. from .adv_parsers.ceiling_light import process_woceiling
  12. from .adv_parsers.contact import process_wocontact
  13. from .adv_parsers.curtain import process_wocurtain
  14. from .adv_parsers.humidifier import process_wohumidifier
  15. from .adv_parsers.light_strip import process_wostrip
  16. from .adv_parsers.meter import process_wosensorth
  17. from .adv_parsers.motion import process_wopresence
  18. from .adv_parsers.plug import process_woplugmini
  19. from .const import SwitchbotModel
  20. from .models import SwitchBotAdvertisement
  21. _LOGGER = logging.getLogger(__name__)
  23. "0000fd3d-0000-1000-8000-00805f9b34fb",
  24. "00000d00-0000-1000-8000-00805f9b34fb",
  25. )
  26. class SwitchbotSupportedType(TypedDict):
  27. """Supported type of Switchbot."""
  28. modelName: SwitchbotModel
  29. modelFriendlyName: str
  30. func: Callable[[bytes, bytes | None], dict[str, bool | int]]
  31. SUPPORTED_TYPES: dict[str, SwitchbotSupportedType] = {
  32. "d": {
  33. "modelName": SwitchbotModel.CONTACT_SENSOR,
  34. "modelFriendlyName": "Contact Sensor",
  35. "func": process_wocontact,
  36. },
  37. "H": {
  38. "modelName": SwitchbotModel.BOT,
  39. "modelFriendlyName": "Bot",
  40. "func": process_wohand,
  41. },
  42. "s": {
  43. "modelName": SwitchbotModel.MOTION_SENSOR,
  44. "modelFriendlyName": "Motion Sensor",
  45. "func": process_wopresence,
  46. },
  47. "r": {
  48. "modelName": SwitchbotModel.LIGHT_STRIP,
  49. "modelFriendlyName": "Light Strip",
  50. "func": process_wostrip,
  51. },
  52. "c": {
  53. "modelName": SwitchbotModel.CURTAIN,
  54. "modelFriendlyName": "Curtain",
  55. "func": process_wocurtain,
  56. },
  57. "T": {
  58. "modelName": SwitchbotModel.METER,
  59. "modelFriendlyName": "Meter",
  60. "func": process_wosensorth,
  61. },
  62. "i": {
  63. "modelName": SwitchbotModel.METER,
  64. "modelFriendlyName": "Meter Plus",
  65. "func": process_wosensorth,
  66. },
  67. "g": {
  68. "modelName": SwitchbotModel.PLUG_MINI,
  69. "modelFriendlyName": "Plug Mini",
  70. "func": process_woplugmini,
  71. },
  72. "u": {
  73. "modelName": SwitchbotModel.COLOR_BULB,
  74. "modelFriendlyName": "Color Bulb",
  75. "func": process_color_bulb,
  76. },
  77. "q": {
  78. "modelName": SwitchbotModel.CEILING_LIGHT,
  79. "modelFriendlyName": "Ceiling Light",
  80. "func": process_woceiling,
  81. },
  82. "e": {
  83. "modelName": SwitchbotModel.HUMIDIFIER,
  84. "modelFriendlyName": "Humidifier",
  85. "func": process_wohumidifier,
  86. },
  87. }
  88. def parse_advertisement_data(
  89. device: BLEDevice, advertisement_data: AdvertisementData
  90. ) -> SwitchBotAdvertisement | None:
  91. """Parse advertisement data."""
  92. _mgr_datas = list(advertisement_data.manufacturer_data.values())
  93. service_data = advertisement_data.service_data
  94. if not service_data:
  95. return None
  96. _service_data = None
  97. for uuid in SERVICE_DATA_ORDER:
  98. if uuid in service_data:
  99. _service_data = service_data[uuid]
  100. break
  101. if not _service_data:
  102. return None
  103. _mfr_data = _mgr_datas[0] if _mgr_datas else None
  104. try:
  105. data = _parse_data(_service_data, _mfr_data)
  106. except Exception as err: # pylint: disable=broad-except
  107. _LOGGER.exception(
  108. "Failed to parse advertisement data: %s: %s", advertisement_data, err
  109. )
  110. return None
  111. return SwitchBotAdvertisement(device.address, data, device, advertisement_data.rssi)
  112. @lru_cache(maxsize=128)
  113. def _parse_data(
  114. _service_data: bytes, _mfr_data: bytes | None
  115. ) -> SwitchBotAdvertisement | None:
  116. """Parse advertisement data."""
  117. _model = chr(_service_data[0] & 0b01111111)
  118. data = {
  119. "rawAdvData": _service_data,
  120. "data": {},
  121. "model": _model,
  122. "isEncrypted": bool(_service_data[0] & 0b10000000),
  123. }
  124. type_data = SUPPORTED_TYPES.get(_model)
  125. if type_data:
  126. model_data = type_data["func"](_service_data, _mfr_data)
  127. if model_data:
  128. data.update(
  129. {
  130. "modelFriendlyName": type_data["modelFriendlyName"],
  131. "modelName": type_data["modelName"],
  132. "data": model_data,
  133. }
  134. )
  135. return data