1
0

curtain.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import logging
  4. from typing import Any
  5. from ..models import SwitchBotAdvertisement
  6. from .base_cover import COVER_COMMAND, COVER_EXT_SUM_KEY, SwitchbotBaseCover
  7. from .device import REQ_HEADER, update_after_operation
  8. # For second element of open and close arrs we should add two bytes i.e. ff00
  9. # First byte [ff] stands for speed (00 or ff - normal, 01 - slow) *
  10. # * Only for curtains 3. For other models use ff
  11. # Second byte [00] is a command (00 - open, 64 - close)
  12. OPEN_KEYS = [
  13. f"{REQ_HEADER}{COVER_COMMAND}010100",
  14. f"{REQ_HEADER}{COVER_COMMAND}05", # +speed + "00"
  15. ]
  16. CLOSE_KEYS = [
  17. f"{REQ_HEADER}{COVER_COMMAND}010164",
  18. f"{REQ_HEADER}{COVER_COMMAND}05", # +speed + "64"
  19. ]
  20. POSITION_KEYS = [
  21. f"{REQ_HEADER}{COVER_COMMAND}0101",
  22. f"{REQ_HEADER}{COVER_COMMAND}05", # +speed
  23. ] # +actual_position
  24. STOP_KEYS = [f"{REQ_HEADER}{COVER_COMMAND}0001", f"{REQ_HEADER}{COVER_COMMAND}00ff"]
  25. CURTAIN_EXT_CHAIN_INFO_KEY = f"{REQ_HEADER}468101"
  26. _LOGGER = logging.getLogger(__name__)
  27. class SwitchbotCurtain(SwitchbotBaseCover):
  28. """Representation of a Switchbot Curtain."""
  29. def __init__(self, *args: Any, **kwargs: Any) -> None:
  30. """Switchbot Curtain/WoCurtain constructor."""
  31. # The position of the curtain is saved returned with 0 = open and 100 = closed.
  32. # This is independent of the calibration of the curtain bot (Open left to right/
  33. # Open right to left/Open from the middle).
  34. # The parameter 'reverse_mode' reverse these values,
  35. # if 'reverse_mode' = True, position = 0 equals close
  36. # and position = 100 equals open. The parameter is default set to True so that
  37. # the definition of position is the same as in Home Assistant.
  38. self._reverse: bool = kwargs.pop("reverse_mode", True)
  39. super().__init__(self._reverse, *args, **kwargs)
  40. self._settings: dict[str, Any] = {}
  41. self.ext_info_sum: dict[str, Any] = {}
  42. self.ext_info_adv: dict[str, Any] = {}
  43. def _set_parsed_data(
  44. self, advertisement: SwitchBotAdvertisement, data: dict[str, Any]
  45. ) -> None:
  46. """Set data."""
  47. in_motion = data["inMotion"]
  48. previous_position = self._get_adv_value("position")
  49. new_position = data["position"]
  50. self._update_motion_direction(in_motion, previous_position, new_position)
  51. super()._set_parsed_data(advertisement, data)
  52. @update_after_operation
  53. async def open(self, speed: int = 255) -> bool:
  54. """Send open command. Speed 255 - normal, 1 - slow"""
  55. self._is_opening = True
  56. self._is_closing = False
  57. return await self._send_multiple_commands(
  58. [OPEN_KEYS[0], f"{OPEN_KEYS[1]}{speed:02X}00"]
  59. )
  60. @update_after_operation
  61. async def close(self, speed: int = 255) -> bool:
  62. """Send close command. Speed 255 - normal, 1 - slow"""
  63. self._is_closing = True
  64. self._is_opening = False
  65. return await self._send_multiple_commands(
  66. [CLOSE_KEYS[0], f"{CLOSE_KEYS[1]}{speed:02X}64"]
  67. )
  68. @update_after_operation
  69. async def stop(self) -> bool:
  70. """Send stop command to device."""
  71. self._is_opening = self._is_closing = False
  72. return await super().stop()
  73. @update_after_operation
  74. async def set_position(self, position: int, speed: int = 255) -> bool:
  75. """Send position command (0-100) to device. Speed 255 - normal, 1 - slow"""
  76. direction_adjusted_position = (100 - position) if self._reverse else position
  77. self._update_motion_direction(
  78. True, self._get_adv_value("position"), direction_adjusted_position
  79. )
  80. return await super().set_position(position, speed)
  81. def get_position(self) -> Any:
  82. """Return cached position (0-100) of Curtain."""
  83. # To get actual position call update() first.
  84. return self._get_adv_value("position")
  85. async def get_basic_info(self) -> dict[str, Any] | None:
  86. """Get device basic settings."""
  87. if not (_data := await self._get_basic_info()):
  88. return None
  89. _position = max(min(_data[6], 100), 0)
  90. _direction_adjusted_position = (100 - _position) if self._reverse else _position
  91. _previous_position = self._get_adv_value("position")
  92. _in_motion = bool(_data[5] & 0b01000011)
  93. self._update_motion_direction(
  94. _in_motion, _previous_position, _direction_adjusted_position
  95. )
  96. return {
  97. "battery": _data[1],
  98. "firmware": _data[2] / 10.0,
  99. "chainLength": _data[3],
  100. "openDirection": (
  101. "right_to_left" if _data[4] & 0b10000000 == 128 else "left_to_right"
  102. ),
  103. "touchToOpen": bool(_data[4] & 0b01000000),
  104. "light": bool(_data[4] & 0b00100000),
  105. "fault": bool(_data[4] & 0b00001000),
  106. "solarPanel": bool(_data[5] & 0b00001000),
  107. "calibration": bool(_data[5] & 0b00000100),
  108. "calibrated": bool(_data[5] & 0b00000100),
  109. "inMotion": _in_motion,
  110. "position": _direction_adjusted_position,
  111. "timers": _data[7],
  112. }
  113. def _update_motion_direction(
  114. self, in_motion: bool, previous_position: int | None, new_position: int
  115. ) -> None:
  116. """Update opening/closing status based on movement."""
  117. if previous_position is None:
  118. return
  119. if in_motion is False:
  120. self._is_closing = self._is_opening = False
  121. return
  122. if new_position != previous_position:
  123. self._is_opening = new_position > previous_position
  124. self._is_closing = new_position < previous_position
  125. async def get_extended_info_summary(self) -> dict[str, Any] | None:
  126. """Get extended info for all devices in chain."""
  127. _data = await self._send_command(key=COVER_EXT_SUM_KEY)
  128. if not _data:
  129. _LOGGER.error("%s: Unsuccessful, no result from device", self.name)
  130. return None
  131. if _data in (b"\x07", b"\x00"):
  132. _LOGGER.error("%s: Unsuccessful, please try again", self.name)
  133. return None
  134. self.ext_info_sum["device0"] = {
  135. "openDirectionDefault": not bool(_data[1] & 0b10000000),
  136. "touchToOpen": bool(_data[1] & 0b01000000),
  137. "light": bool(_data[1] & 0b00100000),
  138. "openDirection": (
  139. "left_to_right" if _data[1] & 0b00010000 else "right_to_left"
  140. ),
  141. }
  142. # if grouped curtain device present.
  143. if _data[2] != 0:
  144. self.ext_info_sum["device1"] = {
  145. "openDirectionDefault": not bool(_data[2] & 0b10000000),
  146. "touchToOpen": bool(_data[2] & 0b01000000),
  147. "light": bool(_data[2] & 0b00100000),
  148. "openDirection": (
  149. "left_to_right" if _data[2] & 0b00010000 else "right_to_left"
  150. ),
  151. }
  152. return self.ext_info_sum