1
0

curtain.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. from dataclasses import replace
  4. import logging
  5. from typing import Any
  6. from switchbot.models import SwitchBotAdvertisement
  7. from .device import REQ_HEADER, SwitchbotDevice, update_after_operation
  8. # Curtain keys
  9. CURTAIN_COMMAND = "4501"
  10. # For second element of open and close arrs we should add two bytes i.e. ff00
  11. # First byte [ff] stands for speed (00 or ff - normal, 01 - slow) *
  12. # * Only for curtains 3. For other models use ff
  13. # Second byte [00] is a command (00 - open, 64 - close)
  14. OPEN_KEYS = [
  15. f"{REQ_HEADER}{CURTAIN_COMMAND}010100",
  16. f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed + "00"
  17. ]
  18. CLOSE_KEYS = [
  19. f"{REQ_HEADER}{CURTAIN_COMMAND}010164",
  20. f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed + "64"
  21. ]
  22. POSITION_KEYS = [
  23. f"{REQ_HEADER}{CURTAIN_COMMAND}0101",
  24. f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed
  25. ] # +actual_position
  26. STOP_KEYS = [f"{REQ_HEADER}{CURTAIN_COMMAND}0001", f"{REQ_HEADER}{CURTAIN_COMMAND}00ff"]
  27. CURTAIN_EXT_SUM_KEY = f"{REQ_HEADER}460401"
  28. CURTAIN_EXT_ADV_KEY = f"{REQ_HEADER}460402"
  29. CURTAIN_EXT_CHAIN_INFO_KEY = f"{REQ_HEADER}468101"
  30. _LOGGER = logging.getLogger(__name__)
  31. class SwitchbotCurtain(SwitchbotDevice):
  32. """Representation of a Switchbot Curtain."""
  33. def __init__(self, *args: Any, **kwargs: Any) -> None:
  34. """Switchbot Curtain/WoCurtain constructor."""
  35. # The position of the curtain is saved returned with 0 = open and 100 = closed.
  36. # This is independent of the calibration of the curtain bot (Open left to right/
  37. # Open right to left/Open from the middle).
  38. # The parameter 'reverse_mode' reverse these values,
  39. # if 'reverse_mode' = True, position = 0 equals close
  40. # and position = 100 equals open. The parameter is default set to True so that
  41. # the definition of position is the same as in Home Assistant.
  42. super().__init__(*args, **kwargs)
  43. self._reverse: bool = kwargs.pop("reverse_mode", True)
  44. self._settings: dict[str, Any] = {}
  45. self.ext_info_sum: dict[str, Any] = {}
  46. self.ext_info_adv: dict[str, Any] = {}
  47. self._is_opening: bool = False
  48. self._is_closing: bool = False
  49. def _set_parsed_data(
  50. self, advertisement: SwitchBotAdvertisement, data: dict[str, Any]
  51. ) -> None:
  52. """Set data."""
  53. _new_data = replace(
  54. advertisement, data=self._sb_adv_data.data | {"data": data}
  55. )
  56. self._update_motion_direction(_new_data["inMotion"], self._get_adv_value("position"), _new_data["position"])
  57. if _new_data["inMotion"] == True:
  58. pass
  59. else:
  60. self._is_closing = self._is_opening = False
  61. super()._set_parsed_data(advertisement, data)
  62. async def _send_multiple_commands(self, keys: list[str]) -> bool:
  63. """Send multiple commands to device.
  64. Since we current have no way to tell which command the device
  65. needs we send both.
  66. """
  67. final_result = False
  68. for key in keys:
  69. result = await self._send_command(key)
  70. final_result |= self._check_command_result(result, 0, {1})
  71. return final_result
  72. @update_after_operation
  73. async def open(self, speed: int = 255) -> bool:
  74. """Send open command. Speed 255 - normal, 1 - slow"""
  75. self._is_opening = True
  76. self._is_closing = False
  77. return await self._send_multiple_commands(
  78. [OPEN_KEYS[0], f"{OPEN_KEYS[1]}{speed:02X}00"]
  79. )
  80. @update_after_operation
  81. async def close(self, speed: int = 255) -> bool:
  82. """Send close command. Speed 255 - normal, 1 - slow"""
  83. self._is_closing = True
  84. self._is_opening = False
  85. return await self._send_multiple_commands(
  86. [CLOSE_KEYS[0], f"{CLOSE_KEYS[1]}{speed:02X}64"]
  87. )
  88. @update_after_operation
  89. async def stop(self) -> bool:
  90. """Send stop command to device."""
  91. self._is_opening = self._is_closing = False
  92. return await self._send_multiple_commands(STOP_KEYS)
  93. @update_after_operation
  94. async def set_position(self, position: int, speed: int = 255) -> bool:
  95. """Send position command (0-100) to device. Speed 255 - normal, 1 - slow"""
  96. position = (100 - position) if self._reverse else position
  97. self._update_motion_direction(True, self._get_adv_value("position"), position)
  98. return await self._send_multiple_commands(
  99. [
  100. f"{POSITION_KEYS[0]}{position:02X}",
  101. f"{POSITION_KEYS[1]}{speed:02X}{position:02X}",
  102. ]
  103. )
  104. def get_position(self) -> Any:
  105. """Return cached position (0-100) of Curtain."""
  106. # To get actual position call update() first.
  107. return self._get_adv_value("position")
  108. async def get_basic_info(self) -> dict[str, Any] | None:
  109. """Get device basic settings."""
  110. if not (_data := await self._get_basic_info()):
  111. return None
  112. _position = max(min(_data[6], 100), 0)
  113. _direction_adjusted_position = (100 - _position) if self._reverse else _position
  114. _previous_position = self._get_adv_value("position")
  115. _in_motion = bool(_data[5] & 0b01000011)
  116. self._update_motion_direction(_in_motion, _previous_position, _direction_adjusted_position)
  117. return {
  118. "battery": _data[1],
  119. "firmware": _data[2] / 10.0,
  120. "chainLength": _data[3],
  121. "openDirection": (
  122. "right_to_left" if _data[4] & 0b10000000 == 128 else "left_to_right"
  123. ),
  124. "touchToOpen": bool(_data[4] & 0b01000000),
  125. "light": bool(_data[4] & 0b00100000),
  126. "fault": bool(_data[4] & 0b00001000),
  127. "solarPanel": bool(_data[5] & 0b00001000),
  128. "calibration": bool(_data[5] & 0b00000100),
  129. "calibrated": bool(_data[5] & 0b00000100),
  130. "inMotion": _in_motion,
  131. "position": _direction_adjusted_position,
  132. "timers": _data[7],
  133. }
  134. def _update_motion_direction(self, in_motion: bool, previous_position: int, new_position: int) -> None:
  135. """Update opening/closing status based on movement."""
  136. if in_motion == True:
  137. if new_position != previous_position:
  138. self._is_opening = new_position > previous_position
  139. self._is_closing = new_position < previous_position
  140. else:
  141. self._is_closing = self._is_opening = False
  142. async def get_extended_info_summary(self) -> dict[str, Any] | None:
  143. """Get basic info for all devices in chain."""
  144. _data = await self._send_command(key=CURTAIN_EXT_SUM_KEY)
  145. if not _data:
  146. _LOGGER.error("%s: Unsuccessful, no result from device", self.name)
  147. return None
  148. if _data in (b"\x07", b"\x00"):
  149. _LOGGER.error("%s: Unsuccessful, please try again", self.name)
  150. return None
  151. self.ext_info_sum["device0"] = {
  152. "openDirectionDefault": not bool(_data[1] & 0b10000000),
  153. "touchToOpen": bool(_data[1] & 0b01000000),
  154. "light": bool(_data[1] & 0b00100000),
  155. "openDirection": (
  156. "left_to_right" if _data[1] & 0b00010000 == 1 else "right_to_left"
  157. ),
  158. }
  159. # if grouped curtain device present.
  160. if _data[2] != 0:
  161. self.ext_info_sum["device1"] = {
  162. "openDirectionDefault": not bool(_data[1] & 0b10000000),
  163. "touchToOpen": bool(_data[1] & 0b01000000),
  164. "light": bool(_data[1] & 0b00100000),
  165. "openDirection": (
  166. "left_to_right" if _data[1] & 0b00010000 else "right_to_left"
  167. ),
  168. }
  169. return self.ext_info_sum
  170. async def get_extended_info_adv(self) -> dict[str, Any] | None:
  171. """Get advance page info for device chain."""
  172. _data = await self._send_command(key=CURTAIN_EXT_ADV_KEY)
  173. if not _data:
  174. _LOGGER.error("%s: Unsuccessful, no result from device", self.name)
  175. return None
  176. if _data in (b"\x07", b"\x00"):
  177. _LOGGER.error("%s: Unsuccessful, please try again", self.name)
  178. return None
  179. _state_of_charge = [
  180. "not_charging",
  181. "charging_by_adapter",
  182. "charging_by_solar",
  183. "fully_charged",
  184. "solar_not_charging",
  185. "charging_error",
  186. ]
  187. self.ext_info_adv["device0"] = {
  188. "battery": _data[1],
  189. "firmware": _data[2] / 10.0,
  190. "stateOfCharge": _state_of_charge[_data[3]],
  191. }
  192. # If grouped curtain device present.
  193. if _data[4]:
  194. self.ext_info_adv["device1"] = {
  195. "battery": _data[4],
  196. "firmware": _data[5] / 10.0,
  197. "stateOfCharge": _state_of_charge[_data[6]],
  198. }
  199. return self.ext_info_adv
  200. def get_light_level(self) -> Any:
  201. """Return cached light level."""
  202. # To get actual light level call update() first.
  203. return self._get_adv_value("lightLevel")
  204. def is_reversed(self) -> bool:
  205. """Return True if curtain position is opposite from SB data."""
  206. return self._reverse
  207. def is_calibrated(self) -> Any:
  208. """Return True curtain is calibrated."""
  209. # To get actual light level call update() first.
  210. return self._get_adv_value("calibration")
  211. def is_opening(self) -> Any:
  212. """Return True if the curtain is opening."""
  213. return self._is_opening
  214. def is_closing(self) -> Any:
  215. """Return True if the curtain is closing."""
  216. return self._is_closing