relay_switch.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. import logging
  2. import time
  3. from typing import Any
  4. from bleak.backends.device import BLEDevice
  5. from ..const import SwitchbotModel
  6. from ..models import SwitchBotAdvertisement
  7. from .device import (
  8. SwitchbotEncryptedDevice,
  9. SwitchbotSequenceDevice,
  10. update_after_operation,
  11. )
  12. _LOGGER = logging.getLogger(__name__)
  13. COMMAND_HEADER = "57"
  14. COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f70010000"
  15. COMMAND_TURN_ON = f"{COMMAND_HEADER}0f70010100"
  16. COMMAND_TOGGLE = f"{COMMAND_HEADER}0f70010200"
  17. COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
  18. COMMAND_GET_BASIC_INFO = f"{COMMAND_HEADER}0f7181"
  19. COMMAND_GET_CHANNEL1_INFO = f"{COMMAND_HEADER}0f710600{{}}{{}}"
  20. COMMAND_GET_CHANNEL2_INFO = f"{COMMAND_HEADER}0f710601{{}}{{}}"
  21. MULTI_CHANNEL_COMMANDS_TURN_ON = {
  22. SwitchbotModel.RELAY_SWITCH_2PM: {
  23. 1: "570f70010d00",
  24. 2: "570f70010700",
  25. }
  26. }
  27. MULTI_CHANNEL_COMMANDS_TURN_OFF = {
  28. SwitchbotModel.RELAY_SWITCH_2PM: {
  29. 1: "570f70010c00",
  30. 2: "570f70010300",
  31. }
  32. }
  33. MULTI_CHANNEL_COMMANDS_TOGGLE = {
  34. SwitchbotModel.RELAY_SWITCH_2PM: {
  35. 1: "570f70010e00",
  36. 2: "570f70010b00",
  37. }
  38. }
  39. MULTI_CHANNEL_COMMANDS_GET_VOLTAGE_AND_CURRENT = {
  40. SwitchbotModel.RELAY_SWITCH_2PM: {
  41. 1: COMMAND_GET_CHANNEL1_INFO,
  42. 2: COMMAND_GET_CHANNEL2_INFO,
  43. }
  44. }
  45. class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
  46. """Representation of a Switchbot relay switch 1pm."""
  47. def __init__(
  48. self,
  49. device: BLEDevice,
  50. key_id: str,
  51. encryption_key: str,
  52. interface: int = 0,
  53. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  54. **kwargs: Any,
  55. ) -> None:
  56. super().__init__(device, key_id, encryption_key, model, interface, **kwargs)
  57. @classmethod
  58. async def verify_encryption_key(
  59. cls,
  60. device: BLEDevice,
  61. key_id: str,
  62. encryption_key: str,
  63. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  64. **kwargs: Any,
  65. ) -> bool:
  66. return await super().verify_encryption_key(
  67. device, key_id, encryption_key, model, **kwargs
  68. )
  69. def _reset_power_data(self, data: dict[str, Any]) -> None:
  70. """Reset power-related data to 0."""
  71. for key in ["power", "current", "voltage"]:
  72. data[key] = 0
  73. def _parse_common_data(self, raw_data: bytes) -> dict[str, Any]:
  74. """Parse common data from raw bytes."""
  75. return {
  76. "sequence_number": raw_data[1],
  77. "isOn": bool(raw_data[2] & 0b10000000),
  78. "firmware": raw_data[16] / 10.0,
  79. "channel2_isOn": bool(raw_data[2] & 0b01000000),
  80. }
  81. def _parse_user_data(self, raw_data: bytes) -> dict[str, Any]:
  82. """Parse user-specific data from raw bytes."""
  83. return {
  84. "Electricity": round(int.from_bytes(raw_data[1:4], "big") / 60000, 2),
  85. "Electricity Usage Yesterday": round(int.from_bytes(raw_data[4:7], "big") / 60000, 2),
  86. "use_time": round(int.from_bytes(raw_data[7:9], "big") / 60, 2),
  87. "voltage": int.from_bytes(raw_data[9:11], "big") / 10.0,
  88. "current": int.from_bytes(raw_data[11:13], "big"),
  89. "power": int.from_bytes(raw_data[13:15], "big") / 10.0,
  90. }
  91. def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
  92. """Update device data from advertisement."""
  93. adv_data = advertisement.data["data"]
  94. channel = self._channel if hasattr(self, "_channel") else None
  95. if self._model in (
  96. SwitchbotModel.RELAY_SWITCH_1PM,
  97. SwitchbotModel.RELAY_SWITCH_2PM,
  98. ):
  99. if channel is None:
  100. adv_data["voltage"] = self._get_adv_value("voltage") or 0
  101. adv_data["current"] = self._get_adv_value("current") or 0
  102. adv_data["power"] = self._get_adv_value("power") or 0
  103. else:
  104. for i in range(1, channel + 1):
  105. adv_data[i] = adv_data.get(i, {})
  106. adv_data[i]["voltage"] = self._get_adv_value("voltage", i) or 0
  107. adv_data[i]["current"] = self._get_adv_value("current", i) or 0
  108. adv_data[i]["power"] = self._get_adv_value("power", i) or 0
  109. super().update_from_advertisement(advertisement)
  110. def get_current_time_and_start_time(self) -> int:
  111. """Get current time in seconds since epoch."""
  112. current_time = int(time.time())
  113. current_time_hex = f"{current_time:08x}"
  114. current_day_start_time = int(current_time / 86400) * 86400
  115. current_day_start_time_hex = f"{current_day_start_time:08x}"
  116. return current_time_hex, current_day_start_time_hex
  117. async def get_basic_info(self) -> dict[str, Any] | None:
  118. """Get device basic settings."""
  119. current_time_hex, current_day_start_time_hex = self.get_current_time_and_start_time()
  120. if not (_data := await self._get_basic_info(COMMAND_GET_BASIC_INFO)):
  121. return None
  122. if not (_channel1_data := await self._get_basic_info(COMMAND_GET_CHANNEL1_INFO.format(current_time_hex, current_day_start_time_hex))):
  123. return None
  124. _LOGGER.debug("on-off hex: %s, channel1_hex_data: %s", _data.hex(), _channel1_data.hex())
  125. common_data = self._parse_common_data(_data)
  126. user_data = self._parse_user_data(_channel1_data)
  127. if self._model in (SwitchbotModel.RELAY_SWITCH_1, SwitchbotModel.GARAGE_DOOR_OPENER):
  128. for key in ["voltage", "current", "power"]:
  129. user_data.pop(key, None)
  130. if not common_data["isOn"]:
  131. self._reset_power_data(user_data)
  132. garage_door_opener_data = {"door_open": not bool(_data[7] & 0b00100000)}
  133. _LOGGER.debug("common_data: %s, user_data: %s", common_data, user_data)
  134. if self._model == SwitchbotModel.GARAGE_DOOR_OPENER:
  135. return common_data | garage_door_opener_data
  136. return common_data | user_data
  137. @update_after_operation
  138. async def turn_on(self) -> bool:
  139. """Turn device on."""
  140. result = await self._send_command(COMMAND_TURN_ON)
  141. return self._check_command_result(result, 0, {1})
  142. @update_after_operation
  143. async def turn_off(self) -> bool:
  144. """Turn device off."""
  145. result = await self._send_command(COMMAND_TURN_OFF)
  146. return self._check_command_result(result, 0, {1})
  147. @update_after_operation
  148. async def async_toggle(self, **kwargs) -> bool:
  149. """Toggle device."""
  150. result = await self._send_command(COMMAND_TOGGLE)
  151. return self._check_command_result(result, 0, {1})
  152. def is_on(self) -> bool | None:
  153. """Return switch state from cache."""
  154. return self._get_adv_value("isOn")
  155. class SwitchbotRelaySwitch2PM(SwitchbotRelaySwitch):
  156. """Representation of a Switchbot relay switch 2pm."""
  157. def __init__(
  158. self,
  159. device: BLEDevice,
  160. key_id: str,
  161. encryption_key: str,
  162. interface: int = 0,
  163. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_2PM,
  164. **kwargs: Any,
  165. ) -> None:
  166. super().__init__(device, key_id, encryption_key, interface, model, **kwargs)
  167. self._channel = 2
  168. @property
  169. def channel(self) -> int:
  170. return self._channel
  171. def get_parsed_data(self, channel: int | None = None) -> dict[str, Any]:
  172. """Return parsed device data, optionally for a specific channel."""
  173. data = self.data.get("data") or {}
  174. return data.get(channel, {})
  175. async def get_basic_info(self):
  176. current_time_hex, current_day_start_time_hex = self.get_current_time_and_start_time()
  177. if not (common_data := await super().get_basic_info()):
  178. return None
  179. if not (
  180. _channel2_data := await self._get_basic_info(COMMAND_GET_CHANNEL2_INFO.format(current_time_hex, current_day_start_time_hex))
  181. ):
  182. return None
  183. _LOGGER.debug("channel2_hex_data: %s", _channel2_data.hex())
  184. channel2_data = self._parse_user_data(_channel2_data)
  185. channel2_data["isOn"] = common_data["channel2_isOn"]
  186. if not channel2_data["isOn"]:
  187. self._reset_power_data(channel2_data)
  188. _LOGGER.debug("channel1_data: %s, channel2_data: %s", common_data, channel2_data)
  189. return {1: common_data, 2: channel2_data}
  190. @update_after_operation
  191. async def turn_on(self, channel: int) -> bool:
  192. """Turn device on."""
  193. result = await self._send_command(MULTI_CHANNEL_COMMANDS_TURN_ON[self._model][channel])
  194. return self._check_command_result(result, 0, {1})
  195. @update_after_operation
  196. async def turn_off(self, channel: int) -> bool:
  197. """Turn device off."""
  198. result = await self._send_command(MULTI_CHANNEL_COMMANDS_TURN_OFF[self._model][channel])
  199. return self._check_command_result(result, 0, {1})
  200. @update_after_operation
  201. async def async_toggle(self, channel: int) -> bool:
  202. """Toggle device."""
  203. result = await self._send_command(MULTI_CHANNEL_COMMANDS_TOGGLE[self._model][channel])
  204. return self._check_command_result(result, 0, {1})
  205. def is_on(self, channel: int) -> bool | None:
  206. """Return switch state from cache."""
  207. return self._get_adv_value("isOn", channel)
  208. def switch_mode(self, channel: int) -> bool | None:
  209. """Return true or false from cache."""
  210. return self._get_adv_value("switchMode", channel)