# relay_switch.py
  1. import logging
  2. import time
  3. from typing import Any
  4. from bleak.backends.device import BLEDevice
  5. from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
  6. from ..const import SwitchbotModel
  7. from ..models import SwitchBotAdvertisement
  8. from .device import SwitchbotEncryptedDevice
  9. _LOGGER = logging.getLogger(__name__)
  10. COMMAND_HEADER = "57"
  11. COMMAND_GET_CK_IV = f"{COMMAND_HEADER}0f2103"
  12. COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f70010000"
  13. COMMAND_TURN_ON = f"{COMMAND_HEADER}0f70010100"
  14. COMMAND_TOGGLE = f"{COMMAND_HEADER}0f70010200"
  15. COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
  16. PASSIVE_POLL_INTERVAL = 10 * 60
  17. class SwitchbotRelaySwitch(SwitchbotEncryptedDevice):
  18. """Representation of a Switchbot relay switch 1pm."""
  19. def __init__(
  20. self,
  21. device: BLEDevice,
  22. key_id: str,
  23. encryption_key: str,
  24. interface: int = 0,
  25. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  26. **kwargs: Any,
  27. ) -> None:
  28. self._force_next_update = False
  29. super().__init__(device, key_id, encryption_key, model, interface, **kwargs)
  30. @classmethod
  31. async def verify_encryption_key(
  32. cls,
  33. device: BLEDevice,
  34. key_id: str,
  35. encryption_key: str,
  36. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  37. **kwargs: Any,
  38. ) -> bool:
  39. return await super().verify_encryption_key(
  40. device, key_id, encryption_key, model, **kwargs
  41. )
  42. def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
  43. """Update device data from advertisement."""
  44. # Obtain voltage and current through command.
  45. adv_data = advertisement.data["data"]
  46. if previous_voltage := self._get_adv_value("voltage"):
  47. adv_data["voltage"] = previous_voltage
  48. if previous_current := self._get_adv_value("current"):
  49. adv_data["current"] = previous_current
  50. current_state = self._get_adv_value("sequence_number")
  51. super().update_from_advertisement(advertisement)
  52. new_state = self._get_adv_value("sequence_number")
  53. _LOGGER.debug(
  54. "%s: update advertisement: %s (seq before: %s) (seq after: %s)",
  55. self.name,
  56. advertisement,
  57. current_state,
  58. new_state,
  59. )
  60. if current_state != new_state:
  61. self._force_next_update = True
  62. async def update(self, interface: int | None = None) -> None:
  63. """Update state of device."""
  64. if info := await self.get_voltage_and_current():
  65. self._last_full_update = time.monotonic()
  66. self._update_parsed_data(info)
  67. self._fire_callbacks()
  68. async def get_voltage_and_current(self) -> dict[str, Any] | None:
  69. """Get voltage and current because advtisement don't have these"""
  70. result = await self._send_command(COMMAND_GET_VOLTAGE_AND_CURRENT)
  71. ok = self._check_command_result(result, 0, {1})
  72. if ok:
  73. return {
  74. "voltage": ((result[9] << 8) + result[10]) / 10,
  75. "current": (result[11] << 8) + result[12],
  76. }
  77. return None
  78. def poll_needed(self, seconds_since_last_poll: float | None) -> bool:
  79. """Return if device needs polling."""
  80. if self._force_next_update:
  81. self._force_next_update = False
  82. return True
  83. if (
  84. seconds_since_last_poll is not None
  85. and seconds_since_last_poll < PASSIVE_POLL_INTERVAL
  86. ):
  87. return False
  88. time_since_last_full_update = time.monotonic() - self._last_full_update
  89. if time_since_last_full_update < PASSIVE_POLL_INTERVAL:
  90. return False
  91. return True
  92. async def turn_on(self) -> bool:
  93. """Turn device on."""
  94. result = await self._send_command(COMMAND_TURN_ON)
  95. ok = self._check_command_result(result, 0, {1})
  96. if ok:
  97. self._override_state({"isOn": True})
  98. self._fire_callbacks()
  99. return ok
  100. async def turn_off(self) -> bool:
  101. """Turn device off."""
  102. result = await self._send_command(COMMAND_TURN_OFF)
  103. ok = self._check_command_result(result, 0, {1})
  104. if ok:
  105. self._override_state({"isOn": False})
  106. self._fire_callbacks()
  107. return ok
  108. async def async_toggle(self, **kwargs) -> bool:
  109. """Toggle device."""
  110. result = await self._send_command(COMMAND_TOGGLE)
  111. status = self._check_command_result(result, 0, {1})
  112. return status
  113. def is_on(self) -> bool | None:
  114. """Return switch state from cache."""
  115. return self._get_adv_value("isOn")
  116. async def _send_command(
  117. self, key: str, retry: int | None = None, encrypt: bool = True
  118. ) -> bytes | None:
  119. if not encrypt:
  120. return await super()._send_command(key[:2] + "000000" + key[2:], retry)
  121. result = await self._ensure_encryption_initialized()
  122. if not result:
  123. return None
  124. encrypted = (
  125. key[:2] + self._key_id + self._iv[0:2].hex() + self._encrypt(key[2:])
  126. )
  127. result = await super()._send_command(encrypted, retry)
  128. return result[:1] + self._decrypt(result[4:])
  129. async def _ensure_encryption_initialized(self) -> bool:
  130. if self._iv is not None:
  131. return True
  132. result = await self._send_command(
  133. COMMAND_GET_CK_IV + self._key_id, encrypt=False
  134. )
  135. ok = self._check_command_result(result, 0, {1})
  136. if ok:
  137. self._iv = result[4:]
  138. return ok
  139. async def _execute_disconnect(self) -> None:
  140. await super()._execute_disconnect()
  141. self._iv = None
  142. self._cipher = None
  143. def _get_cipher(self) -> Cipher:
  144. if self._cipher is None:
  145. self._cipher = Cipher(
  146. algorithms.AES128(self._encryption_key), modes.CTR(self._iv)
  147. )
  148. return self._cipher
  149. def _encrypt(self, data: str) -> str:
  150. if len(data) == 0:
  151. return ""
  152. encryptor = self._get_cipher().encryptor()
  153. return (encryptor.update(bytearray.fromhex(data)) + encryptor.finalize()).hex()
  154. def _decrypt(self, data: bytearray) -> bytes:
  155. if len(data) == 0:
  156. return b""
  157. decryptor = self._get_cipher().decryptor()
  158. return decryptor.update(data) + decryptor.finalize()