device.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import asyncio
  4. import binascii
  5. import logging
  6. from typing import Any
  7. from uuid import UUID
  8. import async_timeout
  9. import bleak
  10. from bleak.backends.device import BLEDevice
  11. from bleak_retry_connector import BleakClient, establish_connection
  12. from ..const import DEFAULT_RETRY_COUNT, DEFAULT_SCAN_TIMEOUT
  13. from ..discovery import GetSwitchbotDevices
  14. from ..models import SwitchBotAdvertisement
  15. _LOGGER = logging.getLogger(__name__)
  16. # Keys common to all device types
  17. DEVICE_GET_BASIC_SETTINGS_KEY = "5702"
  18. DEVICE_SET_MODE_KEY = "5703"
  19. DEVICE_SET_EXTENDED_KEY = "570f"
  20. # Base key when encryption is set
  21. KEY_PASSWORD_PREFIX = "571"
  22. def _sb_uuid(comms_type: str = "service") -> UUID | str:
  23. """Return Switchbot UUID."""
  24. _uuid = {"tx": "002", "rx": "003", "service": "d00"}
  25. if comms_type in _uuid:
  26. return UUID(f"cba20{_uuid[comms_type]}-224d-11e6-9fb8-0002a5d5c51b")
  27. return "Incorrect type, choose between: tx, rx or service"
  28. class SwitchbotDevice:
  29. """Base Representation of a Switchbot Device."""
  30. def __init__(
  31. self,
  32. device: BLEDevice,
  33. password: str | None = None,
  34. interface: int = 0,
  35. **kwargs: Any,
  36. ) -> None:
  37. """Switchbot base class constructor."""
  38. self._interface = f"hci{interface}"
  39. self._device = device
  40. self._sb_adv_data: SwitchBotAdvertisement | None = None
  41. self._scan_timeout: int = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT)
  42. self._retry_count: int = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
  43. self._connect_lock = asyncio.Lock()
  44. if password is None or password == "":
  45. self._password_encoded = None
  46. else:
  47. self._password_encoded = "%08x" % (
  48. binascii.crc32(password.encode("ascii")) & 0xFFFFFFFF
  49. )
  50. def _commandkey(self, key: str) -> str:
  51. """Add password to key if set."""
  52. if self._password_encoded is None:
  53. return key
  54. key_action = key[3]
  55. key_suffix = key[4:]
  56. return KEY_PASSWORD_PREFIX + key_action + self._password_encoded + key_suffix
  57. async def _sendcommand(self, key: str, retry: int) -> bytes:
  58. """Send command to device and read response."""
  59. command = bytearray.fromhex(self._commandkey(key))
  60. _LOGGER.debug("Sending command to switchbot %s", command)
  61. max_attempts = retry + 1
  62. async with self._connect_lock:
  63. for attempt in range(max_attempts):
  64. try:
  65. return await self._send_command_locked(key, command)
  66. except (bleak.BleakError, asyncio.exceptions.TimeoutError):
  67. if attempt == retry:
  68. _LOGGER.error(
  69. "Switchbot communication failed. Stopping trying",
  70. exc_info=True,
  71. )
  72. return b"\x00"
  73. _LOGGER.debug("Switchbot communication failed with:", exc_info=True)
  74. raise RuntimeError("Unreachable")
  75. @property
  76. def name(self) -> str:
  77. """Return device name."""
  78. return f"{self._device.name} ({self._device.address})"
  79. async def _send_command_locked(self, key: str, command: bytes) -> bytes:
  80. """Send command to device and read response."""
  81. client: BleakClient | None = None
  82. try:
  83. _LOGGER.debug("%s: Connnecting to switchbot", self.name)
  84. client = await establish_connection(
  85. BleakClient, self._device, self.name, max_attempts=1
  86. )
  87. _LOGGER.debug(
  88. "%s: Connnected to switchbot: %s", self.name, client.is_connected
  89. )
  90. read_char = client.services.get_characteristic(_sb_uuid(comms_type="rx"))
  91. write_char = client.services.get_characteristic(_sb_uuid(comms_type="tx"))
  92. future: asyncio.Future[bytearray] = asyncio.Future()
  93. def _notification_handler(_sender: int, data: bytearray) -> None:
  94. """Handle notification responses."""
  95. if future.done():
  96. _LOGGER.debug("%s: Notification handler already done", self.name)
  97. return
  98. future.set_result(data)
  99. _LOGGER.debug("%s: Subscribe to notifications", self.name)
  100. await client.start_notify(read_char, _notification_handler)
  101. _LOGGER.debug("%s: Sending command, %s", self.name, key)
  102. await client.write_gatt_char(write_char, command, False)
  103. async with async_timeout.timeout(5):
  104. notify_msg = await future
  105. _LOGGER.info("%s: Notification received: %s", self.name, notify_msg)
  106. _LOGGER.debug("%s: UnSubscribe to notifications", self.name)
  107. await client.stop_notify(read_char)
  108. finally:
  109. if client:
  110. await client.disconnect()
  111. if notify_msg == b"\x07":
  112. _LOGGER.error("Password required")
  113. elif notify_msg == b"\t":
  114. _LOGGER.error("Password incorrect")
  115. return notify_msg
  116. def get_address(self) -> str:
  117. """Return address of device."""
  118. return self._device.address
  119. def _get_adv_value(self, key: str) -> Any:
  120. """Return value from advertisement data."""
  121. if not self._sb_adv_data:
  122. return None
  123. return self._sb_adv_data.data["data"][key]
  124. def get_battery_percent(self) -> Any:
  125. """Return device battery level in percent."""
  126. return self._get_adv_value("battery")
  127. def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
  128. """Update device data from advertisement."""
  129. self._sb_adv_data = advertisement
  130. self._device = advertisement.device
  131. async def get_device_data(
  132. self, retry: int = DEFAULT_RETRY_COUNT, interface: int | None = None
  133. ) -> dict | None:
  134. """Find switchbot devices and their advertisement data."""
  135. if interface:
  136. _interface: int = interface
  137. else:
  138. _interface = int(self._interface.replace("hci", ""))
  139. _data = await GetSwitchbotDevices(interface=_interface).discover(
  140. retry=retry, scan_timeout=self._scan_timeout
  141. )
  142. if self._device.address in _data:
  143. self._sb_adv_data = _data[self._device.address]
  144. return self._sb_adv_data
  145. async def _get_basic_info(self) -> dict | None:
  146. """Return basic info of device."""
  147. _data = await self._sendcommand(
  148. key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
  149. )
  150. if _data in (b"\x07", b"\x00"):
  151. _LOGGER.error("Unsuccessful, please try again")
  152. return None
  153. return _data