relay_switch.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. import logging
  2. import time
  3. from typing import Any
  4. from bleak.backends.device import BLEDevice
  5. from ..const import SwitchbotModel
  6. from ..helpers import parse_power_data, parse_uint24_be
  7. from ..models import SwitchBotAdvertisement
  8. from .device import (
  9. SwitchbotEncryptedDevice,
  10. SwitchbotSequenceDevice,
  11. update_after_operation,
  12. )
  13. _LOGGER = logging.getLogger(__name__)
  14. # Bit masks for status parsing
  15. SWITCH1_ON_MASK = 0b10000000
  16. SWITCH2_ON_MASK = 0b01000000
  17. DOOR_OPEN_MASK = 0b00100000
  18. COMMAND_HEADER = "57"
  19. COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f70010000"
  20. COMMAND_TURN_ON = f"{COMMAND_HEADER}0f70010100"
  21. COMMAND_TOGGLE = f"{COMMAND_HEADER}0f70010200"
  22. COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
  23. COMMAND_GET_BASIC_INFO = f"{COMMAND_HEADER}0f7181"
  24. COMMAND_GET_CHANNEL1_INFO = f"{COMMAND_HEADER}0f710600{{}}{{}}"
  25. COMMAND_GET_CHANNEL2_INFO = f"{COMMAND_HEADER}0f710601{{}}{{}}"
  26. MULTI_CHANNEL_COMMANDS_TURN_ON = {
  27. SwitchbotModel.RELAY_SWITCH_2PM: {
  28. 1: "570f70010d00",
  29. 2: "570f70010700",
  30. }
  31. }
  32. MULTI_CHANNEL_COMMANDS_TURN_OFF = {
  33. SwitchbotModel.RELAY_SWITCH_2PM: {
  34. 1: "570f70010c00",
  35. 2: "570f70010300",
  36. }
  37. }
  38. MULTI_CHANNEL_COMMANDS_TOGGLE = {
  39. SwitchbotModel.RELAY_SWITCH_2PM: {
  40. 1: "570f70010e00",
  41. 2: "570f70010b00",
  42. }
  43. }
  44. MULTI_CHANNEL_COMMANDS_GET_VOLTAGE_AND_CURRENT = {
  45. SwitchbotModel.RELAY_SWITCH_2PM: {
  46. 1: COMMAND_GET_CHANNEL1_INFO,
  47. 2: COMMAND_GET_CHANNEL2_INFO,
  48. }
  49. }
  50. class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
  51. """Representation of a Switchbot relay switch 1pm."""
  52. def __init__(
  53. self,
  54. device: BLEDevice,
  55. key_id: str,
  56. encryption_key: str,
  57. interface: int = 0,
  58. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  59. **kwargs: Any,
  60. ) -> None:
  61. super().__init__(device, key_id, encryption_key, model, interface, **kwargs)
  62. @classmethod
  63. async def verify_encryption_key(
  64. cls,
  65. device: BLEDevice,
  66. key_id: str,
  67. encryption_key: str,
  68. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
  69. **kwargs: Any,
  70. ) -> bool:
  71. return await super().verify_encryption_key(
  72. device, key_id, encryption_key, model, **kwargs
  73. )
  74. def _reset_power_data(self, data: dict[str, Any]) -> None:
  75. """Reset power-related data to 0."""
  76. for key in ["power", "current", "voltage"]:
  77. data[key] = 0
  78. def _parse_common_data(self, raw_data: bytes) -> dict[str, Any]:
  79. """Parse common data from raw bytes."""
  80. return {
  81. "sequence_number": raw_data[1],
  82. "isOn": bool(raw_data[2] & SWITCH1_ON_MASK),
  83. "firmware": raw_data[16] / 10.0,
  84. "channel2_isOn": bool(raw_data[2] & SWITCH2_ON_MASK),
  85. }
  86. def _parse_user_data(self, raw_data: bytes) -> dict[str, Any]:
  87. """Parse user-specific data from raw bytes."""
  88. _energy = parse_uint24_be(raw_data, 1) / 60000
  89. _energy_usage_yesterday = parse_uint24_be(raw_data, 4) / 60000
  90. _use_time = parse_power_data(raw_data, 7, 60.0)
  91. _voltage = parse_power_data(raw_data, 9, 10.0)
  92. _current = parse_power_data(raw_data, 11, 1000.0)
  93. _power = parse_power_data(raw_data, 13, 10.0)
  94. return {
  95. "energy": 0.01 if 0 < _energy <= 0.01 else round(_energy, 2),
  96. "energy usage yesterday": 0.01
  97. if 0 < _energy_usage_yesterday <= 0.01
  98. else round(_energy_usage_yesterday, 2),
  99. "use_time": round(_use_time, 1),
  100. "voltage": 0.1 if 0 < _voltage <= 0.1 else round(_voltage),
  101. "current": 0.1 if 0 < _current <= 0.1 else round(_current, 1),
  102. "power": 0.1 if 0 < _power <= 0.1 else round(_power, 1),
  103. }
  104. def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
  105. """Update device data from advertisement."""
  106. adv_data = advertisement.data["data"]
  107. channel = self._channel if hasattr(self, "_channel") else None
  108. if self._model in (
  109. SwitchbotModel.RELAY_SWITCH_1PM,
  110. SwitchbotModel.RELAY_SWITCH_2PM,
  111. ):
  112. if channel is None:
  113. adv_data["voltage"] = self._get_adv_value("voltage") or 0
  114. adv_data["current"] = self._get_adv_value("current") or 0
  115. adv_data["power"] = self._get_adv_value("power") or 0
  116. adv_data["energy"] = self._get_adv_value("energy") or 0
  117. else:
  118. for i in range(1, channel + 1):
  119. adv_data[i] = adv_data.get(i, {})
  120. adv_data[i]["voltage"] = self._get_adv_value("voltage", i) or 0
  121. adv_data[i]["current"] = self._get_adv_value("current", i) or 0
  122. adv_data[i]["power"] = self._get_adv_value("power", i) or 0
  123. adv_data[i]["energy"] = self._get_adv_value("energy", i) or 0
  124. super().update_from_advertisement(advertisement)
  125. def get_current_time_and_start_time(self) -> int:
  126. """Get current time in seconds since epoch."""
  127. current_time = int(time.time())
  128. current_time_hex = f"{current_time:08x}"
  129. current_day_start_time = int(current_time / 86400) * 86400
  130. current_day_start_time_hex = f"{current_day_start_time:08x}"
  131. return current_time_hex, current_day_start_time_hex
  132. async def get_basic_info(self) -> dict[str, Any] | None:
  133. """Get device basic settings."""
  134. current_time_hex, current_day_start_time_hex = (
  135. self.get_current_time_and_start_time()
  136. )
  137. if not (_data := await self._get_basic_info(COMMAND_GET_BASIC_INFO)):
  138. return None
  139. if not (
  140. _channel1_data := await self._get_basic_info(
  141. COMMAND_GET_CHANNEL1_INFO.format(
  142. current_time_hex, current_day_start_time_hex
  143. )
  144. )
  145. ):
  146. return None
  147. _LOGGER.debug(
  148. "on-off hex: %s, channel1_hex_data: %s", _data.hex(), _channel1_data.hex()
  149. )
  150. common_data = self._parse_common_data(_data)
  151. user_data = self._parse_user_data(_channel1_data)
  152. if self._model in (
  153. SwitchbotModel.RELAY_SWITCH_1,
  154. SwitchbotModel.GARAGE_DOOR_OPENER,
  155. ):
  156. for key in ["voltage", "current", "power", "energy"]:
  157. user_data.pop(key, None)
  158. if not common_data["isOn"]:
  159. self._reset_power_data(user_data)
  160. garage_door_opener_data = {"door_open": not bool(_data[2] & DOOR_OPEN_MASK)}
  161. _LOGGER.debug("common_data: %s, user_data: %s", common_data, user_data)
  162. if self._model == SwitchbotModel.GARAGE_DOOR_OPENER:
  163. return common_data | garage_door_opener_data
  164. return common_data | user_data
  165. @update_after_operation
  166. async def turn_on(self) -> bool:
  167. """Turn device on."""
  168. result = await self._send_command(COMMAND_TURN_ON)
  169. return self._check_command_result(result, 0, {1})
  170. @update_after_operation
  171. async def turn_off(self) -> bool:
  172. """Turn device off."""
  173. result = await self._send_command(COMMAND_TURN_OFF)
  174. return self._check_command_result(result, 0, {1})
  175. @update_after_operation
  176. async def async_toggle(self, **kwargs) -> bool:
  177. """Toggle device."""
  178. result = await self._send_command(COMMAND_TOGGLE)
  179. return self._check_command_result(result, 0, {1})
  180. def is_on(self) -> bool | None:
  181. """Return switch state from cache."""
  182. return self._get_adv_value("isOn")
  183. class SwitchbotRelaySwitch2PM(SwitchbotRelaySwitch):
  184. """Representation of a Switchbot relay switch 2pm."""
  185. def __init__(
  186. self,
  187. device: BLEDevice,
  188. key_id: str,
  189. encryption_key: str,
  190. interface: int = 0,
  191. model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_2PM,
  192. **kwargs: Any,
  193. ) -> None:
  194. super().__init__(device, key_id, encryption_key, interface, model, **kwargs)
  195. self._channel = 2
  196. @property
  197. def channel(self) -> int:
  198. return self._channel
  199. def get_parsed_data(self, channel: int | None = None) -> dict[str, Any]:
  200. """Return parsed device data, optionally for a specific channel."""
  201. data = self.data.get("data") or {}
  202. return data.get(channel, {})
  203. async def get_basic_info(self):
  204. current_time_hex, current_day_start_time_hex = (
  205. self.get_current_time_and_start_time()
  206. )
  207. if not (common_data := await super().get_basic_info()):
  208. return None
  209. if not (
  210. _channel2_data := await self._get_basic_info(
  211. COMMAND_GET_CHANNEL2_INFO.format(
  212. current_time_hex, current_day_start_time_hex
  213. )
  214. )
  215. ):
  216. return None
  217. _LOGGER.debug("channel2_hex_data: %s", _channel2_data.hex())
  218. channel2_data = self._parse_user_data(_channel2_data)
  219. channel2_data["isOn"] = common_data["channel2_isOn"]
  220. if not channel2_data["isOn"]:
  221. self._reset_power_data(channel2_data)
  222. _LOGGER.debug(
  223. "channel1_data: %s, channel2_data: %s", common_data, channel2_data
  224. )
  225. return {1: common_data, 2: channel2_data}
  226. @update_after_operation
  227. async def turn_on(self, channel: int) -> bool:
  228. """Turn device on."""
  229. result = await self._send_command(
  230. MULTI_CHANNEL_COMMANDS_TURN_ON[self._model][channel]
  231. )
  232. return self._check_command_result(result, 0, {1})
  233. @update_after_operation
  234. async def turn_off(self, channel: int) -> bool:
  235. """Turn device off."""
  236. result = await self._send_command(
  237. MULTI_CHANNEL_COMMANDS_TURN_OFF[self._model][channel]
  238. )
  239. return self._check_command_result(result, 0, {1})
  240. @update_after_operation
  241. async def async_toggle(self, channel: int) -> bool:
  242. """Toggle device."""
  243. result = await self._send_command(
  244. MULTI_CHANNEL_COMMANDS_TOGGLE[self._model][channel]
  245. )
  246. return self._check_command_result(result, 0, {1})
  247. def is_on(self, channel: int) -> bool | None:
  248. """Return switch state from cache."""
  249. return self._get_adv_value("isOn", channel)
  250. def switch_mode(self, channel: int) -> bool | None:
  251. """Return true or false from cache."""
  252. return self._get_adv_value("switchMode", channel)