relay_switch.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. import logging
  2. import time
  3. from typing import Any
  4. from bleak.backends.device import BLEDevice
  5. from ..const import SwitchbotModel
  6. from ..helpers import parse_power_data, parse_uint24_be
  7. from ..models import SwitchBotAdvertisement
  8. from .device import (
  9. SwitchbotEncryptedDevice,
  10. SwitchbotSequenceDevice,
  11. update_after_operation,
  12. )
  13. _LOGGER = logging.getLogger(__name__)
  14. # Bit masks for status parsing
  15. SWITCH1_ON_MASK = 0b10000000
  16. SWITCH2_ON_MASK = 0b01000000
  17. DOOR_OPEN_MASK = 0b00100000
  18. COMMAND_HEADER = "57"
  19. COMMAND_CONTROL = "570f70"
  20. COMMAND_TOGGLE = f"{COMMAND_CONTROL}010200"
  21. COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
  22. COMMAND_GET_BASIC_INFO = f"{COMMAND_HEADER}0f7181"
  23. COMMAND_GET_CHANNEL1_INFO = f"{COMMAND_HEADER}0f710600{{}}{{}}"
  24. COMMAND_GET_CHANNEL2_INFO = f"{COMMAND_HEADER}0f710601{{}}{{}}"
  25. MULTI_CHANNEL_COMMANDS_TURN_ON = {
  26. SwitchbotModel.RELAY_SWITCH_2PM: {
  27. 1: "570f70010d00",
  28. 2: "570f70010700",
  29. }
  30. }
  31. MULTI_CHANNEL_COMMANDS_TURN_OFF = {
  32. SwitchbotModel.RELAY_SWITCH_2PM: {
  33. 1: "570f70010c00",
  34. 2: "570f70010300",
  35. }
  36. }
  37. MULTI_CHANNEL_COMMANDS_TOGGLE = {
  38. SwitchbotModel.RELAY_SWITCH_2PM: {
  39. 1: "570f70010e00",
  40. 2: "570f70010b00",
  41. }
  42. }
  43. MULTI_CHANNEL_COMMANDS_GET_VOLTAGE_AND_CURRENT = {
  44. SwitchbotModel.RELAY_SWITCH_2PM: {
  45. 1: COMMAND_GET_CHANNEL1_INFO,
  46. 2: COMMAND_GET_CHANNEL2_INFO,
  47. }
  48. }
  49. class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
  50. """Representation of a Switchbot relay switch 1pm."""
  51. _turn_on_command = f"{COMMAND_CONTROL}010100"
  52. _turn_off_command = f"{COMMAND_CONTROL}010000"
  53. def __init__(
  54. self,
  55. device: BLEDevice,
  56. key_id: str,
  57. encryption_key: str,
  58. interface: int = 0,
  59. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  60. **kwargs: Any,
  61. ) -> None:
  62. super().__init__(device, key_id, encryption_key, model, interface, **kwargs)
  63. @classmethod
  64. async def verify_encryption_key(
  65. cls,
  66. device: BLEDevice,
  67. key_id: str,
  68. encryption_key: str,
  69. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  70. **kwargs: Any,
  71. ) -> bool:
  72. return await super().verify_encryption_key(
  73. device, key_id, encryption_key, model, **kwargs
  74. )
  75. def _reset_power_data(self, data: dict[str, Any]) -> None:
  76. """Reset power-related data to 0."""
  77. for key in ["power", "current", "voltage"]:
  78. data[key] = 0
  79. def _parse_common_data(self, raw_data: bytes) -> dict[str, Any]:
  80. """Parse common data from raw bytes."""
  81. return {
  82. "sequence_number": raw_data[1],
  83. "isOn": bool(raw_data[2] & SWITCH1_ON_MASK),
  84. "firmware": raw_data[16] / 10.0,
  85. "channel2_isOn": bool(raw_data[2] & SWITCH2_ON_MASK),
  86. }
  87. def _parse_user_data(self, raw_data: bytes) -> dict[str, Any]:
  88. """Parse user-specific data from raw bytes."""
  89. _energy = parse_uint24_be(raw_data, 1) / 60000
  90. _energy_usage_yesterday = parse_uint24_be(raw_data, 4) / 60000
  91. _use_time = parse_power_data(raw_data, 7, 60.0)
  92. _voltage = parse_power_data(raw_data, 9, 10.0)
  93. _current = parse_power_data(raw_data, 11, 1000.0)
  94. _power = parse_power_data(raw_data, 13, 10.0)
  95. return {
  96. "energy": 0.01 if 0 < _energy <= 0.01 else round(_energy, 2),
  97. "energy usage yesterday": 0.01
  98. if 0 < _energy_usage_yesterday <= 0.01
  99. else round(_energy_usage_yesterday, 2),
  100. "use_time": round(_use_time, 1),
  101. "voltage": 0.1 if 0 < _voltage <= 0.1 else round(_voltage),
  102. "current": 0.1 if 0 < _current <= 0.1 else round(_current, 1),
  103. "power": 0.1 if 0 < _power <= 0.1 else round(_power, 1),
  104. }
  105. def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
  106. """Update device data from advertisement."""
  107. adv_data = advertisement.data["data"]
  108. channel = self._channel if hasattr(self, "_channel") else None
  109. if self._model in (
  110. SwitchbotModel.RELAY_SWITCH_1PM,
  111. SwitchbotModel.RELAY_SWITCH_2PM,
  112. ):
  113. if channel is None:
  114. adv_data["voltage"] = self._get_adv_value("voltage") or 0
  115. adv_data["current"] = self._get_adv_value("current") or 0
  116. adv_data["power"] = self._get_adv_value("power") or 0
  117. adv_data["energy"] = self._get_adv_value("energy") or 0
  118. else:
  119. for i in range(1, channel + 1):
  120. adv_data[i] = adv_data.get(i, {})
  121. adv_data[i]["voltage"] = self._get_adv_value("voltage", i) or 0
  122. adv_data[i]["current"] = self._get_adv_value("current", i) or 0
  123. adv_data[i]["power"] = self._get_adv_value("power", i) or 0
  124. adv_data[i]["energy"] = self._get_adv_value("energy", i) or 0
  125. super().update_from_advertisement(advertisement)
  126. def get_current_time_and_start_time(self) -> int:
  127. """Get current time in seconds since epoch."""
  128. current_time = int(time.time())
  129. current_time_hex = f"{current_time:08x}"
  130. current_day_start_time = int(current_time / 86400) * 86400
  131. current_day_start_time_hex = f"{current_day_start_time:08x}"
  132. return current_time_hex, current_day_start_time_hex
  133. async def get_basic_info(self) -> dict[str, Any] | None:
  134. """Get device basic settings."""
  135. current_time_hex, current_day_start_time_hex = (
  136. self.get_current_time_and_start_time()
  137. )
  138. if not (_data := await self._get_basic_info(COMMAND_GET_BASIC_INFO)):
  139. return None
  140. if not (
  141. _channel1_data := await self._get_basic_info(
  142. COMMAND_GET_CHANNEL1_INFO.format(
  143. current_time_hex, current_day_start_time_hex
  144. )
  145. )
  146. ):
  147. return None
  148. _LOGGER.debug(
  149. "on-off hex: %s, channel1_hex_data: %s", _data.hex(), _channel1_data.hex()
  150. )
  151. common_data = self._parse_common_data(_data)
  152. user_data = self._parse_user_data(_channel1_data)
  153. if self._model in (
  154. SwitchbotModel.RELAY_SWITCH_1,
  155. SwitchbotModel.GARAGE_DOOR_OPENER,
  156. ):
  157. for key in ["voltage", "current", "power", "energy"]:
  158. user_data.pop(key, None)
  159. if not common_data["isOn"]:
  160. self._reset_power_data(user_data)
  161. garage_door_opener_data = {"door_open": not bool(_data[2] & DOOR_OPEN_MASK)}
  162. _LOGGER.debug("common_data: %s, user_data: %s", common_data, user_data)
  163. if self._model == SwitchbotModel.GARAGE_DOOR_OPENER:
  164. return common_data | garage_door_opener_data
  165. return common_data | user_data
  166. @update_after_operation
  167. async def async_toggle(self, **kwargs) -> bool:
  168. """Toggle device."""
  169. result = await self._send_command(COMMAND_TOGGLE)
  170. return self._check_command_result(result, 0, {1})
  171. def is_on(self) -> bool | None:
  172. """Return switch state from cache."""
  173. return self._get_adv_value("isOn")
  174. class SwitchbotRelaySwitch2PM(SwitchbotRelaySwitch):
  175. """Representation of a Switchbot relay switch 2pm."""
  176. def __init__(
  177. self,
  178. device: BLEDevice,
  179. key_id: str,
  180. encryption_key: str,
  181. interface: int = 0,
  182. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_2PM,
  183. **kwargs: Any,
  184. ) -> None:
  185. super().__init__(device, key_id, encryption_key, interface, model, **kwargs)
  186. self._channel = 2
  187. @property
  188. def channel(self) -> int:
  189. return self._channel
  190. def get_parsed_data(self, channel: int | None = None) -> dict[str, Any]:
  191. """Return parsed device data, optionally for a specific channel."""
  192. data = self.data.get("data") or {}
  193. return data.get(channel, {})
  194. async def get_basic_info(self):
  195. current_time_hex, current_day_start_time_hex = (
  196. self.get_current_time_and_start_time()
  197. )
  198. if not (common_data := await super().get_basic_info()):
  199. return None
  200. if not (
  201. _channel2_data := await self._get_basic_info(
  202. COMMAND_GET_CHANNEL2_INFO.format(
  203. current_time_hex, current_day_start_time_hex
  204. )
  205. )
  206. ):
  207. return None
  208. _LOGGER.debug("channel2_hex_data: %s", _channel2_data.hex())
  209. channel2_data = self._parse_user_data(_channel2_data)
  210. channel2_data["isOn"] = common_data["channel2_isOn"]
  211. if not channel2_data["isOn"]:
  212. self._reset_power_data(channel2_data)
  213. _LOGGER.debug(
  214. "channel1_data: %s, channel2_data: %s", common_data, channel2_data
  215. )
  216. return {1: common_data, 2: channel2_data}
  217. @update_after_operation
  218. async def turn_on(self, channel: int) -> bool:
  219. """Turn device on."""
  220. result = await self._send_command(
  221. MULTI_CHANNEL_COMMANDS_TURN_ON[self._model][channel]
  222. )
  223. return self._check_command_result(result, 0, {1})
  224. @update_after_operation
  225. async def turn_off(self, channel: int) -> bool:
  226. """Turn device off."""
  227. result = await self._send_command(
  228. MULTI_CHANNEL_COMMANDS_TURN_OFF[self._model][channel]
  229. )
  230. return self._check_command_result(result, 0, {1})
  231. @update_after_operation
  232. async def async_toggle(self, channel: int) -> bool:
  233. """Toggle device."""
  234. result = await self._send_command(
  235. MULTI_CHANNEL_COMMANDS_TOGGLE[self._model][channel]
  236. )
  237. return self._check_command_result(result, 0, {1})
  238. def is_on(self, channel: int) -> bool | None:
  239. """Return switch state from cache."""
  240. return self._get_adv_value("isOn", channel)
  241. def switch_mode(self, channel: int) -> bool | None:
  242. """Return true or false from cache."""
  243. return self._get_adv_value("switchMode", channel)