relay_switch.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. import logging
  2. import time
  3. from typing import Any
  4. from bleak.backends.device import BLEDevice
  5. from ..const import SwitchbotModel
  6. from ..models import SwitchBotAdvertisement
  7. from .device import (
  8. SwitchbotEncryptedDevice,
  9. SwitchbotSequenceDevice,
  10. update_after_operation,
  11. )
  12. _LOGGER = logging.getLogger(__name__)
  13. COMMAND_HEADER = "57"
  14. COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f70010000"
  15. COMMAND_TURN_ON = f"{COMMAND_HEADER}0f70010100"
  16. COMMAND_TOGGLE = f"{COMMAND_HEADER}0f70010200"
  17. COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
  18. COMMAND_GET_BASIC_INFO = f"{COMMAND_HEADER}0f7181"
  19. COMMAND_GET_CHANNEL1_INFO = f"{COMMAND_HEADER}0f710600{{}}{{}}"
  20. COMMAND_GET_CHANNEL2_INFO = f"{COMMAND_HEADER}0f710601{{}}{{}}"
  21. MULTI_CHANNEL_COMMANDS_TURN_ON = {
  22. SwitchbotModel.RELAY_SWITCH_2PM: {
  23. 1: "570f70010d00",
  24. 2: "570f70010700",
  25. }
  26. }
  27. MULTI_CHANNEL_COMMANDS_TURN_OFF = {
  28. SwitchbotModel.RELAY_SWITCH_2PM: {
  29. 1: "570f70010c00",
  30. 2: "570f70010300",
  31. }
  32. }
  33. MULTI_CHANNEL_COMMANDS_TOGGLE = {
  34. SwitchbotModel.RELAY_SWITCH_2PM: {
  35. 1: "570f70010e00",
  36. 2: "570f70010b00",
  37. }
  38. }
  39. MULTI_CHANNEL_COMMANDS_GET_VOLTAGE_AND_CURRENT = {
  40. SwitchbotModel.RELAY_SWITCH_2PM: {
  41. 1: COMMAND_GET_CHANNEL1_INFO,
  42. 2: COMMAND_GET_CHANNEL2_INFO,
  43. }
  44. }
  45. class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
  46. """Representation of a Switchbot relay switch 1pm."""
  47. def __init__(
  48. self,
  49. device: BLEDevice,
  50. key_id: str,
  51. encryption_key: str,
  52. interface: int = 0,
  53. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  54. **kwargs: Any,
  55. ) -> None:
  56. super().__init__(device, key_id, encryption_key, model, interface, **kwargs)
  57. @classmethod
  58. async def verify_encryption_key(
  59. cls,
  60. device: BLEDevice,
  61. key_id: str,
  62. encryption_key: str,
  63. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  64. **kwargs: Any,
  65. ) -> bool:
  66. return await super().verify_encryption_key(
  67. device, key_id, encryption_key, model, **kwargs
  68. )
  69. def _reset_power_data(self, data: dict[str, Any]) -> None:
  70. """Reset power-related data to 0."""
  71. for key in ["power", "current", "voltage"]:
  72. data[key] = 0
  73. def _parse_common_data(self, raw_data: bytes) -> dict[str, Any]:
  74. """Parse common data from raw bytes."""
  75. return {
  76. "sequence_number": raw_data[1],
  77. "isOn": bool(raw_data[2] & 0b10000000),
  78. "firmware": raw_data[16] / 10.0,
  79. "channel2_isOn": bool(raw_data[2] & 0b01000000),
  80. }
  81. def _parse_user_data(self, raw_data: bytes) -> dict[str, Any]:
  82. """Parse user-specific data from raw bytes."""
  83. _energy = int.from_bytes(raw_data[1:4], "big") / 60000
  84. _energy_usage_yesterday = int.from_bytes(raw_data[4:7], "big") / 60000
  85. _use_time = int.from_bytes(raw_data[7:9], "big") / 60.0
  86. _voltage = int.from_bytes(raw_data[9:11], "big") / 10.0
  87. _current = int.from_bytes(raw_data[11:13], "big") / 1000.0
  88. _power = int.from_bytes(raw_data[13:15], "big") / 10.0
  89. return {
  90. "energy": 0.01 if 0 < _energy <= 0.01 else round(_energy, 2),
  91. "energy usage yesterday": 0.01
  92. if 0 < _energy_usage_yesterday <= 0.01
  93. else round(_energy_usage_yesterday, 2),
  94. "use_time": round(_use_time, 1),
  95. "voltage": 0.1 if 0 < _voltage <= 0.1 else round(_voltage),
  96. "current": 0.1 if 0 < _current <= 0.1 else round(_current, 1),
  97. "power": 0.1 if 0 < _power <= 0.1 else round(_power, 1),
  98. }
  99. def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
  100. """Update device data from advertisement."""
  101. adv_data = advertisement.data["data"]
  102. channel = self._channel if hasattr(self, "_channel") else None
  103. if self._model in (
  104. SwitchbotModel.RELAY_SWITCH_1PM,
  105. SwitchbotModel.RELAY_SWITCH_2PM,
  106. ):
  107. if channel is None:
  108. adv_data["voltage"] = self._get_adv_value("voltage") or 0
  109. adv_data["current"] = self._get_adv_value("current") or 0
  110. adv_data["power"] = self._get_adv_value("power") or 0
  111. adv_data["energy"] = self._get_adv_value("energy") or 0
  112. else:
  113. for i in range(1, channel + 1):
  114. adv_data[i] = adv_data.get(i, {})
  115. adv_data[i]["voltage"] = self._get_adv_value("voltage", i) or 0
  116. adv_data[i]["current"] = self._get_adv_value("current", i) or 0
  117. adv_data[i]["power"] = self._get_adv_value("power", i) or 0
  118. adv_data[i]["energy"] = self._get_adv_value("energy", i) or 0
  119. super().update_from_advertisement(advertisement)
  120. def get_current_time_and_start_time(self) -> int:
  121. """Get current time in seconds since epoch."""
  122. current_time = int(time.time())
  123. current_time_hex = f"{current_time:08x}"
  124. current_day_start_time = int(current_time / 86400) * 86400
  125. current_day_start_time_hex = f"{current_day_start_time:08x}"
  126. return current_time_hex, current_day_start_time_hex
  127. async def get_basic_info(self) -> dict[str, Any] | None:
  128. """Get device basic settings."""
  129. current_time_hex, current_day_start_time_hex = (
  130. self.get_current_time_and_start_time()
  131. )
  132. if not (_data := await self._get_basic_info(COMMAND_GET_BASIC_INFO)):
  133. return None
  134. if not (
  135. _channel1_data := await self._get_basic_info(
  136. COMMAND_GET_CHANNEL1_INFO.format(
  137. current_time_hex, current_day_start_time_hex
  138. )
  139. )
  140. ):
  141. return None
  142. _LOGGER.debug(
  143. "on-off hex: %s, channel1_hex_data: %s", _data.hex(), _channel1_data.hex()
  144. )
  145. common_data = self._parse_common_data(_data)
  146. user_data = self._parse_user_data(_channel1_data)
  147. if self._model in (
  148. SwitchbotModel.RELAY_SWITCH_1,
  149. SwitchbotModel.GARAGE_DOOR_OPENER,
  150. ):
  151. for key in ["voltage", "current", "power", "energy"]:
  152. user_data.pop(key, None)
  153. if not common_data["isOn"]:
  154. self._reset_power_data(user_data)
  155. garage_door_opener_data = {"door_open": not bool(_data[2] & 0b00100000)}
  156. _LOGGER.debug("common_data: %s, user_data: %s", common_data, user_data)
  157. if self._model == SwitchbotModel.GARAGE_DOOR_OPENER:
  158. return common_data | garage_door_opener_data
  159. return common_data | user_data
  160. @update_after_operation
  161. async def turn_on(self) -> bool:
  162. """Turn device on."""
  163. result = await self._send_command(COMMAND_TURN_ON)
  164. return self._check_command_result(result, 0, {1})
  165. @update_after_operation
  166. async def turn_off(self) -> bool:
  167. """Turn device off."""
  168. result = await self._send_command(COMMAND_TURN_OFF)
  169. return self._check_command_result(result, 0, {1})
  170. @update_after_operation
  171. async def async_toggle(self, **kwargs) -> bool:
  172. """Toggle device."""
  173. result = await self._send_command(COMMAND_TOGGLE)
  174. return self._check_command_result(result, 0, {1})
  175. def is_on(self) -> bool | None:
  176. """Return switch state from cache."""
  177. return self._get_adv_value("isOn")
  178. class SwitchbotRelaySwitch2PM(SwitchbotRelaySwitch):
  179. """Representation of a Switchbot relay switch 2pm."""
  180. def __init__(
  181. self,
  182. device: BLEDevice,
  183. key_id: str,
  184. encryption_key: str,
  185. interface: int = 0,
  186. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_2PM,
  187. **kwargs: Any,
  188. ) -> None:
  189. super().__init__(device, key_id, encryption_key, interface, model, **kwargs)
  190. self._channel = 2
  191. @property
  192. def channel(self) -> int:
  193. return self._channel
  194. def get_parsed_data(self, channel: int | None = None) -> dict[str, Any]:
  195. """Return parsed device data, optionally for a specific channel."""
  196. data = self.data.get("data") or {}
  197. return data.get(channel, {})
  198. async def get_basic_info(self):
  199. current_time_hex, current_day_start_time_hex = (
  200. self.get_current_time_and_start_time()
  201. )
  202. if not (common_data := await super().get_basic_info()):
  203. return None
  204. if not (
  205. _channel2_data := await self._get_basic_info(
  206. COMMAND_GET_CHANNEL2_INFO.format(
  207. current_time_hex, current_day_start_time_hex
  208. )
  209. )
  210. ):
  211. return None
  212. _LOGGER.debug("channel2_hex_data: %s", _channel2_data.hex())
  213. channel2_data = self._parse_user_data(_channel2_data)
  214. channel2_data["isOn"] = common_data["channel2_isOn"]
  215. if not channel2_data["isOn"]:
  216. self._reset_power_data(channel2_data)
  217. _LOGGER.debug(
  218. "channel1_data: %s, channel2_data: %s", common_data, channel2_data
  219. )
  220. return {1: common_data, 2: channel2_data}
  221. @update_after_operation
  222. async def turn_on(self, channel: int) -> bool:
  223. """Turn device on."""
  224. result = await self._send_command(
  225. MULTI_CHANNEL_COMMANDS_TURN_ON[self._model][channel]
  226. )
  227. return self._check_command_result(result, 0, {1})
  228. @update_after_operation
  229. async def turn_off(self, channel: int) -> bool:
  230. """Turn device off."""
  231. result = await self._send_command(
  232. MULTI_CHANNEL_COMMANDS_TURN_OFF[self._model][channel]
  233. )
  234. return self._check_command_result(result, 0, {1})
  235. @update_after_operation
  236. async def async_toggle(self, channel: int) -> bool:
  237. """Toggle device."""
  238. result = await self._send_command(
  239. MULTI_CHANNEL_COMMANDS_TOGGLE[self._model][channel]
  240. )
  241. return self._check_command_result(result, 0, {1})
  242. def is_on(self, channel: int) -> bool | None:
  243. """Return switch state from cache."""
  244. return self._get_adv_value("isOn", channel)
  245. def switch_mode(self, channel: int) -> bool | None:
  246. """Return true or false from cache."""
  247. return self._get_adv_value("switchMode", channel)