curtain.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import logging
  4. from typing import Any
  5. from switchbot.models import SwitchBotAdvertisement
  6. from .device import REQ_HEADER, SwitchbotDevice, update_after_operation
  7. # Curtain keys
  8. CURTAIN_COMMAND = "4501"
  9. # For second element of open and close arrs we should add two bytes i.e. ff00
  10. # First byte [ff] stands for speed (00 or ff - normal, 01 - slow) *
  11. # * Only for curtains 3. For other models use ff
  12. # Second byte [00] is a command (00 - open, 64 - close)
  13. OPEN_KEYS = [
  14. f"{REQ_HEADER}{CURTAIN_COMMAND}010100",
  15. f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed + "00"
  16. ]
  17. CLOSE_KEYS = [
  18. f"{REQ_HEADER}{CURTAIN_COMMAND}010164",
  19. f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed + "64"
  20. ]
  21. POSITION_KEYS = [
  22. f"{REQ_HEADER}{CURTAIN_COMMAND}0101",
  23. f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed
  24. ] # +actual_position
  25. STOP_KEYS = [f"{REQ_HEADER}{CURTAIN_COMMAND}0001", f"{REQ_HEADER}{CURTAIN_COMMAND}00ff"]
  26. CURTAIN_EXT_SUM_KEY = f"{REQ_HEADER}460401"
  27. CURTAIN_EXT_ADV_KEY = f"{REQ_HEADER}460402"
  28. CURTAIN_EXT_CHAIN_INFO_KEY = f"{REQ_HEADER}468101"
  29. _LOGGER = logging.getLogger(__name__)
  30. class SwitchbotCurtain(SwitchbotDevice):
  31. """Representation of a Switchbot Curtain."""
  32. def __init__(self, *args: Any, **kwargs: Any) -> None:
  33. """Switchbot Curtain/WoCurtain constructor."""
  34. # The position of the curtain is saved returned with 0 = open and 100 = closed.
  35. # This is independent of the calibration of the curtain bot (Open left to right/
  36. # Open right to left/Open from the middle).
  37. # The parameter 'reverse_mode' reverse these values,
  38. # if 'reverse_mode' = True, position = 0 equals close
  39. # and position = 100 equals open. The parameter is default set to True so that
  40. # the definition of position is the same as in Home Assistant.
  41. super().__init__(*args, **kwargs)
  42. self._reverse: bool = kwargs.pop("reverse_mode", True)
  43. self._settings: dict[str, Any] = {}
  44. self.ext_info_sum: dict[str, Any] = {}
  45. self.ext_info_adv: dict[str, Any] = {}
  46. self._is_opening: bool = False
  47. self._is_closing: bool = False
  48. def _set_parsed_data(
  49. self, advertisement: SwitchBotAdvertisement, data: dict[str, Any]
  50. ) -> None:
  51. """Set data."""
  52. in_motion = data["inMotion"]
  53. previous_position = self._get_adv_value("position")
  54. new_position = data["position"]
  55. self._update_motion_direction(in_motion, previous_position, new_position)
  56. super()._set_parsed_data(advertisement, data)
  57. async def _send_multiple_commands(self, keys: list[str]) -> bool:
  58. """Send multiple commands to device.
  59. Since we current have no way to tell which command the device
  60. needs we send both.
  61. """
  62. final_result = False
  63. for key in keys:
  64. result = await self._send_command(key)
  65. final_result |= self._check_command_result(result, 0, {1})
  66. return final_result
  67. @update_after_operation
  68. async def open(self, speed: int = 255) -> bool:
  69. """Send open command. Speed 255 - normal, 1 - slow"""
  70. self._is_opening = True
  71. self._is_closing = False
  72. return await self._send_multiple_commands(
  73. [OPEN_KEYS[0], f"{OPEN_KEYS[1]}{speed:02X}00"]
  74. )
  75. @update_after_operation
  76. async def close(self, speed: int = 255) -> bool:
  77. """Send close command. Speed 255 - normal, 1 - slow"""
  78. self._is_closing = True
  79. self._is_opening = False
  80. return await self._send_multiple_commands(
  81. [CLOSE_KEYS[0], f"{CLOSE_KEYS[1]}{speed:02X}64"]
  82. )
  83. @update_after_operation
  84. async def stop(self) -> bool:
  85. """Send stop command to device."""
  86. self._is_opening = self._is_closing = False
  87. return await self._send_multiple_commands(STOP_KEYS)
  88. @update_after_operation
  89. async def set_position(self, position: int, speed: int = 255) -> bool:
  90. """Send position command (0-100) to device. Speed 255 - normal, 1 - slow"""
  91. position = (100 - position) if self._reverse else position
  92. self._update_motion_direction(True, self._get_adv_value("position"), position)
  93. return await self._send_multiple_commands(
  94. [
  95. f"{POSITION_KEYS[0]}{position:02X}",
  96. f"{POSITION_KEYS[1]}{speed:02X}{position:02X}",
  97. ]
  98. )
  99. def get_position(self) -> Any:
  100. """Return cached position (0-100) of Curtain."""
  101. # To get actual position call update() first.
  102. return self._get_adv_value("position")
  103. async def get_basic_info(self) -> dict[str, Any] | None:
  104. """Get device basic settings."""
  105. if not (_data := await self._get_basic_info()):
  106. return None
  107. _position = max(min(_data[6], 100), 0)
  108. _direction_adjusted_position = (100 - _position) if self._reverse else _position
  109. _previous_position = self._get_adv_value("position")
  110. _in_motion = bool(_data[5] & 0b01000011)
  111. self._update_motion_direction(
  112. _in_motion, _previous_position, _direction_adjusted_position
  113. )
  114. return {
  115. "battery": _data[1],
  116. "firmware": _data[2] / 10.0,
  117. "chainLength": _data[3],
  118. "openDirection": (
  119. "right_to_left" if _data[4] & 0b10000000 == 128 else "left_to_right"
  120. ),
  121. "touchToOpen": bool(_data[4] & 0b01000000),
  122. "light": bool(_data[4] & 0b00100000),
  123. "fault": bool(_data[4] & 0b00001000),
  124. "solarPanel": bool(_data[5] & 0b00001000),
  125. "calibration": bool(_data[5] & 0b00000100),
  126. "calibrated": bool(_data[5] & 0b00000100),
  127. "inMotion": _in_motion,
  128. "position": _direction_adjusted_position,
  129. "timers": _data[7],
  130. }
  131. def _update_motion_direction(
  132. self, in_motion: bool, previous_position: int | None, new_position: int
  133. ) -> None:
  134. """Update opening/closing status based on movement."""
  135. if previous_position is None:
  136. return
  137. if in_motion is False:
  138. self._is_closing = self._is_opening = False
  139. return
  140. if new_position != previous_position:
  141. self._is_opening = new_position > previous_position
  142. self._is_closing = new_position < previous_position
  143. async def get_extended_info_summary(self) -> dict[str, Any] | None:
  144. """Get basic info for all devices in chain."""
  145. _data = await self._send_command(key=CURTAIN_EXT_SUM_KEY)
  146. if not _data:
  147. _LOGGER.error("%s: Unsuccessful, no result from device", self.name)
  148. return None
  149. if _data in (b"\x07", b"\x00"):
  150. _LOGGER.error("%s: Unsuccessful, please try again", self.name)
  151. return None
  152. self.ext_info_sum["device0"] = {
  153. "openDirectionDefault": not bool(_data[1] & 0b10000000),
  154. "touchToOpen": bool(_data[1] & 0b01000000),
  155. "light": bool(_data[1] & 0b00100000),
  156. "openDirection": (
  157. "left_to_right" if _data[1] & 0b00010000 == 1 else "right_to_left"
  158. ),
  159. }
  160. # if grouped curtain device present.
  161. if _data[2] != 0:
  162. self.ext_info_sum["device1"] = {
  163. "openDirectionDefault": not bool(_data[1] & 0b10000000),
  164. "touchToOpen": bool(_data[1] & 0b01000000),
  165. "light": bool(_data[1] & 0b00100000),
  166. "openDirection": (
  167. "left_to_right" if _data[1] & 0b00010000 else "right_to_left"
  168. ),
  169. }
  170. return self.ext_info_sum
  171. async def get_extended_info_adv(self) -> dict[str, Any] | None:
  172. """Get advance page info for device chain."""
  173. _data = await self._send_command(key=CURTAIN_EXT_ADV_KEY)
  174. if not _data:
  175. _LOGGER.error("%s: Unsuccessful, no result from device", self.name)
  176. return None
  177. if _data in (b"\x07", b"\x00"):
  178. _LOGGER.error("%s: Unsuccessful, please try again", self.name)
  179. return None
  180. _state_of_charge = [
  181. "not_charging",
  182. "charging_by_adapter",
  183. "charging_by_solar",
  184. "fully_charged",
  185. "solar_not_charging",
  186. "charging_error",
  187. ]
  188. self.ext_info_adv["device0"] = {
  189. "battery": _data[1],
  190. "firmware": _data[2] / 10.0,
  191. "stateOfCharge": _state_of_charge[_data[3]],
  192. }
  193. # If grouped curtain device present.
  194. if _data[4]:
  195. self.ext_info_adv["device1"] = {
  196. "battery": _data[4],
  197. "firmware": _data[5] / 10.0,
  198. "stateOfCharge": _state_of_charge[_data[6]],
  199. }
  200. return self.ext_info_adv
  201. def get_light_level(self) -> Any:
  202. """Return cached light level."""
  203. # To get actual light level call update() first.
  204. return self._get_adv_value("lightLevel")
  205. def is_reversed(self) -> bool:
  206. """Return True if curtain position is opposite from SB data."""
  207. return self._reverse
  208. def is_calibrated(self) -> Any:
  209. """Return True curtain is calibrated."""
  210. # To get actual light level call update() first.
  211. return self._get_adv_value("calibration")
  212. def is_opening(self) -> bool:
  213. """Return True if the curtain is opening."""
  214. return self._is_opening
  215. def is_closing(self) -> bool:
  216. """Return True if the curtain is closing."""
  217. return self._is_closing