1
0

relay_switch.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. import logging
  2. import time
  3. from typing import Any
  4. from bleak.backends.device import BLEDevice
  5. from ..const import SwitchbotModel
  6. from ..models import SwitchBotAdvertisement
  7. from .device import (
  8. SwitchbotEncryptedDevice,
  9. SwitchbotSequenceDevice,
  10. update_after_operation,
  11. )
  12. _LOGGER = logging.getLogger(__name__)
  13. COMMAND_HEADER = "57"
  14. COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f70010000"
  15. COMMAND_TURN_ON = f"{COMMAND_HEADER}0f70010100"
  16. COMMAND_TOGGLE = f"{COMMAND_HEADER}0f70010200"
  17. COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
  18. COMMAND_GET_BASIC_INFO = f"{COMMAND_HEADER}0f7181"
  19. COMMAND_GET_CHANNEL1_INFO = f"{COMMAND_HEADER}0f710600{{}}{{}}"
  20. COMMAND_GET_CHANNEL2_INFO = f"{COMMAND_HEADER}0f710601{{}}{{}}"
  21. MULTI_CHANNEL_COMMANDS_TURN_ON = {
  22. SwitchbotModel.RELAY_SWITCH_2PM: {
  23. 1: "570f70010d00",
  24. 2: "570f70010700",
  25. }
  26. }
  27. MULTI_CHANNEL_COMMANDS_TURN_OFF = {
  28. SwitchbotModel.RELAY_SWITCH_2PM: {
  29. 1: "570f70010c00",
  30. 2: "570f70010300",
  31. }
  32. }
  33. MULTI_CHANNEL_COMMANDS_TOGGLE = {
  34. SwitchbotModel.RELAY_SWITCH_2PM: {
  35. 1: "570f70010e00",
  36. 2: "570f70010b00",
  37. }
  38. }
  39. MULTI_CHANNEL_COMMANDS_GET_VOLTAGE_AND_CURRENT = {
  40. SwitchbotModel.RELAY_SWITCH_2PM: {
  41. 1: COMMAND_GET_CHANNEL1_INFO,
  42. 2: COMMAND_GET_CHANNEL2_INFO,
  43. }
  44. }
  45. class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
  46. """Representation of a Switchbot relay switch 1pm."""
  47. def __init__(
  48. self,
  49. device: BLEDevice,
  50. key_id: str,
  51. encryption_key: str,
  52. interface: int = 0,
  53. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  54. **kwargs: Any,
  55. ) -> None:
  56. super().__init__(device, key_id, encryption_key, model, interface, **kwargs)
  57. @classmethod
  58. async def verify_encryption_key(
  59. cls,
  60. device: BLEDevice,
  61. key_id: str,
  62. encryption_key: str,
  63. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  64. **kwargs: Any,
  65. ) -> bool:
  66. return await super().verify_encryption_key(
  67. device, key_id, encryption_key, model, **kwargs
  68. )
  69. def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
  70. """Update device data from advertisement."""
  71. adv_data = advertisement.data["data"]
  72. channel = self._channel if hasattr(self, "_channel") else None
  73. if channel is None:
  74. adv_data["voltage"] = self._get_adv_value("voltage") or 0
  75. adv_data["current"] = self._get_adv_value("current") or 0
  76. else:
  77. for i in range(1, channel + 1):
  78. adv_data[i] = adv_data.get(i, {})
  79. adv_data[i]["voltage"] = self._get_adv_value("voltage", i) or 0
  80. adv_data[i]["current"] = self._get_adv_value("current", i) or 0
  81. super().update_from_advertisement(advertisement)
  82. def get_current_time_and_start_time(self) -> int:
  83. """Get current time in seconds since epoch."""
  84. current_time = int(time.time())
  85. current_time_hex = f"{current_time:08x}"
  86. current_day_start_time = int(current_time / 86400) * 86400
  87. current_day_start_time_hex = f"{current_day_start_time:08x}"
  88. return current_time_hex, current_day_start_time_hex
  89. async def get_basic_info(self) -> dict[str, Any] | None:
  90. """Get device basic settings."""
  91. current_time_hex, current_day_start_time_hex = self.get_current_time_and_start_time()
  92. if not (_data := await self._get_basic_info(COMMAND_GET_BASIC_INFO)):
  93. return None
  94. if not (_channel1_data := await self._get_basic_info(COMMAND_GET_CHANNEL1_INFO.format(current_time_hex, current_day_start_time_hex))):
  95. return None
  96. common_data = {
  97. "isOn": bool(_data[2] & 0b10000000),
  98. "firmware": _data[16] / 10.0,
  99. "use_time": int.from_bytes(_channel1_data[7:9], "big"),
  100. }
  101. user_data = {
  102. "Electricity Usage Today": int.from_bytes(_channel1_data[1:4], "big"),
  103. "Electricity Usage Yesterday": int.from_bytes(_channel1_data[4:7], "big"),
  104. "voltage": int.from_bytes(_channel1_data[9:11], "big") / 10.0,
  105. "current": int.from_bytes(_channel1_data[11:13], "big"),
  106. "power": int.from_bytes(_channel1_data[13:15], "big") / 10.0,
  107. }
  108. garage_door_opener_data = {
  109. "door_open": not bool(_data[7] & 0b00100000),
  110. }
  111. _LOGGER.debug("common_data: %s, garage_door_opener_data: %s", common_data, garage_door_opener_data)
  112. if self._model == SwitchbotModel.RELAY_SWITCH_1:
  113. return common_data
  114. if self._model == SwitchbotModel.GARAGE_DOOR_OPENER:
  115. return common_data | garage_door_opener_data
  116. return common_data | user_data
  117. @update_after_operation
  118. async def turn_on(self) -> bool:
  119. """Turn device on."""
  120. result = await self._send_command(COMMAND_TURN_ON)
  121. return self._check_command_result(result, 0, {1})
  122. @update_after_operation
  123. async def turn_off(self) -> bool:
  124. """Turn device off."""
  125. result = await self._send_command(COMMAND_TURN_OFF)
  126. return self._check_command_result(result, 0, {1})
  127. @update_after_operation
  128. async def async_toggle(self, **kwargs) -> bool:
  129. """Toggle device."""
  130. result = await self._send_command(COMMAND_TOGGLE)
  131. return self._check_command_result(result, 0, {1})
  132. def is_on(self) -> bool | None:
  133. """Return switch state from cache."""
  134. return self._get_adv_value("isOn")
  135. class SwitchbotRelaySwitch2PM(SwitchbotRelaySwitch):
  136. """Representation of a Switchbot relay switch 2pm."""
  137. def __init__(
  138. self,
  139. device: BLEDevice,
  140. key_id: str,
  141. encryption_key: str,
  142. interface: int = 0,
  143. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_2PM,
  144. **kwargs: Any,
  145. ) -> None:
  146. super().__init__(device, key_id, encryption_key, interface, model, **kwargs)
  147. self._channel = 2
  148. @property
  149. def channel(self) -> int:
  150. return self._channel
  151. def get_parsed_data(self, channel: int | None = None) -> dict[str, Any]:
  152. """Return parsed device data, optionally for a specific channel."""
  153. data = self.data.get("data") or {}
  154. return data.get(channel, {})
  155. async def get_basic_info(self):
  156. current_time_hex, current_day_start_time_hex = self.get_current_time_and_start_time()
  157. if not (common_data := await super().get_basic_info()):
  158. return None
  159. if not (
  160. _channel2_data := await self._get_basic_info(COMMAND_GET_CHANNEL2_INFO.format(current_time_hex, current_day_start_time_hex))
  161. ):
  162. return None
  163. result = {
  164. 1: common_data,
  165. 2: {
  166. "isOn": bool(_channel2_data[2] & 0b01000000),
  167. "Electricity Usage Today": int.from_bytes(_channel2_data[1:4], "big"),
  168. "Electricity Usage Yesterday": int.from_bytes(_channel2_data[4:7], "big"),
  169. "use_time": int.from_bytes(_channel2_data[7:9], "big"),
  170. "voltage": int.from_bytes(_channel2_data[9:11], "big") / 10.0,
  171. "current": int.from_bytes(_channel2_data[11:13], "big"),
  172. "power": int.from_bytes(_channel2_data[13:15], "big") / 10.0,
  173. }
  174. }
  175. _LOGGER.debug("Multi channel basic info: %s", result)
  176. return result
  177. @update_after_operation
  178. async def turn_on(self, channel: int) -> bool:
  179. """Turn device on."""
  180. result = await self._send_command(MULTI_CHANNEL_COMMANDS_TURN_ON[self._model][channel])
  181. return self._check_command_result(result, 0, {1})
  182. @update_after_operation
  183. async def turn_off(self, channel: int) -> bool:
  184. """Turn device off."""
  185. result = await self._send_command(MULTI_CHANNEL_COMMANDS_TURN_OFF[self._model][channel])
  186. return self._check_command_result(result, 0, {1})
  187. @update_after_operation
  188. async def async_toggle(self, channel: int) -> bool:
  189. """Toggle device."""
  190. result = await self._send_command(MULTI_CHANNEL_COMMANDS_TOGGLE[self._model][channel])
  191. return self._check_command_result(result, 0, {1})
  192. def is_on(self, channel: int) -> bool | None:
  193. """Return switch state from cache."""
  194. return self._get_adv_value("isOn", channel)
  195. def switch_mode(self, channel: int) -> bool | None:
  196. """Return true or false from cache."""
  197. return self._get_adv_value("switchMode", channel)