relay_switch.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. import logging
  2. import time
  3. from typing import Any
  4. from bleak.backends.device import BLEDevice
  5. from ..const import SwitchbotModel
  6. from ..helpers import parse_power_data, parse_uint24_be
  7. from ..models import SwitchBotAdvertisement
  8. from .device import (
  9. SwitchbotEncryptedDevice,
  10. SwitchbotSequenceDevice,
  11. update_after_operation,
  12. )
  13. _LOGGER = logging.getLogger(__name__)
  14. # Bit masks for status parsing
  15. SWITCH1_ON_MASK = 0b10000000
  16. SWITCH2_ON_MASK = 0b01000000
  17. DOOR_OPEN_MASK = 0b00100000
  18. COMMAND_HEADER = "57"
  19. COMMAND_CONTROL = "570f70"
  20. COMMAND_TOGGLE = f"{COMMAND_CONTROL}010200"
  21. COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
  22. COMMAND_GET_BASIC_INFO = f"{COMMAND_HEADER}0f7181"
  23. COMMAND_GET_CHANNEL1_INFO = f"{COMMAND_HEADER}0f710600{{}}{{}}"
  24. COMMAND_GET_CHANNEL2_INFO = f"{COMMAND_HEADER}0f710601{{}}{{}}"
  25. MULTI_CHANNEL_COMMANDS_TURN_ON = {
  26. SwitchbotModel.RELAY_SWITCH_2PM: {
  27. 1: "570f70010d00",
  28. 2: "570f70010700",
  29. }
  30. }
  31. MULTI_CHANNEL_COMMANDS_TURN_OFF = {
  32. SwitchbotModel.RELAY_SWITCH_2PM: {
  33. 1: "570f70010c00",
  34. 2: "570f70010300",
  35. }
  36. }
  37. MULTI_CHANNEL_COMMANDS_TOGGLE = {
  38. SwitchbotModel.RELAY_SWITCH_2PM: {
  39. 1: "570f70010e00",
  40. 2: "570f70010b00",
  41. }
  42. }
  43. MULTI_CHANNEL_COMMANDS_GET_VOLTAGE_AND_CURRENT = {
  44. SwitchbotModel.RELAY_SWITCH_2PM: {
  45. 1: COMMAND_GET_CHANNEL1_INFO,
  46. 2: COMMAND_GET_CHANNEL2_INFO,
  47. }
  48. }
  49. class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
  50. """Representation of a Switchbot relay switch 1pm."""
  51. _turn_on_command = f"{COMMAND_CONTROL}010100"
  52. _turn_off_command = f"{COMMAND_CONTROL}010000"
  53. _press_command = f"{COMMAND_CONTROL}110329" # for garage door opener toggle
  54. def __init__(
  55. self,
  56. device: BLEDevice,
  57. key_id: str,
  58. encryption_key: str,
  59. interface: int = 0,
  60. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  61. **kwargs: Any,
  62. ) -> None:
  63. super().__init__(device, key_id, encryption_key, model, interface, **kwargs)
  64. @classmethod
  65. async def verify_encryption_key(
  66. cls,
  67. device: BLEDevice,
  68. key_id: str,
  69. encryption_key: str,
  70. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  71. **kwargs: Any,
  72. ) -> bool:
  73. return await super().verify_encryption_key(
  74. device, key_id, encryption_key, model, **kwargs
  75. )
  76. def _reset_power_data(self, data: dict[str, Any]) -> None:
  77. """Reset power-related data to 0."""
  78. for key in ["power", "current", "voltage"]:
  79. data[key] = 0
  80. def _parse_common_data(self, raw_data: bytes) -> dict[str, Any]:
  81. """Parse common data from raw bytes."""
  82. return {
  83. "sequence_number": raw_data[1],
  84. "isOn": bool(raw_data[2] & SWITCH1_ON_MASK),
  85. "firmware": raw_data[16] / 10.0,
  86. "channel2_isOn": bool(raw_data[2] & SWITCH2_ON_MASK),
  87. }
  88. def _parse_user_data(self, raw_data: bytes) -> dict[str, Any]:
  89. """Parse user-specific data from raw bytes."""
  90. _energy = parse_uint24_be(raw_data, 1) / 60000
  91. _energy_usage_yesterday = parse_uint24_be(raw_data, 4) / 60000
  92. _use_time = parse_power_data(raw_data, 7, 60.0)
  93. _voltage = parse_power_data(raw_data, 9, 10.0)
  94. _current = parse_power_data(raw_data, 11, 1000.0)
  95. _power = parse_power_data(raw_data, 13, 10.0)
  96. return {
  97. "energy": 0.01 if 0 < _energy <= 0.01 else round(_energy, 2),
  98. "energy usage yesterday": 0.01
  99. if 0 < _energy_usage_yesterday <= 0.01
  100. else round(_energy_usage_yesterday, 2),
  101. "use_time": round(_use_time, 1),
  102. "voltage": 0.1 if 0 < _voltage <= 0.1 else round(_voltage),
  103. "current": 0.1 if 0 < _current <= 0.1 else round(_current, 1),
  104. "power": 0.1 if 0 < _power <= 0.1 else round(_power, 1),
  105. }
  106. def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
  107. """Update device data from advertisement."""
  108. adv_data = advertisement.data["data"]
  109. channel = self._channel if hasattr(self, "_channel") else None
  110. if self._model in (
  111. SwitchbotModel.RELAY_SWITCH_1PM,
  112. SwitchbotModel.RELAY_SWITCH_2PM,
  113. ):
  114. if channel is None:
  115. adv_data["voltage"] = self._get_adv_value("voltage") or 0
  116. adv_data["current"] = self._get_adv_value("current") or 0
  117. adv_data["power"] = self._get_adv_value("power") or 0
  118. adv_data["energy"] = self._get_adv_value("energy") or 0
  119. else:
  120. for i in range(1, channel + 1):
  121. adv_data[i] = adv_data.get(i, {})
  122. adv_data[i]["voltage"] = self._get_adv_value("voltage", i) or 0
  123. adv_data[i]["current"] = self._get_adv_value("current", i) or 0
  124. adv_data[i]["power"] = self._get_adv_value("power", i) or 0
  125. adv_data[i]["energy"] = self._get_adv_value("energy", i) or 0
  126. super().update_from_advertisement(advertisement)
  127. def get_current_time_and_start_time(self) -> int:
  128. """Get current time in seconds since epoch."""
  129. current_time = int(time.time())
  130. current_time_hex = f"{current_time:08x}"
  131. current_day_start_time = int(current_time / 86400) * 86400
  132. current_day_start_time_hex = f"{current_day_start_time:08x}"
  133. return current_time_hex, current_day_start_time_hex
  134. async def get_basic_info(self) -> dict[str, Any] | None:
  135. """Get device basic settings."""
  136. current_time_hex, current_day_start_time_hex = (
  137. self.get_current_time_and_start_time()
  138. )
  139. if not (_data := await self._get_basic_info(COMMAND_GET_BASIC_INFO)):
  140. return None
  141. if not (
  142. _channel1_data := await self._get_basic_info(
  143. COMMAND_GET_CHANNEL1_INFO.format(
  144. current_time_hex, current_day_start_time_hex
  145. )
  146. )
  147. ):
  148. return None
  149. _LOGGER.debug(
  150. "on-off hex: %s, channel1_hex_data: %s", _data.hex(), _channel1_data.hex()
  151. )
  152. common_data = self._parse_common_data(_data)
  153. user_data = self._parse_user_data(_channel1_data)
  154. if self._model in (
  155. SwitchbotModel.RELAY_SWITCH_1,
  156. SwitchbotModel.GARAGE_DOOR_OPENER,
  157. ):
  158. for key in ["voltage", "current", "power", "energy"]:
  159. user_data.pop(key, None)
  160. if not common_data["isOn"]:
  161. self._reset_power_data(user_data)
  162. garage_door_opener_data = {"door_open": not bool(_data[2] & DOOR_OPEN_MASK)}
  163. _LOGGER.debug("common_data: %s, user_data: %s", common_data, user_data)
  164. if self._model == SwitchbotModel.GARAGE_DOOR_OPENER:
  165. return common_data | garage_door_opener_data
  166. return common_data | user_data
  167. @update_after_operation
  168. async def async_toggle(self, **kwargs) -> bool:
  169. """Toggle device."""
  170. result = await self._send_command(COMMAND_TOGGLE)
  171. return self._check_command_result(result, 0, {1})
  172. def is_on(self) -> bool | None:
  173. """Return switch state from cache."""
  174. return self._get_adv_value("isOn")
  175. class SwitchbotRelaySwitch2PM(SwitchbotRelaySwitch):
  176. """Representation of a Switchbot relay switch 2pm."""
  177. def __init__(
  178. self,
  179. device: BLEDevice,
  180. key_id: str,
  181. encryption_key: str,
  182. interface: int = 0,
  183. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_2PM,
  184. **kwargs: Any,
  185. ) -> None:
  186. super().__init__(device, key_id, encryption_key, interface, model, **kwargs)
  187. self._channel = 2
  188. @property
  189. def channel(self) -> int:
  190. return self._channel
  191. def get_parsed_data(self, channel: int | None = None) -> dict[str, Any]:
  192. """Return parsed device data, optionally for a specific channel."""
  193. data = self.data.get("data") or {}
  194. return data.get(channel, {})
  195. async def get_basic_info(self):
  196. current_time_hex, current_day_start_time_hex = (
  197. self.get_current_time_and_start_time()
  198. )
  199. if not (common_data := await super().get_basic_info()):
  200. return None
  201. if not (
  202. _channel2_data := await self._get_basic_info(
  203. COMMAND_GET_CHANNEL2_INFO.format(
  204. current_time_hex, current_day_start_time_hex
  205. )
  206. )
  207. ):
  208. return None
  209. _LOGGER.debug("channel2_hex_data: %s", _channel2_data.hex())
  210. channel2_data = self._parse_user_data(_channel2_data)
  211. channel2_data["isOn"] = common_data["channel2_isOn"]
  212. if not channel2_data["isOn"]:
  213. self._reset_power_data(channel2_data)
  214. _LOGGER.debug(
  215. "channel1_data: %s, channel2_data: %s", common_data, channel2_data
  216. )
  217. return {1: common_data, 2: channel2_data}
  218. @update_after_operation
  219. async def turn_on(self, channel: int) -> bool:
  220. """Turn device on."""
  221. result = await self._send_command(
  222. MULTI_CHANNEL_COMMANDS_TURN_ON[self._model][channel]
  223. )
  224. return self._check_command_result(result, 0, {1})
  225. @update_after_operation
  226. async def turn_off(self, channel: int) -> bool:
  227. """Turn device off."""
  228. result = await self._send_command(
  229. MULTI_CHANNEL_COMMANDS_TURN_OFF[self._model][channel]
  230. )
  231. return self._check_command_result(result, 0, {1})
  232. @update_after_operation
  233. async def async_toggle(self, channel: int) -> bool:
  234. """Toggle device."""
  235. result = await self._send_command(
  236. MULTI_CHANNEL_COMMANDS_TOGGLE[self._model][channel]
  237. )
  238. return self._check_command_result(result, 0, {1})
  239. def is_on(self, channel: int) -> bool | None:
  240. """Return switch state from cache."""
  241. return self._get_adv_value("isOn", channel)
  242. def switch_mode(self, channel: int) -> bool | None:
  243. """Return true or false from cache."""
  244. return self._get_adv_value("switchMode", channel)