relay_switch.py 9.9 KB


  1. import logging
  2. import time
  3. from typing import Any
  4. from bleak.backends.device import BLEDevice
  5. from ..const import SwitchbotModel
  6. from ..models import SwitchBotAdvertisement
  7. from .device import (
  8. SwitchbotEncryptedDevice,
  9. SwitchbotSequenceDevice,
  10. update_after_operation,
  11. )
  _LOGGER = logging.getLogger(__name__)

  # Prefix shared by every command string defined in this module.
  COMMAND_HEADER = "57"

  # Single-channel relay commands (passed to ``_send_command``).
  COMMAND_TURN_OFF = COMMAND_HEADER + "0f70010000"
  COMMAND_TURN_ON = COMMAND_HEADER + "0f70010100"
  COMMAND_TOGGLE = COMMAND_HEADER + "0f70010200"

  # Query commands.
  COMMAND_GET_VOLTAGE_AND_CURRENT = COMMAND_HEADER + "0f7106000000"
  COMMAND_GET_BASIC_INFO = COMMAND_HEADER + "0f7181"

  # Per-channel query templates: the two ``{}`` placeholders are filled via
  # ``str.format`` with the hex-encoded current time and day-start time.
  COMMAND_GET_CHANNEL1_INFO = COMMAND_HEADER + "0f710600{}{}"
  COMMAND_GET_CHANNEL2_INFO = COMMAND_HEADER + "0f710601{}{}"
  # Lookup tables for multi-channel devices, indexed as
  # ``TABLE[model][channel]`` where ``channel`` is 1 or 2.

  # Switch a single channel on.
  MULTI_CHANNEL_COMMANDS_TURN_ON = {
      SwitchbotModel.RELAY_SWITCH_2PM: {1: "570f70010d00", 2: "570f70010700"},
  }

  # Switch a single channel off.
  MULTI_CHANNEL_COMMANDS_TURN_OFF = {
      SwitchbotModel.RELAY_SWITCH_2PM: {1: "570f70010c00", 2: "570f70010300"},
  }

  # Toggle a single channel.
  MULTI_CHANNEL_COMMANDS_TOGGLE = {
      SwitchbotModel.RELAY_SWITCH_2PM: {1: "570f70010e00", 2: "570f70010b00"},
  }

  # Per-channel info query templates (see COMMAND_GET_CHANNEL*_INFO).
  MULTI_CHANNEL_COMMANDS_GET_VOLTAGE_AND_CURRENT = {
      SwitchbotModel.RELAY_SWITCH_2PM: {
          1: COMMAND_GET_CHANNEL1_INFO,
          2: COMMAND_GET_CHANNEL2_INFO,
      },
  }
  class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
      """Representation of a Switchbot relay switch.

      The default model is the Relay Switch 1PM. ``get_basic_info`` also has
      dedicated handling for ``RELAY_SWITCH_1`` and ``GARAGE_DOOR_OPENER``.
      """

      def __init__(
          self,
          device: BLEDevice,
          key_id: str,
          encryption_key: str,
          interface: int = 0,
          model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
          **kwargs: Any,
      ) -> None:
          """Initialize the relay switch.

          ``interface`` precedes ``model`` in this signature, but the parent
          initializer is called with ``model`` first.
          """
          super().__init__(device, key_id, encryption_key, model, interface, **kwargs)

      @classmethod
      async def verify_encryption_key(
          cls,
          device: BLEDevice,
          key_id: str,
          encryption_key: str,
          model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
          **kwargs: Any,
      ) -> bool:
          """Delegate key verification to the parent class.

          Only the default ``model`` differs from the parent call.
          """
          return await super().verify_encryption_key(
              device, key_id, encryption_key, model, **kwargs
          )

      def _reset_power_data(self, data: dict[str, Any]) -> None:
          """Reset power-related data to 0 (mutates ``data`` in place)."""
          for key in ("power", "current", "voltage"):
              data[key] = 0

      def _parse_common_data(self, raw_data: bytes) -> dict[str, Any]:
          """Parse common data from raw bytes.

          Reads byte 1 (sequence number), the two top bits of byte 2 (on/off
          state of channel 1 and channel 2) and byte 16 (firmware, divided by
          10). Raises ``IndexError`` if ``raw_data`` has fewer than 17 bytes.
          """
          state = raw_data[2]
          return {
              "sequence_number": raw_data[1],
              "isOn": bool(state & 0b10000000),
              "firmware": raw_data[16] / 10.0,
              "channel2_isOn": bool(state & 0b01000000),
          }

      def _parse_user_data(self, raw_data: bytes) -> dict[str, Any]:
          """Parse user-specific data from raw bytes.

          All fields are big-endian unsigned integers scaled by a fixed divisor.
          NOTE(review): the physical units (kWh, hours, V, A, W) are implied by
          the key names and divisors only - confirm against the protocol docs.
          """
          _energy = int.from_bytes(raw_data[1:4], "big") / 60000
          _energy_usage_yesterday = int.from_bytes(raw_data[4:7], "big") / 60000
          _use_time = int.from_bytes(raw_data[7:9], "big") / 60.0
          _voltage = int.from_bytes(raw_data[9:11], "big") / 10.0
          _current = int.from_bytes(raw_data[11:13], "big") / 1000.0
          _power = int.from_bytes(raw_data[13:15], "big") / 10.0

          # Small non-zero readings are clamped up to the smallest reported
          # step so that they do not round down to 0.
          return {
              "energy": 0.01 if 0 < _energy <= 0.01 else round(_energy, 2),
              "energy usage yesterday": (
                  0.01
                  if 0 < _energy_usage_yesterday <= 0.01
                  else round(_energy_usage_yesterday, 2)
              ),
              "use_time": round(_use_time, 1),
              "voltage": 0.1 if 0 < _voltage <= 0.1 else round(_voltage),
              "current": 0.1 if 0 < _current <= 0.1 else round(_current, 1),
              "power": 0.1 if 0 < _power <= 0.1 else round(_power, 1),
          }

      def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
          """Update device data from advertisement.

          For the 1PM/2PM models the power readings already held in the cache
          (or 0 when absent/falsy) are copied into the incoming advertisement
          data before it is handed to the parent implementation. Devices that
          define ``_channel`` get one nested dict per channel number.
          """
          adv_data = advertisement.data["data"]
          channel = getattr(self, "_channel", None)
          power_keys = ("voltage", "current", "power", "energy")

          if self._model in (
              SwitchbotModel.RELAY_SWITCH_1PM,
              SwitchbotModel.RELAY_SWITCH_2PM,
          ):
              if channel is None:
                  for key in power_keys:
                      adv_data[key] = self._get_adv_value(key) or 0
              else:
                  for i in range(1, channel + 1):
                      channel_data = adv_data.setdefault(i, {})
                      for key in power_keys:
                          channel_data[key] = self._get_adv_value(key, i) or 0
          super().update_from_advertisement(advertisement)

      def get_current_time_and_start_time(self) -> tuple[str, str]:
          """Return the current time and the start of the current day as hex.

          Both values are whole seconds since the epoch formatted as lowercase
          hex, zero-padded to at least 8 digits. The day start is the current
          time rounded down to a multiple of 86400 seconds (a UTC day boundary).
          """
          current_time = int(time.time())
          # Integer arithmetic avoids the float round-trip of int(t / 86400).
          current_day_start_time = current_time - current_time % 86400
          return f"{current_time:08x}", f"{current_day_start_time:08x}"

      async def get_basic_info(self) -> dict[str, Any] | None:
          """Get device basic settings.

          Sends the basic-info query followed by the channel 1 info query and
          merges the parsed results. Returns ``None`` if either query yields a
          falsy response. For ``GARAGE_DOOR_OPENER`` the common data is merged
          with ``door_open`` instead of the channel data.
          """
          current_time_hex, current_day_start_time_hex = (
              self.get_current_time_and_start_time()
          )

          if not (_data := await self._get_basic_info(COMMAND_GET_BASIC_INFO)):
              return None
          if not (
              _channel1_data := await self._get_basic_info(
                  COMMAND_GET_CHANNEL1_INFO.format(
                      current_time_hex, current_day_start_time_hex
                  )
              )
          ):
              return None

          _LOGGER.debug(
              "on-off hex: %s, channel1_hex_data: %s", _data.hex(), _channel1_data.hex()
          )

          common_data = self._parse_common_data(_data)
          user_data = self._parse_user_data(_channel1_data)

          if self._model in (
              SwitchbotModel.RELAY_SWITCH_1,
              SwitchbotModel.GARAGE_DOOR_OPENER,
          ):
              # These models do not report power readings: drop them, and do
              # not let the "relay is off" reset below re-add them as zeros.
              for key in ("voltage", "current", "power", "energy"):
                  user_data.pop(key, None)
          elif not common_data["isOn"]:
              self._reset_power_data(user_data)

          # The door is reported open when bit 5 of the state byte is clear.
          garage_door_opener_data = {"door_open": not bool(_data[2] & 0b00100000)}

          _LOGGER.debug("common_data: %s, user_data: %s", common_data, user_data)

          if self._model == SwitchbotModel.GARAGE_DOOR_OPENER:
              return common_data | garage_door_opener_data
          return common_data | user_data

      @update_after_operation
      async def turn_on(self) -> bool:
          """Turn device on."""
          result = await self._send_command(COMMAND_TURN_ON)
          return self._check_command_result(result, 0, {1})

      @update_after_operation
      async def turn_off(self) -> bool:
          """Turn device off."""
          result = await self._send_command(COMMAND_TURN_OFF)
          return self._check_command_result(result, 0, {1})

      @update_after_operation
      async def async_toggle(self, **kwargs: Any) -> bool:
          """Toggle device (``kwargs`` are accepted but not used)."""
          result = await self._send_command(COMMAND_TOGGLE)
          return self._check_command_result(result, 0, {1})

      def is_on(self) -> bool | None:
          """Return switch state from cache."""
          return self._get_adv_value("isOn")
  class SwitchbotRelaySwitch2PM(SwitchbotRelaySwitch):
      """Representation of a Switchbot relay switch 2pm."""

      def __init__(
          self,
          device: BLEDevice,
          key_id: str,
          encryption_key: str,
          interface: int = 0,
          model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_2PM,
          **kwargs: Any,
      ) -> None:
          """Initialize the two-channel relay switch."""
          super().__init__(device, key_id, encryption_key, interface, model, **kwargs)
          # Number of channels; the parent's advertisement handling iterates
          # over channels 1.._channel when this attribute is present.
          self._channel = 2

      @property
      def channel(self) -> int:
          """Return the number of channels of this device."""
          return self._channel

      def get_parsed_data(self, channel: int | None = None) -> dict[str, Any]:
          """Return parsed device data, optionally for a specific channel.

          With ``channel=None`` the whole parsed data dict is returned (this
          used to return an empty dict, because ``None`` was looked up as a
          key). With a channel number, that channel's dict is returned, or an
          empty dict if there is no entry for it.
          """
          data = self.data.get("data") or {}
          if channel is None:
              return data
          return data.get(channel, {})

      async def get_basic_info(self):
          """Get device basic settings for both channels.

          Returns ``{1: <channel 1 data>, 2: <channel 2 data>}``, or ``None``
          if the parent query or the channel 2 query yields a falsy response.
          The channel 1 dict is the parent's result and therefore also carries
          the common fields, including ``channel2_isOn``.
          """
          current_time_hex, current_day_start_time_hex = (
              self.get_current_time_and_start_time()
          )
          if not (channel1_data := await super().get_basic_info()):
              return None
          if not (
              _channel2_data := await self._get_basic_info(
                  COMMAND_GET_CHANNEL2_INFO.format(
                      current_time_hex, current_day_start_time_hex
                  )
              )
          ):
              return None

          _LOGGER.debug("channel2_hex_data: %s", _channel2_data.hex())

          channel2_data = self._parse_user_data(_channel2_data)
          channel2_data["isOn"] = channel1_data["channel2_isOn"]

          # Report zero power readings while channel 2 is switched off.
          if not channel2_data["isOn"]:
              self._reset_power_data(channel2_data)

          _LOGGER.debug(
              "channel1_data: %s, channel2_data: %s", channel1_data, channel2_data
          )

          return {1: channel1_data, 2: channel2_data}

      @update_after_operation
      async def turn_on(self, channel: int) -> bool:
          """Turn the given channel on (raises ``KeyError`` for an unknown channel)."""
          result = await self._send_command(
              MULTI_CHANNEL_COMMANDS_TURN_ON[self._model][channel]
          )
          return self._check_command_result(result, 0, {1})

      @update_after_operation
      async def turn_off(self, channel: int) -> bool:
          """Turn the given channel off (raises ``KeyError`` for an unknown channel)."""
          result = await self._send_command(
              MULTI_CHANNEL_COMMANDS_TURN_OFF[self._model][channel]
          )
          return self._check_command_result(result, 0, {1})

      @update_after_operation
      async def async_toggle(self, channel: int) -> bool:
          """Toggle the given channel (raises ``KeyError`` for an unknown channel)."""
          result = await self._send_command(
              MULTI_CHANNEL_COMMANDS_TOGGLE[self._model][channel]
          )
          return self._check_command_result(result, 0, {1})

      def is_on(self, channel: int) -> bool | None:
          """Return the given channel's switch state from cache."""
          return self._get_adv_value("isOn", channel)

      def switch_mode(self, channel: int) -> bool | None:
          """Return the given channel's ``switchMode`` value from cache."""
          return self._get_adv_value("switchMode", channel)
  216. def switch_mode(self, channel: int) -> bool | None:
  217. """Return true or false from cache."""
  218. return self._get_adv_value("switchMode", channel)