relay_switch.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. import time
  2. from typing import Any
  3. from bleak.backends.device import BLEDevice
  4. from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
  5. from ..const import SwitchbotModel
  6. from .device import SwitchbotSequenceDevice
  7. COMMAND_HEADER = "57"
  8. COMMAND_GET_CK_IV = f"{COMMAND_HEADER}0f2103"
  9. COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f70010000"
  10. COMMAND_TURN_ON = f"{COMMAND_HEADER}0f70010100"
  11. COMMAND_TOGGLE = f"{COMMAND_HEADER}0f70010200"
  12. COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
  13. PASSIVE_POLL_INTERVAL = 1 * 60
  14. class SwitchbotRelaySwitch(SwitchbotSequenceDevice):
  15. """Representation of a Switchbot relay switch 1pm."""
  16. def __init__(
  17. self,
  18. device: BLEDevice,
  19. key_id: str,
  20. encryption_key: str,
  21. interface: int = 0,
  22. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  23. **kwargs: Any,
  24. ) -> None:
  25. if len(key_id) == 0:
  26. raise ValueError("key_id is missing")
  27. elif len(key_id) != 2:
  28. raise ValueError("key_id is invalid")
  29. if len(encryption_key) == 0:
  30. raise ValueError("encryption_key is missing")
  31. elif len(encryption_key) != 32:
  32. raise ValueError("encryption_key is invalid")
  33. self._iv = None
  34. self._cipher = None
  35. self._key_id = key_id
  36. self._encryption_key = bytearray.fromhex(encryption_key)
  37. self._model: SwitchbotModel = model
  38. super().__init__(device, None, interface, **kwargs)
  39. async def update(self, interface: int | None = None) -> None:
  40. """Update state of device."""
  41. if info := await self.get_voltage_and_current():
  42. self._last_full_update = time.monotonic()
  43. self._update_parsed_data(info)
  44. self._fire_callbacks()
  45. async def get_voltage_and_current(self) -> dict[str, Any] | None:
  46. """Get voltage and current because advtisement don't have these"""
  47. result = await self._send_command(COMMAND_GET_VOLTAGE_AND_CURRENT)
  48. ok = self._check_command_result(result, 0, {1})
  49. if ok:
  50. return {
  51. "voltage": (result[9] << 8) + result[10],
  52. "current": (result[11] << 8) + result[12],
  53. }
  54. return None
  55. def poll_needed(self, seconds_since_last_poll: float | None) -> bool:
  56. """Return if device needs polling."""
  57. if (
  58. seconds_since_last_poll is not None
  59. and seconds_since_last_poll < PASSIVE_POLL_INTERVAL
  60. ):
  61. return False
  62. time_since_last_full_update = time.monotonic() - self._last_full_update
  63. if time_since_last_full_update < PASSIVE_POLL_INTERVAL:
  64. return False
  65. return True
  66. async def turn_on(self) -> bool:
  67. """Turn device on."""
  68. result = await self._send_command(COMMAND_TURN_ON)
  69. ok = self._check_command_result(result, 0, {1})
  70. if ok:
  71. self._override_state({"isOn": True})
  72. self._fire_callbacks()
  73. return ok
  74. async def turn_off(self) -> bool:
  75. """Turn device off."""
  76. result = await self._send_command(COMMAND_TURN_OFF)
  77. ok = self._check_command_result(result, 0, {1})
  78. if ok:
  79. self._override_state({"isOn": False})
  80. self._fire_callbacks()
  81. return ok
  82. async def async_toggle(self, **kwargs) -> bool:
  83. """Toggle device."""
  84. result = await self._send_command(COMMAND_TOGGLE)
  85. status = self._check_command_result(result, 0, {1})
  86. return status
  87. def is_on(self) -> bool | None:
  88. """Return switch state from cache."""
  89. return self._get_adv_value("isOn")
  90. async def _send_command(
  91. self, key: str, retry: int | None = None, encrypt: bool = True
  92. ) -> bytes | None:
  93. if not encrypt:
  94. return await super()._send_command(key[:2] + "000000" + key[2:], retry)
  95. result = await self._ensure_encryption_initialized()
  96. if not result:
  97. return None
  98. encrypted = (
  99. key[:2] + self._key_id + self._iv[0:2].hex() + self._encrypt(key[2:])
  100. )
  101. result = await super()._send_command(encrypted, retry)
  102. return result[:1] + self._decrypt(result[4:])
  103. async def _ensure_encryption_initialized(self) -> bool:
  104. if self._iv is not None:
  105. return True
  106. result = await self._send_command(
  107. COMMAND_GET_CK_IV + self._key_id, encrypt=False
  108. )
  109. ok = self._check_command_result(result, 0, {1})
  110. if ok:
  111. self._iv = result[4:]
  112. return ok
  113. async def _execute_disconnect(self) -> None:
  114. await super()._execute_disconnect()
  115. self._iv = None
  116. self._cipher = None
  117. def _get_cipher(self) -> Cipher:
  118. if self._cipher is None:
  119. self._cipher = Cipher(
  120. algorithms.AES128(self._encryption_key), modes.CTR(self._iv)
  121. )
  122. return self._cipher
  123. def _encrypt(self, data: str) -> str:
  124. if len(data) == 0:
  125. return ""
  126. encryptor = self._get_cipher().encryptor()
  127. return (encryptor.update(bytearray.fromhex(data)) + encryptor.finalize()).hex()
  128. def _decrypt(self, data: bytearray) -> bytes:
  129. if len(data) == 0:
  130. return b""
  131. decryptor = self._get_cipher().decryptor()
  132. return decryptor.update(data) + decryptor.finalize()