curtain.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. from dataclasses import replace
  4. import logging
  5. from typing import Any
  6. from switchbot.models import SwitchBotAdvertisement
  7. from .device import REQ_HEADER, SwitchbotDevice, update_after_operation
  8. # Curtain keys
  9. CURTAIN_COMMAND = "4501"
  10. # For second element of open and close arrs we should add two bytes i.e. ff00
  11. # First byte [ff] stands for speed (00 or ff - normal, 01 - slow) *
  12. # * Only for curtains 3. For other models use ff
  13. # Second byte [00] is a command (00 - open, 64 - close)
  14. OPEN_KEYS = [
  15. f"{REQ_HEADER}{CURTAIN_COMMAND}010100",
  16. f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed + "00"
  17. ]
  18. CLOSE_KEYS = [
  19. f"{REQ_HEADER}{CURTAIN_COMMAND}010164",
  20. f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed + "64"
  21. ]
  22. POSITION_KEYS = [
  23. f"{REQ_HEADER}{CURTAIN_COMMAND}0101",
  24. f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed
  25. ] # +actual_position
  26. STOP_KEYS = [f"{REQ_HEADER}{CURTAIN_COMMAND}0001", f"{REQ_HEADER}{CURTAIN_COMMAND}00ff"]
  27. CURTAIN_EXT_SUM_KEY = f"{REQ_HEADER}460401"
  28. CURTAIN_EXT_ADV_KEY = f"{REQ_HEADER}460402"
  29. CURTAIN_EXT_CHAIN_INFO_KEY = f"{REQ_HEADER}468101"
  30. _LOGGER = logging.getLogger(__name__)
  31. class SwitchbotCurtain(SwitchbotDevice):
  32. """Representation of a Switchbot Curtain."""
  33. def __init__(self, *args: Any, **kwargs: Any) -> None:
  34. """Switchbot Curtain/WoCurtain constructor."""
  35. # The position of the curtain is saved returned with 0 = open and 100 = closed.
  36. # This is independent of the calibration of the curtain bot (Open left to right/
  37. # Open right to left/Open from the middle).
  38. # The parameter 'reverse_mode' reverse these values,
  39. # if 'reverse_mode' = True, position = 0 equals close
  40. # and position = 100 equals open. The parameter is default set to True so that
  41. # the definition of position is the same as in Home Assistant.
  42. super().__init__(*args, **kwargs)
  43. self._reverse: bool = kwargs.pop("reverse_mode", True)
  44. self._settings: dict[str, Any] = {}
  45. self.ext_info_sum: dict[str, Any] = {}
  46. self.ext_info_adv: dict[str, Any] = {}
  47. self._is_opening: bool = False
  48. self._is_closing: bool = False
  49. def _set_parsed_data(
  50. self, advertisement: SwitchBotAdvertisement, data: dict[str, Any]
  51. ) -> None:
  52. """Set data."""
  53. _new_data = replace(
  54. advertisement, data=self._sb_adv_data.data | {"data": data}
  55. )
  56. self._update_motion_direction(_new_data.data["data"].get("inMotion"), self._get_adv_value("position"), _new_data.data["data"].get("position"))
  57. super()._set_parsed_data(advertisement, data)
  58. async def _send_multiple_commands(self, keys: list[str]) -> bool:
  59. """Send multiple commands to device.
  60. Since we current have no way to tell which command the device
  61. needs we send both.
  62. """
  63. final_result = False
  64. for key in keys:
  65. result = await self._send_command(key)
  66. final_result |= self._check_command_result(result, 0, {1})
  67. return final_result
  68. @update_after_operation
  69. async def open(self, speed: int = 255) -> bool:
  70. """Send open command. Speed 255 - normal, 1 - slow"""
  71. self._is_opening = True
  72. self._is_closing = False
  73. return await self._send_multiple_commands(
  74. [OPEN_KEYS[0], f"{OPEN_KEYS[1]}{speed:02X}00"]
  75. )
  76. @update_after_operation
  77. async def close(self, speed: int = 255) -> bool:
  78. """Send close command. Speed 255 - normal, 1 - slow"""
  79. self._is_closing = True
  80. self._is_opening = False
  81. return await self._send_multiple_commands(
  82. [CLOSE_KEYS[0], f"{CLOSE_KEYS[1]}{speed:02X}64"]
  83. )
  84. @update_after_operation
  85. async def stop(self) -> bool:
  86. """Send stop command to device."""
  87. self._is_opening = self._is_closing = False
  88. return await self._send_multiple_commands(STOP_KEYS)
  89. @update_after_operation
  90. async def set_position(self, position: int, speed: int = 255) -> bool:
  91. """Send position command (0-100) to device. Speed 255 - normal, 1 - slow"""
  92. position = (100 - position) if self._reverse else position
  93. self._update_motion_direction(True, self._get_adv_value("position"), position)
  94. return await self._send_multiple_commands(
  95. [
  96. f"{POSITION_KEYS[0]}{position:02X}",
  97. f"{POSITION_KEYS[1]}{speed:02X}{position:02X}",
  98. ]
  99. )
  100. def get_position(self) -> Any:
  101. """Return cached position (0-100) of Curtain."""
  102. # To get actual position call update() first.
  103. return self._get_adv_value("position")
  104. async def get_basic_info(self) -> dict[str, Any] | None:
  105. """Get device basic settings."""
  106. if not (_data := await self._get_basic_info()):
  107. return None
  108. _position = max(min(_data[6], 100), 0)
  109. _direction_adjusted_position = (100 - _position) if self._reverse else _position
  110. _previous_position = self._get_adv_value("position")
  111. _in_motion = bool(_data[5] & 0b01000011)
  112. self._update_motion_direction(_in_motion, _previous_position, _direction_adjusted_position)
  113. return {
  114. "battery": _data[1],
  115. "firmware": _data[2] / 10.0,
  116. "chainLength": _data[3],
  117. "openDirection": (
  118. "right_to_left" if _data[4] & 0b10000000 == 128 else "left_to_right"
  119. ),
  120. "touchToOpen": bool(_data[4] & 0b01000000),
  121. "light": bool(_data[4] & 0b00100000),
  122. "fault": bool(_data[4] & 0b00001000),
  123. "solarPanel": bool(_data[5] & 0b00001000),
  124. "calibration": bool(_data[5] & 0b00000100),
  125. "calibrated": bool(_data[5] & 0b00000100),
  126. "inMotion": _in_motion,
  127. "position": _direction_adjusted_position,
  128. "timers": _data[7],
  129. }
  130. def _update_motion_direction(self, in_motion: bool, previous_position: int, new_position: int) -> None:
  131. """Update opening/closing status based on movement."""
  132. if in_motion == True:
  133. if new_position != previous_position:
  134. self._is_opening = new_position > previous_position
  135. self._is_closing = new_position < previous_position
  136. else:
  137. self._is_closing = self._is_opening = False
  138. async def get_extended_info_summary(self) -> dict[str, Any] | None:
  139. """Get basic info for all devices in chain."""
  140. _data = await self._send_command(key=CURTAIN_EXT_SUM_KEY)
  141. if not _data:
  142. _LOGGER.error("%s: Unsuccessful, no result from device", self.name)
  143. return None
  144. if _data in (b"\x07", b"\x00"):
  145. _LOGGER.error("%s: Unsuccessful, please try again", self.name)
  146. return None
  147. self.ext_info_sum["device0"] = {
  148. "openDirectionDefault": not bool(_data[1] & 0b10000000),
  149. "touchToOpen": bool(_data[1] & 0b01000000),
  150. "light": bool(_data[1] & 0b00100000),
  151. "openDirection": (
  152. "left_to_right" if _data[1] & 0b00010000 == 1 else "right_to_left"
  153. ),
  154. }
  155. # if grouped curtain device present.
  156. if _data[2] != 0:
  157. self.ext_info_sum["device1"] = {
  158. "openDirectionDefault": not bool(_data[1] & 0b10000000),
  159. "touchToOpen": bool(_data[1] & 0b01000000),
  160. "light": bool(_data[1] & 0b00100000),
  161. "openDirection": (
  162. "left_to_right" if _data[1] & 0b00010000 else "right_to_left"
  163. ),
  164. }
  165. return self.ext_info_sum
  166. async def get_extended_info_adv(self) -> dict[str, Any] | None:
  167. """Get advance page info for device chain."""
  168. _data = await self._send_command(key=CURTAIN_EXT_ADV_KEY)
  169. if not _data:
  170. _LOGGER.error("%s: Unsuccessful, no result from device", self.name)
  171. return None
  172. if _data in (b"\x07", b"\x00"):
  173. _LOGGER.error("%s: Unsuccessful, please try again", self.name)
  174. return None
  175. _state_of_charge = [
  176. "not_charging",
  177. "charging_by_adapter",
  178. "charging_by_solar",
  179. "fully_charged",
  180. "solar_not_charging",
  181. "charging_error",
  182. ]
  183. self.ext_info_adv["device0"] = {
  184. "battery": _data[1],
  185. "firmware": _data[2] / 10.0,
  186. "stateOfCharge": _state_of_charge[_data[3]],
  187. }
  188. # If grouped curtain device present.
  189. if _data[4]:
  190. self.ext_info_adv["device1"] = {
  191. "battery": _data[4],
  192. "firmware": _data[5] / 10.0,
  193. "stateOfCharge": _state_of_charge[_data[6]],
  194. }
  195. return self.ext_info_adv
  196. def get_light_level(self) -> Any:
  197. """Return cached light level."""
  198. # To get actual light level call update() first.
  199. return self._get_adv_value("lightLevel")
  200. def is_reversed(self) -> bool:
  201. """Return True if curtain position is opposite from SB data."""
  202. return self._reverse
  203. def is_calibrated(self) -> Any:
  204. """Return True curtain is calibrated."""
  205. # To get actual light level call update() first.
  206. return self._get_adv_value("calibration")
  207. def is_opening(self) -> Any:
  208. """Return True if the curtain is opening."""
  209. return self._is_opening
  210. def is_closing(self) -> Any:
  211. """Return True if the curtain is closing."""
  212. return self._is_closing