1
0

relay_switch.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. import logging
  2. import time
  3. from typing import Any
  4. from ..const import SwitchbotModel
  5. from ..helpers import parse_power_data, parse_uint24_be
  6. from ..models import SwitchBotAdvertisement
  7. from .device import (
  8. SwitchbotEncryptedDevice,
  9. SwitchbotSequenceDevice,
  10. update_after_operation,
  11. )
  12. _LOGGER = logging.getLogger(__name__)
  13. # Bit masks for status parsing
  14. SWITCH1_ON_MASK = 0b10000000
  15. SWITCH2_ON_MASK = 0b01000000
  16. DOOR_OPEN_MASK = 0b00100000
  17. COMMAND_HEADER = "57"
  18. COMMAND_CONTROL = "570f70"
  19. COMMAND_TOGGLE = f"{COMMAND_CONTROL}010200"
  20. COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
  21. COMMAND_GET_BASIC_INFO = f"{COMMAND_HEADER}0f7181"
  22. COMMAND_GET_CHANNEL1_INFO = f"{COMMAND_HEADER}0f710600{{}}{{}}"
  23. COMMAND_GET_CHANNEL2_INFO = f"{COMMAND_HEADER}0f710601{{}}{{}}"
  24. MULTI_CHANNEL_COMMANDS_TURN_ON = {
  25. SwitchbotModel.RELAY_SWITCH_2PM: {
  26. 1: "570f70010d00",
  27. 2: "570f70010700",
  28. }
  29. }
  30. MULTI_CHANNEL_COMMANDS_TURN_OFF = {
  31. SwitchbotModel.RELAY_SWITCH_2PM: {
  32. 1: "570f70010c00",
  33. 2: "570f70010300",
  34. }
  35. }
  36. MULTI_CHANNEL_COMMANDS_TOGGLE = {
  37. SwitchbotModel.RELAY_SWITCH_2PM: {
  38. 1: "570f70010e00",
  39. 2: "570f70010b00",
  40. }
  41. }
  42. MULTI_CHANNEL_COMMANDS_GET_VOLTAGE_AND_CURRENT = {
  43. SwitchbotModel.RELAY_SWITCH_2PM: {
  44. 1: COMMAND_GET_CHANNEL1_INFO,
  45. 2: COMMAND_GET_CHANNEL2_INFO,
  46. }
  47. }
  48. class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
  49. """Representation of a Switchbot relay switch 1pm."""
  50. _model = SwitchbotModel.RELAY_SWITCH_1PM
  51. _turn_on_command = f"{COMMAND_CONTROL}010100"
  52. _turn_off_command = f"{COMMAND_CONTROL}010000"
  53. def _reset_power_data(self, data: dict[str, Any]) -> None:
  54. """Reset power-related data to 0."""
  55. for key in ["power", "current", "voltage"]:
  56. data[key] = 0
  57. def _parse_common_data(self, raw_data: bytes) -> dict[str, Any]:
  58. """Parse common data from raw bytes."""
  59. return {
  60. "sequence_number": raw_data[1],
  61. "isOn": bool(raw_data[2] & SWITCH1_ON_MASK),
  62. "firmware": raw_data[16] / 10.0,
  63. "channel2_isOn": bool(raw_data[2] & SWITCH2_ON_MASK),
  64. }
  65. def _parse_user_data(self, raw_data: bytes) -> dict[str, Any]:
  66. """Parse user-specific data from raw bytes."""
  67. _energy = parse_uint24_be(raw_data, 1) / 60000
  68. _energy_usage_yesterday = parse_uint24_be(raw_data, 4) / 60000
  69. _use_time = parse_power_data(raw_data, 7, 60.0)
  70. _voltage = parse_power_data(raw_data, 9, 10.0)
  71. _current = parse_power_data(raw_data, 11, 1000.0)
  72. _power = parse_power_data(raw_data, 13, 10.0)
  73. return {
  74. "energy": 0.01 if 0 < _energy <= 0.01 else round(_energy, 2),
  75. "energy usage yesterday": 0.01
  76. if 0 < _energy_usage_yesterday <= 0.01
  77. else round(_energy_usage_yesterday, 2),
  78. "use_time": round(_use_time, 1),
  79. "voltage": 0.1 if 0 < _voltage <= 0.1 else round(_voltage),
  80. "current": 0.1 if 0 < _current <= 0.1 else round(_current, 1),
  81. "power": 0.1 if 0 < _power <= 0.1 else round(_power, 1),
  82. }
  83. def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
  84. """Update device data from advertisement."""
  85. adv_data = advertisement.data["data"]
  86. channel = self._channel if hasattr(self, "_channel") else None
  87. if self._model in (
  88. SwitchbotModel.RELAY_SWITCH_1PM,
  89. SwitchbotModel.RELAY_SWITCH_2PM,
  90. SwitchbotModel.PLUG_MINI_EU,
  91. ):
  92. if channel is None:
  93. adv_data["voltage"] = self._get_adv_value("voltage") or 0
  94. adv_data["current"] = self._get_adv_value("current") or 0
  95. adv_data["power"] = self._get_adv_value("power") or 0
  96. adv_data["energy"] = self._get_adv_value("energy") or 0
  97. else:
  98. for i in range(1, channel + 1):
  99. adv_data[i] = adv_data.get(i, {})
  100. adv_data[i]["voltage"] = self._get_adv_value("voltage", i) or 0
  101. adv_data[i]["current"] = self._get_adv_value("current", i) or 0
  102. adv_data[i]["power"] = self._get_adv_value("power", i) or 0
  103. adv_data[i]["energy"] = self._get_adv_value("energy", i) or 0
  104. super().update_from_advertisement(advertisement)
  105. def get_current_time_and_start_time(self) -> int:
  106. """Get current time in seconds since epoch."""
  107. current_time = int(time.time())
  108. current_time_hex = f"{current_time:08x}"
  109. current_day_start_time = int(current_time / 86400) * 86400
  110. current_day_start_time_hex = f"{current_day_start_time:08x}"
  111. return current_time_hex, current_day_start_time_hex
  112. async def get_basic_info(self) -> dict[str, Any] | None:
  113. """Get device basic settings."""
  114. current_time_hex, current_day_start_time_hex = (
  115. self.get_current_time_and_start_time()
  116. )
  117. if not (_data := await self._get_basic_info(COMMAND_GET_BASIC_INFO)):
  118. return None
  119. if len(_data) < 17:
  120. _LOGGER.warning(
  121. "%s: Short basic-info response (%d bytes): %s",
  122. self.name,
  123. len(_data),
  124. _data.hex(),
  125. )
  126. return None
  127. if not (
  128. _channel1_data := await self._get_basic_info(
  129. COMMAND_GET_CHANNEL1_INFO.format(
  130. current_time_hex, current_day_start_time_hex
  131. )
  132. )
  133. ):
  134. return None
  135. if len(_channel1_data) < 15:
  136. _LOGGER.warning(
  137. "%s: Short channel1 response (%d bytes): %s",
  138. self.name,
  139. len(_channel1_data),
  140. _channel1_data.hex(),
  141. )
  142. return None
  143. _LOGGER.debug(
  144. "on-off hex: %s, channel1_hex_data: %s", _data.hex(), _channel1_data.hex()
  145. )
  146. common_data = self._parse_common_data(_data)
  147. user_data = self._parse_user_data(_channel1_data)
  148. if self._model in (
  149. SwitchbotModel.RELAY_SWITCH_1,
  150. SwitchbotModel.GARAGE_DOOR_OPENER,
  151. ):
  152. for key in ["voltage", "current", "power", "energy"]:
  153. user_data.pop(key, None)
  154. if not common_data["isOn"]:
  155. self._reset_power_data(user_data)
  156. garage_door_opener_data = {"door_open": not bool(_data[2] & DOOR_OPEN_MASK)}
  157. _LOGGER.debug("common_data: %s, user_data: %s", common_data, user_data)
  158. if self._model == SwitchbotModel.GARAGE_DOOR_OPENER:
  159. return common_data | garage_door_opener_data
  160. return common_data | user_data
  161. @update_after_operation
  162. async def async_toggle(self, **kwargs) -> bool:
  163. """Toggle device."""
  164. result = await self._send_command(COMMAND_TOGGLE)
  165. return self._check_command_result(result, 0, {1})
  166. def is_on(self) -> bool | None:
  167. """Return switch state from cache."""
  168. return self._get_adv_value("isOn")
  169. def door_open(self) -> bool | None:
  170. """Return garage door state from cache."""
  171. return self._get_adv_value("door_open")
  172. class SwitchbotGarageDoorOpener(SwitchbotRelaySwitch):
  173. """Representation of a Switchbot garage door opener."""
  174. _model = SwitchbotModel.GARAGE_DOOR_OPENER
  175. _open_command = f"{COMMAND_CONTROL}110129"
  176. _close_command = f"{COMMAND_CONTROL}110229"
  177. _press_command = f"{COMMAND_CONTROL}110329" # for garage door opener toggle
  178. class SwitchbotRelaySwitch2PM(SwitchbotRelaySwitch):
  179. """Representation of a Switchbot relay switch 2pm."""
  180. _model = SwitchbotModel.RELAY_SWITCH_2PM
  181. _channel: int = 2
  182. @property
  183. def channel(self) -> int:
  184. return self._channel
  185. def get_parsed_data(self, channel: int | None = None) -> dict[str, Any]:
  186. """Return parsed device data, optionally for a specific channel."""
  187. data = self.data.get("data") or {}
  188. return data.get(channel, {})
  189. async def get_basic_info(self):
  190. current_time_hex, current_day_start_time_hex = (
  191. self.get_current_time_and_start_time()
  192. )
  193. if not (common_data := await super().get_basic_info()):
  194. return None
  195. if not (
  196. _channel2_data := await self._get_basic_info(
  197. COMMAND_GET_CHANNEL2_INFO.format(
  198. current_time_hex, current_day_start_time_hex
  199. )
  200. )
  201. ):
  202. return None
  203. if len(_channel2_data) < 15:
  204. _LOGGER.warning(
  205. "%s: Short channel2 response (%d bytes): %s",
  206. self.name,
  207. len(_channel2_data),
  208. _channel2_data.hex(),
  209. )
  210. return None
  211. _LOGGER.debug("channel2_hex_data: %s", _channel2_data.hex())
  212. channel2_data = self._parse_user_data(_channel2_data)
  213. channel2_data["isOn"] = common_data["channel2_isOn"]
  214. if not channel2_data["isOn"]:
  215. self._reset_power_data(channel2_data)
  216. _LOGGER.debug(
  217. "channel1_data: %s, channel2_data: %s", common_data, channel2_data
  218. )
  219. return {1: common_data, 2: channel2_data}
  220. @update_after_operation
  221. async def turn_on(self, channel: int) -> bool:
  222. """Turn device on."""
  223. result = await self._send_command(
  224. MULTI_CHANNEL_COMMANDS_TURN_ON[self._model][channel]
  225. )
  226. return self._check_command_result(result, 0, {1})
  227. @update_after_operation
  228. async def turn_off(self, channel: int) -> bool:
  229. """Turn device off."""
  230. result = await self._send_command(
  231. MULTI_CHANNEL_COMMANDS_TURN_OFF[self._model][channel]
  232. )
  233. return self._check_command_result(result, 0, {1})
  234. @update_after_operation
  235. async def async_toggle(self, channel: int) -> bool:
  236. """Toggle device."""
  237. result = await self._send_command(
  238. MULTI_CHANNEL_COMMANDS_TOGGLE[self._model][channel]
  239. )
  240. return self._check_command_result(result, 0, {1})
  241. def is_on(self, channel: int) -> bool | None:
  242. """Return switch state from cache."""
  243. return self._get_adv_value("isOn", channel)
  244. def switch_mode(self, channel: int) -> bool | None:
  245. """Return true or false from cache."""
  246. return self._get_adv_value("switchMode", channel)