relay_switch.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. import logging
  2. import time
  3. from typing import Any
  4. from bleak.backends.device import BLEDevice
  5. from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
  6. from ..const import SwitchbotModel
  7. from ..models import SwitchBotAdvertisement
  8. from .device import SwitchbotEncryptedDevice
  9. _LOGGER = logging.getLogger(__name__)
  10. COMMAND_HEADER = "57"
  11. COMMAND_GET_CK_IV = f"{COMMAND_HEADER}0f2103"
  12. COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f70010000"
  13. COMMAND_TURN_ON = f"{COMMAND_HEADER}0f70010100"
  14. COMMAND_TOGGLE = f"{COMMAND_HEADER}0f70010200"
  15. COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
  16. COMMAND_GET_SWITCH_STATE = f"{COMMAND_HEADER}0f7101000000"
  17. PASSIVE_POLL_INTERVAL = 10 * 60
  18. class SwitchbotRelaySwitch(SwitchbotEncryptedDevice):
  19. """Representation of a Switchbot relay switch 1pm."""
  20. def __init__(
  21. self,
  22. device: BLEDevice,
  23. key_id: str,
  24. encryption_key: str,
  25. interface: int = 0,
  26. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  27. **kwargs: Any,
  28. ) -> None:
  29. self._force_next_update = False
  30. super().__init__(device, key_id, encryption_key, model, interface, **kwargs)
  31. @classmethod
  32. async def verify_encryption_key(
  33. cls,
  34. device: BLEDevice,
  35. key_id: str,
  36. encryption_key: str,
  37. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  38. **kwargs: Any,
  39. ) -> bool:
  40. return await super().verify_encryption_key(
  41. device, key_id, encryption_key, model, **kwargs
  42. )
  43. def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
  44. """Update device data from advertisement."""
  45. # Obtain voltage and current through command.
  46. adv_data = advertisement.data["data"]
  47. if previous_voltage := self._get_adv_value("voltage"):
  48. adv_data["voltage"] = previous_voltage
  49. if previous_current := self._get_adv_value("current"):
  50. adv_data["current"] = previous_current
  51. current_state = self._get_adv_value("sequence_number")
  52. super().update_from_advertisement(advertisement)
  53. new_state = self._get_adv_value("sequence_number")
  54. _LOGGER.debug(
  55. "%s: update advertisement: %s (seq before: %s) (seq after: %s)",
  56. self.name,
  57. advertisement,
  58. current_state,
  59. new_state,
  60. )
  61. if current_state != new_state:
  62. self._force_next_update = True
  63. async def update(self, interface: int | None = None) -> None:
  64. """Update state of device."""
  65. if info := await self.get_voltage_and_current():
  66. self._last_full_update = time.monotonic()
  67. self._update_parsed_data(info)
  68. self._fire_callbacks()
  69. async def get_voltage_and_current(self) -> dict[str, Any] | None:
  70. """Get voltage and current because advtisement don't have these"""
  71. result = await self._send_command(COMMAND_GET_VOLTAGE_AND_CURRENT)
  72. ok = self._check_command_result(result, 0, {1})
  73. if ok:
  74. return {
  75. "voltage": ((result[9] << 8) + result[10]) / 10,
  76. "current": (result[11] << 8) + result[12],
  77. }
  78. return None
  79. async def get_basic_info(self) -> dict[str, Any] | None:
  80. """Get the current state of the switch."""
  81. result = await self._send_command(COMMAND_GET_SWITCH_STATE)
  82. if self._check_command_result(result, 0, {1}):
  83. return {
  84. "is_on": result[1] & 0x01 != 0,
  85. }
  86. return None
  87. def poll_needed(self, seconds_since_last_poll: float | None) -> bool:
  88. """Return if device needs polling."""
  89. if self._force_next_update:
  90. self._force_next_update = False
  91. return True
  92. if (
  93. seconds_since_last_poll is not None
  94. and seconds_since_last_poll < PASSIVE_POLL_INTERVAL
  95. ):
  96. return False
  97. time_since_last_full_update = time.monotonic() - self._last_full_update
  98. if time_since_last_full_update < PASSIVE_POLL_INTERVAL:
  99. return False
  100. return True
  101. async def turn_on(self) -> bool:
  102. """Turn device on."""
  103. result = await self._send_command(COMMAND_TURN_ON)
  104. ok = self._check_command_result(result, 0, {1})
  105. if ok:
  106. self._override_state({"isOn": True})
  107. self._fire_callbacks()
  108. return ok
  109. async def turn_off(self) -> bool:
  110. """Turn device off."""
  111. result = await self._send_command(COMMAND_TURN_OFF)
  112. ok = self._check_command_result(result, 0, {1})
  113. if ok:
  114. self._override_state({"isOn": False})
  115. self._fire_callbacks()
  116. return ok
  117. async def async_toggle(self, **kwargs) -> bool:
  118. """Toggle device."""
  119. result = await self._send_command(COMMAND_TOGGLE)
  120. status = self._check_command_result(result, 0, {1})
  121. return status
  122. def is_on(self) -> bool | None:
  123. """Return switch state from cache."""
  124. return self._get_adv_value("isOn")
  125. async def _send_command(
  126. self, key: str, retry: int | None = None, encrypt: bool = True
  127. ) -> bytes | None:
  128. if not encrypt:
  129. return await super()._send_command(key[:2] + "000000" + key[2:], retry)
  130. result = await self._ensure_encryption_initialized()
  131. if not result:
  132. return None
  133. encrypted = (
  134. key[:2] + self._key_id + self._iv[0:2].hex() + self._encrypt(key[2:])
  135. )
  136. result = await super()._send_command(encrypted, retry)
  137. return result[:1] + self._decrypt(result[4:])
  138. async def _ensure_encryption_initialized(self) -> bool:
  139. if self._iv is not None:
  140. return True
  141. result = await self._send_command(
  142. COMMAND_GET_CK_IV + self._key_id, encrypt=False
  143. )
  144. ok = self._check_command_result(result, 0, {1})
  145. if ok:
  146. self._iv = result[4:]
  147. return ok
  148. async def _execute_disconnect(self) -> None:
  149. await super()._execute_disconnect()
  150. self._iv = None
  151. self._cipher = None
  152. def _get_cipher(self) -> Cipher:
  153. if self._cipher is None:
  154. self._cipher = Cipher(
  155. algorithms.AES128(self._encryption_key), modes.CTR(self._iv)
  156. )
  157. return self._cipher
  158. def _encrypt(self, data: str) -> str:
  159. if len(data) == 0:
  160. return ""
  161. encryptor = self._get_cipher().encryptor()
  162. return (encryptor.update(bytearray.fromhex(data)) + encryptor.finalize()).hex()
  163. def _decrypt(self, data: bytearray) -> bytes:
  164. if len(data) == 0:
  165. return b""
  166. decryptor = self._get_cipher().decryptor()
  167. return decryptor.update(data) + decryptor.finalize()