relay_switch.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. import logging
  2. import time
  3. from typing import Any
  4. from bleak.backends.device import BLEDevice
  5. from ..const import SwitchbotModel
  6. from ..models import SwitchBotAdvertisement
  7. from .device import (
  8. SwitchbotEncryptedDevice,
  9. SwitchbotSequenceDevice,
  10. update_after_operation,
  11. )
  12. _LOGGER = logging.getLogger(__name__)
  13. COMMAND_HEADER = "57"
  14. COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f70010000"
  15. COMMAND_TURN_ON = f"{COMMAND_HEADER}0f70010100"
  16. COMMAND_TOGGLE = f"{COMMAND_HEADER}0f70010200"
  17. COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
  18. COMMAND_GET_BASIC_INFO = f"{COMMAND_HEADER}0f7181"
  19. COMMAND_GET_CHANNEL1_INFO = f"{COMMAND_HEADER}0f710600{{}}{{}}"
  20. COMMAND_GET_CHANNEL2_INFO = f"{COMMAND_HEADER}0f710601{{}}{{}}"
  21. MULTI_CHANNEL_COMMANDS_TURN_ON = {
  22. SwitchbotModel.RELAY_SWITCH_2PM: {
  23. 1: "570f70010d00",
  24. 2: "570f70010700",
  25. }
  26. }
  27. MULTI_CHANNEL_COMMANDS_TURN_OFF = {
  28. SwitchbotModel.RELAY_SWITCH_2PM: {
  29. 1: "570f70010c00",
  30. 2: "570f70010300",
  31. }
  32. }
  33. MULTI_CHANNEL_COMMANDS_TOGGLE = {
  34. SwitchbotModel.RELAY_SWITCH_2PM: {
  35. 1: "570f70010e00",
  36. 2: "570f70010b00",
  37. }
  38. }
  39. MULTI_CHANNEL_COMMANDS_GET_VOLTAGE_AND_CURRENT = {
  40. SwitchbotModel.RELAY_SWITCH_2PM: {
  41. 1: COMMAND_GET_CHANNEL1_INFO,
  42. 2: COMMAND_GET_CHANNEL2_INFO,
  43. }
  44. }
  45. class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
  46. """Representation of a Switchbot relay switch 1pm."""
  47. def __init__(
  48. self,
  49. device: BLEDevice,
  50. key_id: str,
  51. encryption_key: str,
  52. interface: int = 0,
  53. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  54. **kwargs: Any,
  55. ) -> None:
  56. super().__init__(device, key_id, encryption_key, model, interface, **kwargs)
  57. @classmethod
  58. async def verify_encryption_key(
  59. cls,
  60. device: BLEDevice,
  61. key_id: str,
  62. encryption_key: str,
  63. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  64. **kwargs: Any,
  65. ) -> bool:
  66. return await super().verify_encryption_key(
  67. device, key_id, encryption_key, model, **kwargs
  68. )
  69. def _reset_power_data(self, data: dict[str, Any]) -> None:
  70. """Reset power-related data to 0."""
  71. for key in ["power", "current", "voltage"]:
  72. data[key] = 0
  73. def _parse_common_data(self, raw_data: bytes) -> dict[str, Any]:
  74. """Parse common data from raw bytes."""
  75. return {
  76. "sequence_number": raw_data[1],
  77. "isOn": bool(raw_data[2] & 0b10000000),
  78. "firmware": raw_data[16] / 10.0,
  79. "channel2_isOn": bool(raw_data[2] & 0b01000000),
  80. }
  81. def _parse_user_data(self, raw_data: bytes) -> dict[str, Any]:
  82. """Parse user-specific data from raw bytes."""
  83. _energy = int.from_bytes(raw_data[1:4], "big") / 60000
  84. _energy_usage_yesterday = int.from_bytes(raw_data[4:7], "big") / 60000
  85. _use_time = int.from_bytes(raw_data[7:9], "big") / 60.0
  86. _voltage = int.from_bytes(raw_data[9:11], "big") / 10.0
  87. _current = int.from_bytes(raw_data[11:13], "big") / 1000.0
  88. _power = int.from_bytes(raw_data[13:15], "big") / 10.0
  89. return {
  90. "energy": 0.01 if 0 < _energy <= 0.01 else round(_energy, 2),
  91. "energy usage yesterday": 0.01 if 0 < _energy_usage_yesterday <= 0.01 else round(_energy_usage_yesterday, 2),
  92. "use_time": round(_use_time, 1),
  93. "voltage": 0.1 if 0 < _voltage <= 0.1 else round(_voltage),
  94. "current": 0.1 if 0 < _current <= 0.1 else round(_current, 1),
  95. "power": 0.1 if 0 < _power <= 0.1 else round(_power, 1),
  96. }
  97. def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
  98. """Update device data from advertisement."""
  99. adv_data = advertisement.data["data"]
  100. channel = self._channel if hasattr(self, "_channel") else None
  101. if self._model in (
  102. SwitchbotModel.RELAY_SWITCH_1PM,
  103. SwitchbotModel.RELAY_SWITCH_2PM,
  104. ):
  105. if channel is None:
  106. adv_data["voltage"] = self._get_adv_value("voltage") or 0
  107. adv_data["current"] = self._get_adv_value("current") or 0
  108. adv_data["power"] = self._get_adv_value("power") or 0
  109. adv_data["energy"] = self._get_adv_value("energy") or 0
  110. else:
  111. for i in range(1, channel + 1):
  112. adv_data[i] = adv_data.get(i, {})
  113. adv_data[i]["voltage"] = self._get_adv_value("voltage", i) or 0
  114. adv_data[i]["current"] = self._get_adv_value("current", i) or 0
  115. adv_data[i]["power"] = self._get_adv_value("power", i) or 0
  116. adv_data[i]["energy"] = self._get_adv_value("energy", i) or 0
  117. super().update_from_advertisement(advertisement)
  118. def get_current_time_and_start_time(self) -> int:
  119. """Get current time in seconds since epoch."""
  120. current_time = int(time.time())
  121. current_time_hex = f"{current_time:08x}"
  122. current_day_start_time = int(current_time / 86400) * 86400
  123. current_day_start_time_hex = f"{current_day_start_time:08x}"
  124. return current_time_hex, current_day_start_time_hex
  125. async def get_basic_info(self) -> dict[str, Any] | None:
  126. """Get device basic settings."""
  127. current_time_hex, current_day_start_time_hex = self.get_current_time_and_start_time()
  128. if not (_data := await self._get_basic_info(COMMAND_GET_BASIC_INFO)):
  129. return None
  130. if not (_channel1_data := await self._get_basic_info(COMMAND_GET_CHANNEL1_INFO.format(current_time_hex, current_day_start_time_hex))):
  131. return None
  132. _LOGGER.debug("on-off hex: %s, channel1_hex_data: %s", _data.hex(), _channel1_data.hex())
  133. common_data = self._parse_common_data(_data)
  134. user_data = self._parse_user_data(_channel1_data)
  135. if self._model in (SwitchbotModel.RELAY_SWITCH_1, SwitchbotModel.GARAGE_DOOR_OPENER):
  136. for key in ["voltage", "current", "power", "energy"]:
  137. user_data.pop(key, None)
  138. if not common_data["isOn"]:
  139. self._reset_power_data(user_data)
  140. garage_door_opener_data = {"door_open": not bool(_data[2] & 0b00100000)}
  141. _LOGGER.debug("common_data: %s, user_data: %s", common_data, user_data)
  142. if self._model == SwitchbotModel.GARAGE_DOOR_OPENER:
  143. return common_data | garage_door_opener_data
  144. return common_data | user_data
  145. @update_after_operation
  146. async def turn_on(self) -> bool:
  147. """Turn device on."""
  148. result = await self._send_command(COMMAND_TURN_ON)
  149. return self._check_command_result(result, 0, {1})
  150. @update_after_operation
  151. async def turn_off(self) -> bool:
  152. """Turn device off."""
  153. result = await self._send_command(COMMAND_TURN_OFF)
  154. return self._check_command_result(result, 0, {1})
  155. @update_after_operation
  156. async def async_toggle(self, **kwargs) -> bool:
  157. """Toggle device."""
  158. result = await self._send_command(COMMAND_TOGGLE)
  159. return self._check_command_result(result, 0, {1})
  160. def is_on(self) -> bool | None:
  161. """Return switch state from cache."""
  162. return self._get_adv_value("isOn")
  163. class SwitchbotRelaySwitch2PM(SwitchbotRelaySwitch):
  164. """Representation of a Switchbot relay switch 2pm."""
  165. def __init__(
  166. self,
  167. device: BLEDevice,
  168. key_id: str,
  169. encryption_key: str,
  170. interface: int = 0,
  171. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_2PM,
  172. **kwargs: Any,
  173. ) -> None:
  174. super().__init__(device, key_id, encryption_key, interface, model, **kwargs)
  175. self._channel = 2
  176. @property
  177. def channel(self) -> int:
  178. return self._channel
  179. def get_parsed_data(self, channel: int | None = None) -> dict[str, Any]:
  180. """Return parsed device data, optionally for a specific channel."""
  181. data = self.data.get("data") or {}
  182. return data.get(channel, {})
  183. async def get_basic_info(self):
  184. current_time_hex, current_day_start_time_hex = self.get_current_time_and_start_time()
  185. if not (common_data := await super().get_basic_info()):
  186. return None
  187. if not (
  188. _channel2_data := await self._get_basic_info(COMMAND_GET_CHANNEL2_INFO.format(current_time_hex, current_day_start_time_hex))
  189. ):
  190. return None
  191. _LOGGER.debug("channel2_hex_data: %s", _channel2_data.hex())
  192. channel2_data = self._parse_user_data(_channel2_data)
  193. channel2_data["isOn"] = common_data["channel2_isOn"]
  194. if not channel2_data["isOn"]:
  195. self._reset_power_data(channel2_data)
  196. _LOGGER.debug("channel1_data: %s, channel2_data: %s", common_data, channel2_data)
  197. return {1: common_data, 2: channel2_data}
  198. @update_after_operation
  199. async def turn_on(self, channel: int) -> bool:
  200. """Turn device on."""
  201. result = await self._send_command(MULTI_CHANNEL_COMMANDS_TURN_ON[self._model][channel])
  202. return self._check_command_result(result, 0, {1})
  203. @update_after_operation
  204. async def turn_off(self, channel: int) -> bool:
  205. """Turn device off."""
  206. result = await self._send_command(MULTI_CHANNEL_COMMANDS_TURN_OFF[self._model][channel])
  207. return self._check_command_result(result, 0, {1})
  208. @update_after_operation
  209. async def async_toggle(self, channel: int) -> bool:
  210. """Toggle device."""
  211. result = await self._send_command(MULTI_CHANNEL_COMMANDS_TOGGLE[self._model][channel])
  212. return self._check_command_result(result, 0, {1})
  213. def is_on(self, channel: int) -> bool | None:
  214. """Return switch state from cache."""
  215. return self._get_adv_value("isOn", channel)
  216. def switch_mode(self, channel: int) -> bool | None:
  217. """Return true or false from cache."""
  218. return self._get_adv_value("switchMode", channel)