relay_switch.py 10 KB


  1. import logging
  2. import time
  3. from typing import Any
  4. from bleak.backends.device import BLEDevice
  5. from ..const import SwitchbotModel
  6. from ..models import SwitchBotAdvertisement
  7. from .device import (
  8. SwitchbotEncryptedDevice,
  9. SwitchbotSequenceDevice,
  10. update_after_operation,
  11. )
  12. _LOGGER = logging.getLogger(__name__)
  13. COMMAND_HEADER = "57"
  14. COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f70010000"
  15. COMMAND_TURN_ON = f"{COMMAND_HEADER}0f70010100"
  16. COMMAND_TOGGLE = f"{COMMAND_HEADER}0f70010200"
  17. COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
  18. COMMAND_GET_BASIC_INFO = f"{COMMAND_HEADER}0f7181"
  19. COMMAND_GET_CHANNEL1_INFO = f"{COMMAND_HEADER}0f710600{{}}{{}}"
  20. COMMAND_GET_CHANNEL2_INFO = f"{COMMAND_HEADER}0f710601{{}}{{}}"
  21. MULTI_CHANNEL_COMMANDS_TURN_ON = {
  22. SwitchbotModel.RELAY_SWITCH_2PM: {
  23. 1: "570f70010d00",
  24. 2: "570f70010700",
  25. }
  26. }
  27. MULTI_CHANNEL_COMMANDS_TURN_OFF = {
  28. SwitchbotModel.RELAY_SWITCH_2PM: {
  29. 1: "570f70010c00",
  30. 2: "570f70010300",
  31. }
  32. }
  33. MULTI_CHANNEL_COMMANDS_TOGGLE = {
  34. SwitchbotModel.RELAY_SWITCH_2PM: {
  35. 1: "570f70010e00",
  36. 2: "570f70010b00",
  37. }
  38. }
  39. MULTI_CHANNEL_COMMANDS_GET_VOLTAGE_AND_CURRENT = {
  40. SwitchbotModel.RELAY_SWITCH_2PM: {
  41. 1: COMMAND_GET_CHANNEL1_INFO,
  42. 2: COMMAND_GET_CHANNEL2_INFO,
  43. }
  44. }
  45. class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
  46. """Representation of a Switchbot relay switch 1pm."""
  47. def __init__(
  48. self,
  49. device: BLEDevice,
  50. key_id: str,
  51. encryption_key: str,
  52. interface: int = 0,
  53. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  54. **kwargs: Any,
  55. ) -> None:
  56. super().__init__(device, key_id, encryption_key, model, interface, **kwargs)
  57. @classmethod
  58. async def verify_encryption_key(
  59. cls,
  60. device: BLEDevice,
  61. key_id: str,
  62. encryption_key: str,
  63. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  64. **kwargs: Any,
  65. ) -> bool:
  66. return await super().verify_encryption_key(
  67. device, key_id, encryption_key, model, **kwargs
  68. )
  69. def _reset_power_data(self, data: dict[str, Any]) -> None:
  70. """Reset power-related data to 0."""
  71. for key in ["power", "current", "voltage"]:
  72. data[key] = 0
  73. def _parse_common_data(self, raw_data: bytes) -> dict[str, Any]:
  74. """Parse common data from raw bytes."""
  75. return {
  76. "sequence_number": raw_data[1],
  77. "isOn": bool(raw_data[2] & 0b10000000),
  78. "firmware": raw_data[16] / 10.0,
  79. "channel2_isOn": bool(raw_data[2] & 0b01000000),
  80. }
  81. def _parse_user_data(self, raw_data: bytes) -> dict[str, Any]:
  82. """Parse user-specific data from raw bytes."""
  83. _energy = int.from_bytes(raw_data[1:4], "big") / 60000
  84. _energy_usage_yesterday = int.from_bytes(raw_data[4:7], "big") / 60000
  85. _use_time = int.from_bytes(raw_data[7:9], "big") / 60.0
  86. _voltage = int.from_bytes(raw_data[9:11], "big") / 10.0
  87. _current = int.from_bytes(raw_data[11:13], "big") / 1000.0
  88. _power = int.from_bytes(raw_data[13:15], "big") / 10.0
  89. return {
  90. "energy": 0.01 if 0 < _energy <= 0.01 else round(_energy, 2),
  91. "energy usage yesterday": 0.01
  92. if 0 < _energy_usage_yesterday <= 0.01
  93. else round(_energy_usage_yesterday, 2),
  94. "use_time": round(_use_time, 1),
  95. "voltage": 0.1 if 0 < _voltage <= 0.1 else round(_voltage),
  96. "current": 0.1 if 0 < _current <= 0.1 else round(_current, 1),
  97. "power": 0.1 if 0 < _power <= 0.1 else round(_power, 1),
  98. }
  99. def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
  100. """Update device data from advertisement."""
  101. adv_data = advertisement.data["data"]
  102. channel = self._channel if hasattr(self, "_channel") else None
  103. if self._model in (
  104. SwitchbotModel.RELAY_SWITCH_1PM,
  105. SwitchbotModel.RELAY_SWITCH_2PM,
  106. ):
  107. if channel is None:
  108. adv_data["voltage"] = self._get_adv_value("voltage") or 0
  109. adv_data["current"] = self._get_adv_value("current") or 0
  110. adv_data["power"] = self._get_adv_value("power") or 0
  111. adv_data["energy"] = self._get_adv_value("energy") or 0
  112. else:
  113. for i in range(1, channel + 1):
  114. adv_data[i] = adv_data.get(i, {})
  115. adv_data[i]["voltage"] = self._get_adv_value("voltage", i) or 0
  116. adv_data[i]["current"] = self._get_adv_value("current", i) or 0
  117. adv_data[i]["power"] = self._get_adv_value("power", i) or 0
  118. adv_data[i]["energy"] = self._get_adv_value("energy", i) or 0
  119. super().update_from_advertisement(advertisement)
  120. def get_current_time_and_start_time(self) -> int:
  121. """Get current time in seconds since epoch."""
  122. current_time = int(time.time())
  123. current_time_hex = f"{current_time:08x}"
  124. current_day_start_time = int(current_time / 86400) * 86400
  125. current_day_start_time_hex = f"{current_day_start_time:08x}"
  126. return current_time_hex, current_day_start_time_hex
  127. async def get_basic_info(self) -> dict[str, Any] | None:
  128. """Get device basic settings."""
  129. current_time_hex, current_day_start_time_hex = (
  130. self.get_current_time_and_start_time()
  131. )
  132. if not (_data := await self._get_basic_info(COMMAND_GET_BASIC_INFO)):
  133. return None
  134. if not (
  135. _channel1_data := await self._get_basic_info(
  136. COMMAND_GET_CHANNEL1_INFO.format(
  137. current_time_hex, current_day_start_time_hex
  138. )
  139. )
  140. ):
  141. return None
  142. _LOGGER.debug(
  143. "on-off hex: %s, channel1_hex_data: %s", _data.hex(), _channel1_data.hex()
  144. )
  145. common_data = self._parse_common_data(_data)
  146. user_data = self._parse_user_data(_channel1_data)
  147. if self._model in (
  148. SwitchbotModel.RELAY_SWITCH_1,
  149. SwitchbotModel.GARAGE_DOOR_OPENER,
  150. ):
  151. for key in ["voltage", "current", "power", "energy"]:
  152. user_data.pop(key, None)
  153. if not common_data["isOn"]:
  154. self._reset_power_data(user_data)
  155. garage_door_opener_data = {"door_open": not bool(_data[2] & 0b00100000)}
  156. _LOGGER.debug("common_data: %s, user_data: %s", common_data, user_data)
  157. if self._model == SwitchbotModel.GARAGE_DOOR_OPENER:
  158. return common_data | garage_door_opener_data
  159. return common_data | user_data
  160. @update_after_operation
  161. async def turn_on(self) -> bool:
  162. """Turn device on."""
  163. result = await self._send_command(COMMAND_TURN_ON)
  164. return self._check_command_result(result, 0, {1})
  165. @update_after_operation
  166. async def turn_off(self) -> bool:
  167. """Turn device off."""
  168. result = await self._send_command(COMMAND_TURN_OFF)
  169. return self._check_command_result(result, 0, {1})
  170. @update_after_operation
  171. async def async_toggle(self, **kwargs) -> bool:
  172. """Toggle device."""
  173. result = await self._send_command(COMMAND_TOGGLE)
  174. return self._check_command_result(result, 0, {1})
  175. def is_on(self) -> bool | None:
  176. """Return switch state from cache."""
  177. return self._get_adv_value("isOn")
  178. class SwitchbotRelaySwitch2PM(SwitchbotRelaySwitch):
  179. """Representation of a Switchbot relay switch 2pm."""
  180. def __init__(
  181. self,
  182. device: BLEDevice,
  183. key_id: str,
  184. encryption_key: str,
  185. interface: int = 0,
  186. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_2PM,
  187. **kwargs: Any,
  188. ) -> None:
  189. super().__init__(device, key_id, encryption_key, interface, model, **kwargs)
  190. self._channel = 2
  191. @property
  192. def channel(self) -> int:
  193. return self._channel
  194. def get_parsed_data(self, channel: int | None = None) -> dict[str, Any]:
  195. """Return parsed device data, optionally for a specific channel."""
  196. data = self.data.get("data") or {}
  197. return data.get(channel, {})
  198. async def get_basic_info(self):
  199. current_time_hex, current_day_start_time_hex = (
  200. self.get_current_time_and_start_time()
  201. )
  202. if not (common_data := await super().get_basic_info()):
  203. return None
  204. if not (
  205. _channel2_data := await self._get_basic_info(
  206. COMMAND_GET_CHANNEL2_INFO.format(
  207. current_time_hex, current_day_start_time_hex
  208. )
  209. )
  210. ):
  211. return None
  212. _LOGGER.debug("channel2_hex_data: %s", _channel2_data.hex())
  213. channel2_data = self._parse_user_data(_channel2_data)
  214. channel2_data["isOn"] = common_data["channel2_isOn"]
  215. if not channel2_data["isOn"]:
  216. self._reset_power_data(channel2_data)
  217. _LOGGER.debug(
  218. "channel1_data: %s, channel2_data: %s", common_data, channel2_data
  219. )
  220. return {1: common_data, 2: channel2_data}
  221. @update_after_operation
  222. async def turn_on(self, channel: int) -> bool:
  223. """Turn device on."""
  224. result = await self._send_command(
  225. MULTI_CHANNEL_COMMANDS_TURN_ON[self._model][channel]
  226. )
  227. return self._check_command_result(result, 0, {1})
  228. @update_after_operation
  229. async def turn_off(self, channel: int) -> bool:
  230. """Turn device off."""
  231. result = await self._send_command(
  232. MULTI_CHANNEL_COMMANDS_TURN_OFF[self._model][channel]
  233. )
  234. return self._check_command_result(result, 0, {1})
  235. @update_after_operation
  236. async def async_toggle(self, channel: int) -> bool:
  237. """Toggle device."""
  238. result = await self._send_command(
  239. MULTI_CHANNEL_COMMANDS_TOGGLE[self._model][channel]
  240. )
  241. return self._check_command_result(result, 0, {1})
  242. def is_on(self, channel: int) -> bool | None:
  243. """Return switch state from cache."""
  244. return self._get_adv_value("isOn", channel)
  245. def switch_mode(self, channel: int) -> bool | None:
  246. """Return true or false from cache."""
  247. return self._get_adv_value("switchMode", channel)