relay_switch.py 9.3 KB


  1. import logging
  2. import time
  3. from typing import Any
  4. from bleak.backends.device import BLEDevice
  5. from ..const import SwitchbotModel
  6. from ..models import SwitchBotAdvertisement
  7. from .device import (
  8. SwitchbotEncryptedDevice,
  9. SwitchbotSequenceDevice,
  10. update_after_operation,
  11. )
  12. _LOGGER = logging.getLogger(__name__)
  13. COMMAND_HEADER = "57"
  14. COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f70010000"
  15. COMMAND_TURN_ON = f"{COMMAND_HEADER}0f70010100"
  16. COMMAND_TOGGLE = f"{COMMAND_HEADER}0f70010200"
  17. COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
  18. COMMAND_GET_BASIC_INFO = f"{COMMAND_HEADER}0f7181"
  19. COMMAND_GET_CHANNEL1_INFO = f"{COMMAND_HEADER}0f710600{{}}{{}}"
  20. COMMAND_GET_CHANNEL2_INFO = f"{COMMAND_HEADER}0f710601{{}}{{}}"
  21. MULTI_CHANNEL_COMMANDS_TURN_ON = {
  22. SwitchbotModel.RELAY_SWITCH_2PM: {
  23. 1: "570f70010d00",
  24. 2: "570f70010700",
  25. }
  26. }
  27. MULTI_CHANNEL_COMMANDS_TURN_OFF = {
  28. SwitchbotModel.RELAY_SWITCH_2PM: {
  29. 1: "570f70010c00",
  30. 2: "570f70010300",
  31. }
  32. }
  33. MULTI_CHANNEL_COMMANDS_TOGGLE = {
  34. SwitchbotModel.RELAY_SWITCH_2PM: {
  35. 1: "570f70010e00",
  36. 2: "570f70010b00",
  37. }
  38. }
  39. MULTI_CHANNEL_COMMANDS_GET_VOLTAGE_AND_CURRENT = {
  40. SwitchbotModel.RELAY_SWITCH_2PM: {
  41. 1: COMMAND_GET_CHANNEL1_INFO,
  42. 2: COMMAND_GET_CHANNEL2_INFO,
  43. }
  44. }
  45. class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
  46. """Representation of a Switchbot relay switch 1pm."""
  47. def __init__(
  48. self,
  49. device: BLEDevice,
  50. key_id: str,
  51. encryption_key: str,
  52. interface: int = 0,
  53. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  54. **kwargs: Any,
  55. ) -> None:
  56. super().__init__(device, key_id, encryption_key, model, interface, **kwargs)
  57. @classmethod
  58. async def verify_encryption_key(
  59. cls,
  60. device: BLEDevice,
  61. key_id: str,
  62. encryption_key: str,
  63. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  64. **kwargs: Any,
  65. ) -> bool:
  66. return await super().verify_encryption_key(
  67. device, key_id, encryption_key, model, **kwargs
  68. )
  69. def _reset_power_data(self, data: dict[str, Any]) -> None:
  70. """Reset power-related data to 0."""
  71. for key in ["power", "current", "voltage"]:
  72. data[key] = 0
  73. def _parse_common_data(self, raw_data: bytes) -> dict[str, Any]:
  74. """Parse common data from raw bytes."""
  75. return {
  76. "sequence_number": raw_data[1],
  77. "isOn": bool(raw_data[2] & 0b10000000),
  78. "firmware": raw_data[16] / 10.0,
  79. "channel2_isOn": bool(raw_data[2] & 0b01000000),
  80. }
  81. def _parse_user_data(self, raw_data: bytes) -> dict[str, Any]:
  82. """Parse user-specific data from raw bytes."""
  83. return {
  84. "Electricity": round(int.from_bytes(raw_data[1:4], "big") / 60000, 2),
  85. "Electricity Usage Yesterday": round(int.from_bytes(raw_data[4:7], "big") / 60000, 2),
  86. "use_time": round(int.from_bytes(raw_data[7:9], "big") / 60, 2),
  87. "voltage": int.from_bytes(raw_data[9:11], "big") / 10.0,
  88. "current": int.from_bytes(raw_data[11:13], "big"),
  89. "power": int.from_bytes(raw_data[13:15], "big") / 10.0,
  90. }
  91. def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
  92. """Update device data from advertisement."""
  93. adv_data = advertisement.data["data"]
  94. channel = self._channel if hasattr(self, "_channel") else None
  95. if self._model in (
  96. SwitchbotModel.RELAY_SWITCH_1PM,
  97. SwitchbotModel.RELAY_SWITCH_2PM,
  98. ):
  99. if channel is None:
  100. adv_data["voltage"] = self._get_adv_value("voltage") or 0
  101. adv_data["current"] = self._get_adv_value("current") or 0
  102. adv_data["power"] = self._get_adv_value("power") or 0
  103. else:
  104. for i in range(1, channel + 1):
  105. adv_data[i] = adv_data.get(i, {})
  106. adv_data[i]["voltage"] = self._get_adv_value("voltage", i) or 0
  107. adv_data[i]["current"] = self._get_adv_value("current", i) or 0
  108. adv_data[i]["power"] = self._get_adv_value("power", i) or 0
  109. super().update_from_advertisement(advertisement)
  110. def get_current_time_and_start_time(self) -> int:
  111. """Get current time in seconds since epoch."""
  112. current_time = int(time.time())
  113. current_time_hex = f"{current_time:08x}"
  114. current_day_start_time = int(current_time / 86400) * 86400
  115. current_day_start_time_hex = f"{current_day_start_time:08x}"
  116. return current_time_hex, current_day_start_time_hex
  117. async def get_basic_info(self) -> dict[str, Any] | None:
  118. """Get device basic settings."""
  119. current_time_hex, current_day_start_time_hex = self.get_current_time_and_start_time()
  120. if not (_data := await self._get_basic_info(COMMAND_GET_BASIC_INFO)):
  121. return None
  122. if not (_channel1_data := await self._get_basic_info(COMMAND_GET_CHANNEL1_INFO.format(current_time_hex, current_day_start_time_hex))):
  123. return None
  124. _LOGGER.debug("on-off hex: %s, channel1_hex_data: %s", _data.hex(), _channel1_data.hex())
  125. common_data = self._parse_common_data(_data)
  126. user_data = self._parse_user_data(_channel1_data)
  127. if self._model in (SwitchbotModel.RELAY_SWITCH_1, SwitchbotModel.GARAGE_DOOR_OPENER):
  128. for key in ["voltage", "current", "power"]:
  129. user_data.pop(key, None)
  130. if not common_data["isOn"]:
  131. self._reset_power_data(user_data)
  132. garage_door_opener_data = {"door_open": not bool(_data[7] & 0b00100000)}
  133. _LOGGER.debug("common_data: %s, user_data: %s", common_data, user_data)
  134. if self._model == SwitchbotModel.GARAGE_DOOR_OPENER:
  135. return common_data | garage_door_opener_data
  136. return common_data | user_data
  137. @update_after_operation
  138. async def turn_on(self) -> bool:
  139. """Turn device on."""
  140. result = await self._send_command(COMMAND_TURN_ON)
  141. return self._check_command_result(result, 0, {1})
  142. @update_after_operation
  143. async def turn_off(self) -> bool:
  144. """Turn device off."""
  145. result = await self._send_command(COMMAND_TURN_OFF)
  146. return self._check_command_result(result, 0, {1})
  147. @update_after_operation
  148. async def async_toggle(self, **kwargs) -> bool:
  149. """Toggle device."""
  150. result = await self._send_command(COMMAND_TOGGLE)
  151. return self._check_command_result(result, 0, {1})
  152. def is_on(self) -> bool | None:
  153. """Return switch state from cache."""
  154. return self._get_adv_value("isOn")
  155. class SwitchbotRelaySwitch2PM(SwitchbotRelaySwitch):
  156. """Representation of a Switchbot relay switch 2pm."""
  157. def __init__(
  158. self,
  159. device: BLEDevice,
  160. key_id: str,
  161. encryption_key: str,
  162. interface: int = 0,
  163. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_2PM,
  164. **kwargs: Any,
  165. ) -> None:
  166. super().__init__(device, key_id, encryption_key, interface, model, **kwargs)
  167. self._channel = 2
  168. @property
  169. def channel(self) -> int:
  170. return self._channel
  171. def get_parsed_data(self, channel: int | None = None) -> dict[str, Any]:
  172. """Return parsed device data, optionally for a specific channel."""
  173. data = self.data.get("data") or {}
  174. return data.get(channel, {})
  175. async def get_basic_info(self):
  176. current_time_hex, current_day_start_time_hex = self.get_current_time_and_start_time()
  177. if not (common_data := await super().get_basic_info()):
  178. return None
  179. if not (
  180. _channel2_data := await self._get_basic_info(COMMAND_GET_CHANNEL2_INFO.format(current_time_hex, current_day_start_time_hex))
  181. ):
  182. return None
  183. _LOGGER.debug("channel2_hex_data: %s", _channel2_data.hex())
  184. channel2_data = self._parse_user_data(_channel2_data)
  185. channel2_data["isOn"] = common_data["channel2_isOn"]
  186. if not channel2_data["isOn"]:
  187. self._reset_power_data(channel2_data)
  188. _LOGGER.debug("channel1_data: %s, channel2_data: %s", common_data, channel2_data)
  189. return {1: common_data, 2: channel2_data}
  190. @update_after_operation
  191. async def turn_on(self, channel: int) -> bool:
  192. """Turn device on."""
  193. result = await self._send_command(MULTI_CHANNEL_COMMANDS_TURN_ON[self._model][channel])
  194. return self._check_command_result(result, 0, {1})
  195. @update_after_operation
  196. async def turn_off(self, channel: int) -> bool:
  197. """Turn device off."""
  198. result = await self._send_command(MULTI_CHANNEL_COMMANDS_TURN_OFF[self._model][channel])
  199. return self._check_command_result(result, 0, {1})
  200. @update_after_operation
  201. async def async_toggle(self, channel: int) -> bool:
  202. """Toggle device."""
  203. result = await self._send_command(MULTI_CHANNEL_COMMANDS_TOGGLE[self._model][channel])
  204. return self._check_command_result(result, 0, {1})
  205. def is_on(self, channel: int) -> bool | None:
  206. """Return switch state from cache."""
  207. return self._get_adv_value("isOn", channel)
  208. def switch_mode(self, channel: int) -> bool | None:
  209. """Return true or false from cache."""
  210. return self._get_adv_value("switchMode", channel)