discovery.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. """Discover switchbot devices."""
  2. from __future__ import annotations
  3. import asyncio
  4. import logging
  5. import bleak
  6. from bleak.backends.device import BLEDevice
  7. from bleak.backends.scanner import AdvertisementData
  8. from .adv_parser import parse_advertisement_data
  9. from .const import DEFAULT_RETRY_COUNT, DEFAULT_RETRY_TIMEOUT, DEFAULT_SCAN_TIMEOUT
  10. from .models import SwitchBotAdvertisement
  11. _LOGGER = logging.getLogger(__name__)
  12. CONNECT_LOCK = asyncio.Lock()
  13. class GetSwitchbotDevices:
  14. """Scan for all Switchbot devices and return by type."""
  15. def __init__(self, interface: int = 0) -> None:
  16. """Get switchbot devices class constructor."""
  17. self._interface = f"hci{interface}"
  18. self._adv_data: dict[str, SwitchBotAdvertisement] = {}
  19. def detection_callback(
  20. self,
  21. device: BLEDevice,
  22. advertisement_data: AdvertisementData,
  23. ) -> None:
  24. """Callback for device detection."""
  25. discovery = parse_advertisement_data(device, advertisement_data)
  26. if discovery:
  27. self._adv_data[discovery.address] = discovery
  28. async def discover(
  29. self, retry: int = DEFAULT_RETRY_COUNT, scan_timeout: int = DEFAULT_SCAN_TIMEOUT
  30. ) -> dict:
  31. """Find switchbot devices and their advertisement data."""
  32. devices = None
  33. devices = bleak.BleakScanner(
  34. # TODO: Find new UUIDs to filter on. For example, see
  35. # https://github.com/OpenWonderLabs/SwitchBotAPI-BLE/blob/4ad138bb09f0fbbfa41b152ca327a78c1d0b6ba9/devicetypes/meter.md
  36. adapter=self._interface,
  37. )
  38. devices.register_detection_callback(self.detection_callback)
  39. async with CONNECT_LOCK:
  40. await devices.start()
  41. await asyncio.sleep(scan_timeout)
  42. await devices.stop()
  43. if devices is None:
  44. if retry < 1:
  45. _LOGGER.error(
  46. "Scanning for Switchbot devices failed. Stop trying", exc_info=True
  47. )
  48. return self._adv_data
  49. _LOGGER.warning(
  50. "Error scanning for Switchbot devices. Retrying (remaining: %d)",
  51. retry,
  52. )
  53. await asyncio.sleep(DEFAULT_RETRY_TIMEOUT)
  54. return await self.discover(retry - 1, scan_timeout)
  55. return self._adv_data
  56. async def _get_devices_by_model(
  57. self,
  58. model: str,
  59. ) -> dict:
  60. """Get switchbot devices by type."""
  61. if not self._adv_data:
  62. await self.discover()
  63. return {
  64. address: adv
  65. for address, adv in self._adv_data.items()
  66. if adv.data.get("model") == model
  67. }
  68. async def get_blind_tilts(self) -> dict[str, SwitchBotAdvertisement]:
  69. """Return all WoBlindTilt/BlindTilts devices with services data."""
  70. regular_blinds = await self._get_devices_by_model("x")
  71. pairing_blinds = await self._get_devices_by_model("X")
  72. return {**regular_blinds, **pairing_blinds}
  73. async def get_curtains(self) -> dict[str, SwitchBotAdvertisement]:
  74. """Return all WoCurtain/Curtains devices with services data."""
  75. regular_curtains = await self._get_devices_by_model("c")
  76. pairing_curtains = await self._get_devices_by_model("C")
  77. regular_curtains3 = await self._get_devices_by_model("{")
  78. pairing_curtains3 = await self._get_devices_by_model("[")
  79. return {
  80. **regular_curtains,
  81. **pairing_curtains,
  82. **regular_curtains3,
  83. **pairing_curtains3,
  84. }
  85. async def get_bots(self) -> dict[str, SwitchBotAdvertisement]:
  86. """Return all WoHand/Bot devices with services data."""
  87. return await self._get_devices_by_model("H")
  88. async def get_tempsensors(self) -> dict[str, SwitchBotAdvertisement]:
  89. """Return all WoSensorTH/Temp sensor devices with services data."""
  90. base_meters = await self._get_devices_by_model("T")
  91. plus_meters = await self._get_devices_by_model("i")
  92. io_meters = await self._get_devices_by_model("w")
  93. return {**base_meters, **plus_meters, **io_meters}
  94. async def get_contactsensors(self) -> dict[str, SwitchBotAdvertisement]:
  95. """Return all WoContact/Contact sensor devices with services data."""
  96. return await self._get_devices_by_model("d")
  97. async def get_locks(self) -> dict[str, SwitchBotAdvertisement]:
  98. """Return all WoLock/Locks devices with services data."""
  99. return await self._get_devices_by_model("o")
  100. async def get_device_data(
  101. self, address: str
  102. ) -> dict[str, SwitchBotAdvertisement] | None:
  103. """Return data for specific device."""
  104. if not self._adv_data:
  105. await self.discover()
  106. return {
  107. device: adv
  108. for device, adv in self._adv_data.items()
  109. # MacOS uses UUIDs instead of MAC addresses
  110. if adv.data.get("address") == address
  111. }