relay_switch.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. import logging
  2. import time
  3. from typing import Any
  4. from bleak.backends.device import BLEDevice
  5. from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
  6. from ..const import SwitchbotModel
  7. from ..models import SwitchBotAdvertisement
  8. from .device import SwitchbotDevice
  9. _LOGGER = logging.getLogger(__name__)
  10. COMMAND_HEADER = "57"
  11. COMMAND_GET_CK_IV = f"{COMMAND_HEADER}0f2103"
  12. COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f70010000"
  13. COMMAND_TURN_ON = f"{COMMAND_HEADER}0f70010100"
  14. COMMAND_TOGGLE = f"{COMMAND_HEADER}0f70010200"
  15. COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
  16. PASSIVE_POLL_INTERVAL = 10 * 60
  17. class SwitchbotRelaySwitch(SwitchbotDevice):
  18. """Representation of a Switchbot relay switch 1pm."""
  19. def __init__(
  20. self,
  21. device: BLEDevice,
  22. key_id: str,
  23. encryption_key: str,
  24. interface: int = 0,
  25. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  26. **kwargs: Any,
  27. ) -> None:
  28. if len(key_id) == 0:
  29. raise ValueError("key_id is missing")
  30. elif len(key_id) != 2:
  31. raise ValueError("key_id is invalid")
  32. if len(encryption_key) == 0:
  33. raise ValueError("encryption_key is missing")
  34. elif len(encryption_key) != 32:
  35. raise ValueError("encryption_key is invalid")
  36. self._iv = None
  37. self._cipher = None
  38. self._key_id = key_id
  39. self._encryption_key = bytearray.fromhex(encryption_key)
  40. self._model: SwitchbotModel = model
  41. self._force_next_update = False
  42. super().__init__(device, None, interface, **kwargs)
  43. def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
  44. """Update device data from advertisement."""
  45. # Obtain voltage and current through command.
  46. adv_data = advertisement.data["data"]
  47. if previous_voltage := self._get_adv_value("voltage"):
  48. adv_data["voltage"] = previous_voltage
  49. if previous_current := self._get_adv_value("current"):
  50. adv_data["current"] = previous_current
  51. current_state = self._get_adv_value("sequence_number")
  52. super().update_from_advertisement(advertisement)
  53. new_state = self._get_adv_value("sequence_number")
  54. _LOGGER.debug(
  55. "%s: update advertisement: %s (seq before: %s) (seq after: %s)",
  56. self.name,
  57. advertisement,
  58. current_state,
  59. new_state,
  60. )
  61. if current_state != new_state:
  62. self._force_next_update = True
  63. async def update(self, interface: int | None = None) -> None:
  64. """Update state of device."""
  65. if info := await self.get_voltage_and_current():
  66. self._last_full_update = time.monotonic()
  67. self._update_parsed_data(info)
  68. self._fire_callbacks()
  69. async def get_voltage_and_current(self) -> dict[str, Any] | None:
  70. """Get voltage and current because advtisement don't have these"""
  71. result = await self._send_command(COMMAND_GET_VOLTAGE_AND_CURRENT)
  72. ok = self._check_command_result(result, 0, {1})
  73. if ok:
  74. return {
  75. "voltage": ((result[9] << 8) + result[10]) / 10,
  76. "current": (result[11] << 8) + result[12],
  77. }
  78. return None
  79. def poll_needed(self, seconds_since_last_poll: float | None) -> bool:
  80. """Return if device needs polling."""
  81. if self._force_next_update:
  82. self._force_next_update = False
  83. return True
  84. if (
  85. seconds_since_last_poll is not None
  86. and seconds_since_last_poll < PASSIVE_POLL_INTERVAL
  87. ):
  88. return False
  89. time_since_last_full_update = time.monotonic() - self._last_full_update
  90. if time_since_last_full_update < PASSIVE_POLL_INTERVAL:
  91. return False
  92. return True
  93. async def turn_on(self) -> bool:
  94. """Turn device on."""
  95. result = await self._send_command(COMMAND_TURN_ON)
  96. ok = self._check_command_result(result, 0, {1})
  97. if ok:
  98. self._override_state({"isOn": True})
  99. self._fire_callbacks()
  100. return ok
  101. async def turn_off(self) -> bool:
  102. """Turn device off."""
  103. result = await self._send_command(COMMAND_TURN_OFF)
  104. ok = self._check_command_result(result, 0, {1})
  105. if ok:
  106. self._override_state({"isOn": False})
  107. self._fire_callbacks()
  108. return ok
  109. async def async_toggle(self, **kwargs) -> bool:
  110. """Toggle device."""
  111. result = await self._send_command(COMMAND_TOGGLE)
  112. status = self._check_command_result(result, 0, {1})
  113. return status
  114. def is_on(self) -> bool | None:
  115. """Return switch state from cache."""
  116. return self._get_adv_value("isOn")
  117. async def _send_command(
  118. self, key: str, retry: int | None = None, encrypt: bool = True
  119. ) -> bytes | None:
  120. if not encrypt:
  121. return await super()._send_command(key[:2] + "000000" + key[2:], retry)
  122. result = await self._ensure_encryption_initialized()
  123. if not result:
  124. return None
  125. encrypted = (
  126. key[:2] + self._key_id + self._iv[0:2].hex() + self._encrypt(key[2:])
  127. )
  128. result = await super()._send_command(encrypted, retry)
  129. return result[:1] + self._decrypt(result[4:])
  130. async def _ensure_encryption_initialized(self) -> bool:
  131. if self._iv is not None:
  132. return True
  133. result = await self._send_command(
  134. COMMAND_GET_CK_IV + self._key_id, encrypt=False
  135. )
  136. ok = self._check_command_result(result, 0, {1})
  137. if ok:
  138. self._iv = result[4:]
  139. return ok
  140. async def _execute_disconnect(self) -> None:
  141. await super()._execute_disconnect()
  142. self._iv = None
  143. self._cipher = None
  144. def _get_cipher(self) -> Cipher:
  145. if self._cipher is None:
  146. self._cipher = Cipher(
  147. algorithms.AES128(self._encryption_key), modes.CTR(self._iv)
  148. )
  149. return self._cipher
  150. def _encrypt(self, data: str) -> str:
  151. if len(data) == 0:
  152. return ""
  153. encryptor = self._get_cipher().encryptor()
  154. return (encryptor.update(bytearray.fromhex(data)) + encryptor.finalize()).hex()
  155. def _decrypt(self, data: bytearray) -> bytes:
  156. if len(data) == 0:
  157. return b""
  158. decryptor = self._get_cipher().decryptor()
  159. return decryptor.update(data) + decryptor.finalize()