relay_switch.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. import asyncio
  2. import logging
  3. import time
  4. from typing import Any
  5. from bleak.backends.device import BLEDevice
  6. from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
  7. from ..const import SwitchbotModel
  8. from ..models import SwitchBotAdvertisement
  9. from .device import SwitchbotDevice
  10. _LOGGER = logging.getLogger(__name__)
  11. COMMAND_HEADER = "57"
  12. COMMAND_GET_CK_IV = f"{COMMAND_HEADER}0f2103"
  13. COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f70010000"
  14. COMMAND_TURN_ON = f"{COMMAND_HEADER}0f70010100"
  15. COMMAND_TOGGLE = f"{COMMAND_HEADER}0f70010200"
  16. COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
  17. PASSIVE_POLL_INTERVAL = 10 * 60
  18. class SwitchbotRelaySwitch(SwitchbotDevice):
  19. """Representation of a Switchbot relay switch 1pm."""
  20. def __init__(
  21. self,
  22. device: BLEDevice,
  23. key_id: str,
  24. encryption_key: str,
  25. interface: int = 0,
  26. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  27. **kwargs: Any,
  28. ) -> None:
  29. if len(key_id) == 0:
  30. raise ValueError("key_id is missing")
  31. elif len(key_id) != 2:
  32. raise ValueError("key_id is invalid")
  33. if len(encryption_key) == 0:
  34. raise ValueError("encryption_key is missing")
  35. elif len(encryption_key) != 32:
  36. raise ValueError("encryption_key is invalid")
  37. self._iv = None
  38. self._cipher = None
  39. self._key_id = key_id
  40. self._encryption_key = bytearray.fromhex(encryption_key)
  41. self._model: SwitchbotModel = model
  42. self._force_next_update = False
  43. super().__init__(device, None, interface, **kwargs)
  44. def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
  45. """Update device data from advertisement."""
  46. # Obtain voltage and current through command.
  47. adv_data = advertisement.data["data"]
  48. if previous_voltage := self._get_adv_value("voltage"):
  49. adv_data["voltage"] = previous_voltage
  50. if previous_current := self._get_adv_value("current"):
  51. adv_data["current"] = previous_current
  52. current_state = self._get_adv_value("sequence_number")
  53. super().update_from_advertisement(advertisement)
  54. new_state = self._get_adv_value("sequence_number")
  55. _LOGGER.debug(
  56. "%s: update advertisement: %s (seq before: %s) (seq after: %s)",
  57. self.name,
  58. advertisement,
  59. current_state,
  60. new_state,
  61. )
  62. if current_state != new_state:
  63. self._force_next_update = True
  64. async def update(self, interface: int | None = None) -> None:
  65. """Update state of device."""
  66. if info := await self.get_voltage_and_current():
  67. self._last_full_update = time.monotonic()
  68. self._update_parsed_data(info)
  69. self._fire_callbacks()
  70. async def get_voltage_and_current(self) -> dict[str, Any] | None:
  71. """Get voltage and current because advtisement don't have these"""
  72. result = await self._send_command(COMMAND_GET_VOLTAGE_AND_CURRENT)
  73. ok = self._check_command_result(result, 0, {1})
  74. if ok:
  75. return {
  76. "voltage": ((result[9] << 8) + result[10]) / 10,
  77. "current": (result[11] << 8) + result[12],
  78. }
  79. return None
  80. def poll_needed(self, seconds_since_last_poll: float | None) -> bool:
  81. """Return if device needs polling."""
  82. if self._force_next_update:
  83. self._force_next_update = False
  84. return True
  85. if (
  86. seconds_since_last_poll is not None
  87. and seconds_since_last_poll < PASSIVE_POLL_INTERVAL
  88. ):
  89. return False
  90. time_since_last_full_update = time.monotonic() - self._last_full_update
  91. if time_since_last_full_update < PASSIVE_POLL_INTERVAL:
  92. return False
  93. return True
  94. async def turn_on(self) -> bool:
  95. """Turn device on."""
  96. result = await self._send_command(COMMAND_TURN_ON)
  97. ok = self._check_command_result(result, 0, {1})
  98. if ok:
  99. self._override_state({"isOn": True})
  100. self._fire_callbacks()
  101. return ok
  102. async def turn_off(self) -> bool:
  103. """Turn device off."""
  104. result = await self._send_command(COMMAND_TURN_OFF)
  105. ok = self._check_command_result(result, 0, {1})
  106. if ok:
  107. self._override_state({"isOn": False})
  108. self._fire_callbacks()
  109. return ok
  110. async def async_toggle(self, **kwargs) -> bool:
  111. """Toggle device."""
  112. result = await self._send_command(COMMAND_TOGGLE)
  113. status = self._check_command_result(result, 0, {1})
  114. return status
  115. def is_on(self) -> bool | None:
  116. """Return switch state from cache."""
  117. return self._get_adv_value("isOn")
  118. async def _send_command(
  119. self, key: str, retry: int | None = None, encrypt: bool = True
  120. ) -> bytes | None:
  121. if not encrypt:
  122. return await super()._send_command(key[:2] + "000000" + key[2:], retry)
  123. result = await self._ensure_encryption_initialized()
  124. if not result:
  125. return None
  126. encrypted = (
  127. key[:2] + self._key_id + self._iv[0:2].hex() + self._encrypt(key[2:])
  128. )
  129. result = await super()._send_command(encrypted, retry)
  130. return result[:1] + self._decrypt(result[4:])
  131. async def _ensure_encryption_initialized(self) -> bool:
  132. if self._iv is not None:
  133. return True
  134. result = await self._send_command(
  135. COMMAND_GET_CK_IV + self._key_id, encrypt=False
  136. )
  137. ok = self._check_command_result(result, 0, {1})
  138. if ok:
  139. self._iv = result[4:]
  140. return ok
  141. async def _execute_disconnect(self) -> None:
  142. await super()._execute_disconnect()
  143. self._iv = None
  144. self._cipher = None
  145. def _get_cipher(self) -> Cipher:
  146. if self._cipher is None:
  147. self._cipher = Cipher(
  148. algorithms.AES128(self._encryption_key), modes.CTR(self._iv)
  149. )
  150. return self._cipher
  151. def _encrypt(self, data: str) -> str:
  152. if len(data) == 0:
  153. return ""
  154. encryptor = self._get_cipher().encryptor()
  155. return (encryptor.update(bytearray.fromhex(data)) + encryptor.finalize()).hex()
  156. def _decrypt(self, data: bytearray) -> bytes:
  157. if len(data) == 0:
  158. return b""
  159. decryptor = self._get_cipher().decryptor()
  160. return decryptor.update(data) + decryptor.finalize()