relay_switch.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. import logging
  2. import time
  3. from typing import Any
  4. from ..const import SwitchbotModel
  5. from ..helpers import parse_power_data, parse_uint24_be
  6. from ..models import SwitchBotAdvertisement
  7. from .device import (
  8. SwitchbotEncryptedDevice,
  9. SwitchbotSequenceDevice,
  10. update_after_operation,
  11. )
  12. _LOGGER = logging.getLogger(__name__)
  13. # Bit masks for status parsing
  14. SWITCH1_ON_MASK = 0b10000000
  15. SWITCH2_ON_MASK = 0b01000000
  16. DOOR_OPEN_MASK = 0b00100000
  17. COMMAND_HEADER = "57"
  18. COMMAND_CONTROL = "570f70"
  19. COMMAND_TOGGLE = f"{COMMAND_CONTROL}010200"
  20. COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
  21. COMMAND_GET_BASIC_INFO = f"{COMMAND_HEADER}0f7181"
  22. COMMAND_GET_CHANNEL1_INFO = f"{COMMAND_HEADER}0f710600{{}}{{}}"
  23. COMMAND_GET_CHANNEL2_INFO = f"{COMMAND_HEADER}0f710601{{}}{{}}"
  24. MULTI_CHANNEL_COMMANDS_TURN_ON = {
  25. SwitchbotModel.RELAY_SWITCH_2PM: {
  26. 1: "570f70010d00",
  27. 2: "570f70010700",
  28. }
  29. }
  30. MULTI_CHANNEL_COMMANDS_TURN_OFF = {
  31. SwitchbotModel.RELAY_SWITCH_2PM: {
  32. 1: "570f70010c00",
  33. 2: "570f70010300",
  34. }
  35. }
  36. MULTI_CHANNEL_COMMANDS_TOGGLE = {
  37. SwitchbotModel.RELAY_SWITCH_2PM: {
  38. 1: "570f70010e00",
  39. 2: "570f70010b00",
  40. }
  41. }
  42. MULTI_CHANNEL_COMMANDS_GET_VOLTAGE_AND_CURRENT = {
  43. SwitchbotModel.RELAY_SWITCH_2PM: {
  44. 1: COMMAND_GET_CHANNEL1_INFO,
  45. 2: COMMAND_GET_CHANNEL2_INFO,
  46. }
  47. }
  48. class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
  49. """Representation of a Switchbot relay switch 1pm."""
  50. _model = SwitchbotModel.RELAY_SWITCH_1PM
  51. _turn_on_command = f"{COMMAND_CONTROL}010100"
  52. _turn_off_command = f"{COMMAND_CONTROL}010000"
  53. def _reset_power_data(self, data: dict[str, Any]) -> None:
  54. """Reset power-related data to 0."""
  55. for key in ["power", "current", "voltage"]:
  56. data[key] = 0
  57. def _parse_common_data(self, raw_data: bytes) -> dict[str, Any]:
  58. """Parse common data from raw bytes."""
  59. return {
  60. "sequence_number": raw_data[1],
  61. "isOn": bool(raw_data[2] & SWITCH1_ON_MASK),
  62. "firmware": raw_data[16] / 10.0,
  63. "channel2_isOn": bool(raw_data[2] & SWITCH2_ON_MASK),
  64. }
  65. def _parse_user_data(self, raw_data: bytes) -> dict[str, Any]:
  66. """Parse user-specific data from raw bytes."""
  67. _energy = parse_uint24_be(raw_data, 1) / 60000
  68. _energy_usage_yesterday = parse_uint24_be(raw_data, 4) / 60000
  69. _use_time = parse_power_data(raw_data, 7, 60.0)
  70. _voltage = parse_power_data(raw_data, 9, 10.0)
  71. _current = parse_power_data(raw_data, 11, 1000.0)
  72. _power = parse_power_data(raw_data, 13, 10.0)
  73. return {
  74. "energy": 0.01 if 0 < _energy <= 0.01 else round(_energy, 2),
  75. "energy usage yesterday": 0.01
  76. if 0 < _energy_usage_yesterday <= 0.01
  77. else round(_energy_usage_yesterday, 2),
  78. "use_time": round(_use_time, 1),
  79. "voltage": 0.1 if 0 < _voltage <= 0.1 else round(_voltage),
  80. "current": 0.1 if 0 < _current <= 0.1 else round(_current, 1),
  81. "power": 0.1 if 0 < _power <= 0.1 else round(_power, 1),
  82. }
  83. def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
  84. """Update device data from advertisement."""
  85. adv_data = advertisement.data["data"]
  86. channel = self._channel if hasattr(self, "_channel") else None
  87. if self._model in (
  88. SwitchbotModel.RELAY_SWITCH_1PM,
  89. SwitchbotModel.RELAY_SWITCH_2PM,
  90. SwitchbotModel.PLUG_MINI_EU,
  91. ):
  92. if channel is None:
  93. adv_data["voltage"] = self._get_adv_value("voltage") or 0
  94. adv_data["current"] = self._get_adv_value("current") or 0
  95. adv_data["power"] = self._get_adv_value("power") or 0
  96. adv_data["energy"] = self._get_adv_value("energy") or 0
  97. else:
  98. for i in range(1, channel + 1):
  99. adv_data[i] = adv_data.get(i, {})
  100. adv_data[i]["voltage"] = self._get_adv_value("voltage", i) or 0
  101. adv_data[i]["current"] = self._get_adv_value("current", i) or 0
  102. adv_data[i]["power"] = self._get_adv_value("power", i) or 0
  103. adv_data[i]["energy"] = self._get_adv_value("energy", i) or 0
  104. super().update_from_advertisement(advertisement)
  105. def get_current_time_and_start_time(self) -> int:
  106. """Get current time in seconds since epoch."""
  107. current_time = int(time.time())
  108. current_time_hex = f"{current_time:08x}"
  109. current_day_start_time = int(current_time / 86400) * 86400
  110. current_day_start_time_hex = f"{current_day_start_time:08x}"
  111. return current_time_hex, current_day_start_time_hex
  112. async def get_basic_info(self) -> dict[str, Any] | None:
  113. """Get device basic settings."""
  114. current_time_hex, current_day_start_time_hex = (
  115. self.get_current_time_and_start_time()
  116. )
  117. if not (_data := await self._get_basic_info(COMMAND_GET_BASIC_INFO)):
  118. return None
  119. if not (
  120. _channel1_data := await self._get_basic_info(
  121. COMMAND_GET_CHANNEL1_INFO.format(
  122. current_time_hex, current_day_start_time_hex
  123. )
  124. )
  125. ):
  126. return None
  127. _LOGGER.debug(
  128. "on-off hex: %s, channel1_hex_data: %s", _data.hex(), _channel1_data.hex()
  129. )
  130. common_data = self._parse_common_data(_data)
  131. user_data = self._parse_user_data(_channel1_data)
  132. if self._model in (
  133. SwitchbotModel.RELAY_SWITCH_1,
  134. SwitchbotModel.GARAGE_DOOR_OPENER,
  135. ):
  136. for key in ["voltage", "current", "power", "energy"]:
  137. user_data.pop(key, None)
  138. if not common_data["isOn"]:
  139. self._reset_power_data(user_data)
  140. garage_door_opener_data = {"door_open": not bool(_data[2] & DOOR_OPEN_MASK)}
  141. _LOGGER.debug("common_data: %s, user_data: %s", common_data, user_data)
  142. if self._model == SwitchbotModel.GARAGE_DOOR_OPENER:
  143. return common_data | garage_door_opener_data
  144. return common_data | user_data
  145. @update_after_operation
  146. async def async_toggle(self, **kwargs) -> bool:
  147. """Toggle device."""
  148. result = await self._send_command(COMMAND_TOGGLE)
  149. return self._check_command_result(result, 0, {1})
  150. def is_on(self) -> bool | None:
  151. """Return switch state from cache."""
  152. return self._get_adv_value("isOn")
  153. def door_open(self) -> bool | None:
  154. """Return garage door state from cache."""
  155. return self._get_adv_value("door_open")
  156. class SwitchbotGarageDoorOpener(SwitchbotRelaySwitch):
  157. """Representation of a Switchbot garage door opener."""
  158. _model = SwitchbotModel.GARAGE_DOOR_OPENER
  159. _open_command = f"{COMMAND_CONTROL}110129"
  160. _close_command = f"{COMMAND_CONTROL}110229"
  161. _press_command = f"{COMMAND_CONTROL}110329" # for garage door opener toggle
  162. class SwitchbotRelaySwitch2PM(SwitchbotRelaySwitch):
  163. """Representation of a Switchbot relay switch 2pm."""
  164. _model = SwitchbotModel.RELAY_SWITCH_2PM
  165. _channel: int = 2
  166. @property
  167. def channel(self) -> int:
  168. return self._channel
  169. def get_parsed_data(self, channel: int | None = None) -> dict[str, Any]:
  170. """Return parsed device data, optionally for a specific channel."""
  171. data = self.data.get("data") or {}
  172. return data.get(channel, {})
  173. async def get_basic_info(self):
  174. current_time_hex, current_day_start_time_hex = (
  175. self.get_current_time_and_start_time()
  176. )
  177. if not (common_data := await super().get_basic_info()):
  178. return None
  179. if not (
  180. _channel2_data := await self._get_basic_info(
  181. COMMAND_GET_CHANNEL2_INFO.format(
  182. current_time_hex, current_day_start_time_hex
  183. )
  184. )
  185. ):
  186. return None
  187. _LOGGER.debug("channel2_hex_data: %s", _channel2_data.hex())
  188. channel2_data = self._parse_user_data(_channel2_data)
  189. channel2_data["isOn"] = common_data["channel2_isOn"]
  190. if not channel2_data["isOn"]:
  191. self._reset_power_data(channel2_data)
  192. _LOGGER.debug(
  193. "channel1_data: %s, channel2_data: %s", common_data, channel2_data
  194. )
  195. return {1: common_data, 2: channel2_data}
  196. @update_after_operation
  197. async def turn_on(self, channel: int) -> bool:
  198. """Turn device on."""
  199. result = await self._send_command(
  200. MULTI_CHANNEL_COMMANDS_TURN_ON[self._model][channel]
  201. )
  202. return self._check_command_result(result, 0, {1})
  203. @update_after_operation
  204. async def turn_off(self, channel: int) -> bool:
  205. """Turn device off."""
  206. result = await self._send_command(
  207. MULTI_CHANNEL_COMMANDS_TURN_OFF[self._model][channel]
  208. )
  209. return self._check_command_result(result, 0, {1})
  210. @update_after_operation
  211. async def async_toggle(self, channel: int) -> bool:
  212. """Toggle device."""
  213. result = await self._send_command(
  214. MULTI_CHANNEL_COMMANDS_TOGGLE[self._model][channel]
  215. )
  216. return self._check_command_result(result, 0, {1})
  217. def is_on(self, channel: int) -> bool | None:
  218. """Return switch state from cache."""
  219. return self._get_adv_value("isOn", channel)
  220. def switch_mode(self, channel: int) -> bool | None:
  221. """Return true or false from cache."""
  222. return self._get_adv_value("switchMode", channel)