1
0

device.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import asyncio
  4. import binascii
  5. import logging
  6. from typing import Any
  7. from uuid import UUID
  8. import async_timeout
  9. import bleak
  10. from bleak.backends.device import BLEDevice
  11. from bleak.backends.service import BleakGATTServiceCollection
  12. from bleak_retry_connector import (
  13. BleakClientWithServiceCache,
  14. ble_device_has_changed,
  15. establish_connection,
  16. )
  17. from ..const import DEFAULT_RETRY_COUNT, DEFAULT_SCAN_TIMEOUT
  18. from ..discovery import GetSwitchbotDevices
  19. from ..models import SwitchBotAdvertisement
  20. _LOGGER = logging.getLogger(__name__)
  21. # Keys common to all device types
  22. DEVICE_GET_BASIC_SETTINGS_KEY = "5702"
  23. DEVICE_SET_MODE_KEY = "5703"
  24. DEVICE_SET_EXTENDED_KEY = "570f"
  25. # Base key when encryption is set
  26. KEY_PASSWORD_PREFIX = "571"
  27. def _sb_uuid(comms_type: str = "service") -> UUID | str:
  28. """Return Switchbot UUID."""
  29. _uuid = {"tx": "002", "rx": "003", "service": "d00"}
  30. if comms_type in _uuid:
  31. return UUID(f"cba20{_uuid[comms_type]}-224d-11e6-9fb8-0002a5d5c51b")
  32. return "Incorrect type, choose between: tx, rx or service"
  33. class SwitchbotDevice:
  34. """Base Representation of a Switchbot Device."""
  35. def __init__(
  36. self,
  37. device: BLEDevice,
  38. password: str | None = None,
  39. interface: int = 0,
  40. **kwargs: Any,
  41. ) -> None:
  42. """Switchbot base class constructor."""
  43. self._interface = f"hci{interface}"
  44. self._device = device
  45. self._sb_adv_data: SwitchBotAdvertisement | None = None
  46. self._scan_timeout: int = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT)
  47. self._retry_count: int = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
  48. self._connect_lock = asyncio.Lock()
  49. if password is None or password == "":
  50. self._password_encoded = None
  51. else:
  52. self._password_encoded = "%08x" % (
  53. binascii.crc32(password.encode("ascii")) & 0xFFFFFFFF
  54. )
  55. self._cached_services: BleakGATTServiceCollection | None = None
  56. def _commandkey(self, key: str) -> str:
  57. """Add password to key if set."""
  58. if self._password_encoded is None:
  59. return key
  60. key_action = key[3]
  61. key_suffix = key[4:]
  62. return KEY_PASSWORD_PREFIX + key_action + self._password_encoded + key_suffix
  63. async def _sendcommand(self, key: str, retry: int) -> bytes:
  64. """Send command to device and read response."""
  65. command = bytearray.fromhex(self._commandkey(key))
  66. _LOGGER.debug("Sending command to switchbot %s", command)
  67. max_attempts = retry + 1
  68. async with self._connect_lock:
  69. for attempt in range(max_attempts):
  70. try:
  71. return await self._send_command_locked(key, command)
  72. except (bleak.BleakError, asyncio.exceptions.TimeoutError):
  73. if attempt == retry:
  74. _LOGGER.error(
  75. "Switchbot communication failed. Stopping trying",
  76. exc_info=True,
  77. )
  78. return b"\x00"
  79. _LOGGER.debug("Switchbot communication failed with:", exc_info=True)
  80. raise RuntimeError("Unreachable")
  81. @property
  82. def name(self) -> str:
  83. """Return device name."""
  84. return f"{self._device.name} ({self._device.address})"
  85. async def _send_command_locked(self, key: str, command: bytes) -> bytes:
  86. """Send command to device and read response."""
  87. client: BleakClientWithServiceCache | None = None
  88. try:
  89. _LOGGER.debug("%s: Connnecting to switchbot", self.name)
  90. client = await establish_connection(
  91. BleakClientWithServiceCache,
  92. self._device,
  93. self.name,
  94. max_attempts=1,
  95. cached_services=self._cached_services,
  96. )
  97. self._cached_services = client.services
  98. _LOGGER.debug(
  99. "%s: Connnected to switchbot: %s", self.name, client.is_connected
  100. )
  101. read_char = client.services.get_characteristic(_sb_uuid(comms_type="rx"))
  102. write_char = client.services.get_characteristic(_sb_uuid(comms_type="tx"))
  103. future: asyncio.Future[bytearray] = asyncio.Future()
  104. def _notification_handler(_sender: int, data: bytearray) -> None:
  105. """Handle notification responses."""
  106. if future.done():
  107. _LOGGER.debug("%s: Notification handler already done", self.name)
  108. return
  109. future.set_result(data)
  110. _LOGGER.debug("%s: Subscribe to notifications", self.name)
  111. await client.start_notify(read_char, _notification_handler)
  112. _LOGGER.debug("%s: Sending command, %s", self.name, key)
  113. await client.write_gatt_char(write_char, command, False)
  114. async with async_timeout.timeout(5):
  115. notify_msg = await future
  116. _LOGGER.info("%s: Notification received: %s", self.name, notify_msg)
  117. _LOGGER.debug("%s: UnSubscribe to notifications", self.name)
  118. await client.stop_notify(read_char)
  119. finally:
  120. if client:
  121. await client.disconnect()
  122. if notify_msg == b"\x07":
  123. _LOGGER.error("Password required")
  124. elif notify_msg == b"\t":
  125. _LOGGER.error("Password incorrect")
  126. return notify_msg
  127. def get_address(self) -> str:
  128. """Return address of device."""
  129. return self._device.address
  130. def _get_adv_value(self, key: str) -> Any:
  131. """Return value from advertisement data."""
  132. if not self._sb_adv_data:
  133. return None
  134. return self._sb_adv_data.data["data"][key]
  135. def get_battery_percent(self) -> Any:
  136. """Return device battery level in percent."""
  137. return self._get_adv_value("battery")
  138. def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
  139. """Update device data from advertisement."""
  140. self._sb_adv_data = advertisement
  141. if self._device and ble_device_has_changed(self._device, advertisement.device):
  142. self._cached_services = None
  143. self._device = advertisement.device
  144. async def get_device_data(
  145. self, retry: int = DEFAULT_RETRY_COUNT, interface: int | None = None
  146. ) -> dict | None:
  147. """Find switchbot devices and their advertisement data."""
  148. if interface:
  149. _interface: int = interface
  150. else:
  151. _interface = int(self._interface.replace("hci", ""))
  152. _data = await GetSwitchbotDevices(interface=_interface).discover(
  153. retry=retry, scan_timeout=self._scan_timeout
  154. )
  155. if self._device.address in _data:
  156. self._sb_adv_data = _data[self._device.address]
  157. return self._sb_adv_data
  158. async def _get_basic_info(self) -> dict | None:
  159. """Return basic info of device."""
  160. _data = await self._sendcommand(
  161. key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
  162. )
  163. if _data in (b"\x07", b"\x00"):
  164. _LOGGER.error("Unsuccessful, please try again")
  165. return None
  166. return _data