curtain.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import logging
  4. from typing import Any
  5. from .device import REQ_HEADER, SwitchbotDevice, update_after_operation
  6. # Curtain keys
  7. CURTAIN_COMMAND = "4501"
  8. # For second element of open and close arrs we should add two bytes i.e. ff00
  9. # First byte [ff] stands for speed (00 or ff - normal, 01 - slow) *
  10. # * Only for curtains 3. For other models use ff
  11. # Second byte [00] is a command (00 - open, 64 - close)
  12. OPEN_KEYS = [
  13. f"{REQ_HEADER}{CURTAIN_COMMAND}010100",
  14. f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed + "00"
  15. ]
  16. CLOSE_KEYS = [
  17. f"{REQ_HEADER}{CURTAIN_COMMAND}010164",
  18. f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed + "64"
  19. ]
  20. POSITION_KEYS = [
  21. f"{REQ_HEADER}{CURTAIN_COMMAND}0101",
  22. f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed
  23. ] # +actual_position
  24. STOP_KEYS = [f"{REQ_HEADER}{CURTAIN_COMMAND}0001", f"{REQ_HEADER}{CURTAIN_COMMAND}00ff"]
  25. CURTAIN_EXT_SUM_KEY = f"{REQ_HEADER}460401"
  26. CURTAIN_EXT_ADV_KEY = f"{REQ_HEADER}460402"
  27. CURTAIN_EXT_CHAIN_INFO_KEY = f"{REQ_HEADER}468101"
  28. _LOGGER = logging.getLogger(__name__)
  29. class SwitchbotCurtain(SwitchbotDevice):
  30. """Representation of a Switchbot Curtain."""
  31. def __init__(self, *args: Any, **kwargs: Any) -> None:
  32. """Switchbot Curtain/WoCurtain constructor."""
  33. # The position of the curtain is saved returned with 0 = open and 100 = closed.
  34. # This is independent of the calibration of the curtain bot (Open left to right/
  35. # Open right to left/Open from the middle).
  36. # The parameter 'reverse_mode' reverse these values,
  37. # if 'reverse_mode' = True, position = 0 equals close
  38. # and position = 100 equals open. The parameter is default set to True so that
  39. # the definition of position is the same as in Home Assistant.
  40. super().__init__(*args, **kwargs)
  41. self._reverse: bool = kwargs.pop("reverse_mode", True)
  42. self._settings: dict[str, Any] = {}
  43. self.ext_info_sum: dict[str, Any] = {}
  44. self.ext_info_adv: dict[str, Any] = {}
  45. async def _send_multiple_commands(self, keys: list[str]) -> bool:
  46. """Send multiple commands to device.
  47. Since we current have no way to tell which command the device
  48. needs we send both.
  49. """
  50. final_result = False
  51. for key in keys:
  52. result = await self._send_command(key)
  53. final_result |= self._check_command_result(result, 0, {1})
  54. return final_result
  55. @update_after_operation
  56. async def open(self, speed: int = 255) -> bool:
  57. """Send open command. Speed 255 - normal, 1 - slow"""
  58. return await self._send_multiple_commands(
  59. [OPEN_KEYS[0], f"{OPEN_KEYS[1]}{speed:02X}00"]
  60. )
  61. @update_after_operation
  62. async def close(self, speed: int = 255) -> bool:
  63. """Send close command. Speed 255 - normal, 1 - slow"""
  64. return await self._send_multiple_commands(
  65. [CLOSE_KEYS[0], f"{CLOSE_KEYS[1]}{speed:02X}64"]
  66. )
  67. @update_after_operation
  68. async def stop(self) -> bool:
  69. """Send stop command to device."""
  70. return await self._send_multiple_commands(STOP_KEYS)
  71. @update_after_operation
  72. async def set_position(self, position: int, speed: int = 255) -> bool:
  73. """Send position command (0-100) to device. Speed 255 - normal, 1 - slow"""
  74. position = (100 - position) if self._reverse else position
  75. return await self._send_multiple_commands(
  76. [
  77. f"{POSITION_KEYS[0]}{position:02X}",
  78. f"{POSITION_KEYS[1]}{speed:02X}{position:02X}",
  79. ]
  80. )
  81. def get_position(self) -> Any:
  82. """Return cached position (0-100) of Curtain."""
  83. # To get actual position call update() first.
  84. return self._get_adv_value("position")
  85. async def get_basic_info(self) -> dict[str, Any] | None:
  86. """Get device basic settings."""
  87. if not (_data := await self._get_basic_info()):
  88. return None
  89. _position = max(min(_data[6], 100), 0)
  90. return {
  91. "battery": _data[1],
  92. "firmware": _data[2] / 10.0,
  93. "chainLength": _data[3],
  94. "openDirection": (
  95. "right_to_left" if _data[4] & 0b10000000 == 128 else "left_to_right"
  96. ),
  97. "touchToOpen": bool(_data[4] & 0b01000000),
  98. "light": bool(_data[4] & 0b00100000),
  99. "fault": bool(_data[4] & 0b00001000),
  100. "solarPanel": bool(_data[5] & 0b00001000),
  101. "calibration": bool(_data[5] & 0b00000100),
  102. "calibrated": bool(_data[5] & 0b00000100),
  103. "inMotion": bool(_data[5] & 0b01000011),
  104. "position": (100 - _position) if self._reverse else _position,
  105. "timers": _data[7],
  106. }
  107. async def get_extended_info_summary(self) -> dict[str, Any] | None:
  108. """Get basic info for all devices in chain."""
  109. _data = await self._send_command(key=CURTAIN_EXT_SUM_KEY)
  110. if not _data:
  111. _LOGGER.error("%s: Unsuccessful, no result from device", self.name)
  112. return None
  113. if _data in (b"\x07", b"\x00"):
  114. _LOGGER.error("%s: Unsuccessful, please try again", self.name)
  115. return None
  116. self.ext_info_sum["device0"] = {
  117. "openDirectionDefault": not bool(_data[1] & 0b10000000),
  118. "touchToOpen": bool(_data[1] & 0b01000000),
  119. "light": bool(_data[1] & 0b00100000),
  120. "openDirection": (
  121. "left_to_right" if _data[1] & 0b00010000 == 1 else "right_to_left"
  122. ),
  123. }
  124. # if grouped curtain device present.
  125. if _data[2] != 0:
  126. self.ext_info_sum["device1"] = {
  127. "openDirectionDefault": not bool(_data[1] & 0b10000000),
  128. "touchToOpen": bool(_data[1] & 0b01000000),
  129. "light": bool(_data[1] & 0b00100000),
  130. "openDirection": (
  131. "left_to_right" if _data[1] & 0b00010000 else "right_to_left"
  132. ),
  133. }
  134. return self.ext_info_sum
  135. async def get_extended_info_adv(self) -> dict[str, Any] | None:
  136. """Get advance page info for device chain."""
  137. _data = await self._send_command(key=CURTAIN_EXT_ADV_KEY)
  138. if not _data:
  139. _LOGGER.error("%s: Unsuccessful, no result from device", self.name)
  140. return None
  141. if _data in (b"\x07", b"\x00"):
  142. _LOGGER.error("%s: Unsuccessful, please try again", self.name)
  143. return None
  144. _state_of_charge = [
  145. "not_charging",
  146. "charging_by_adapter",
  147. "charging_by_solar",
  148. "fully_charged",
  149. "solar_not_charging",
  150. "charging_error",
  151. ]
  152. self.ext_info_adv["device0"] = {
  153. "battery": _data[1],
  154. "firmware": _data[2] / 10.0,
  155. "stateOfCharge": _state_of_charge[_data[3]],
  156. }
  157. # If grouped curtain device present.
  158. if _data[4]:
  159. self.ext_info_adv["device1"] = {
  160. "battery": _data[4],
  161. "firmware": _data[5] / 10.0,
  162. "stateOfCharge": _state_of_charge[_data[6]],
  163. }
  164. return self.ext_info_adv
  165. def get_light_level(self) -> Any:
  166. """Return cached light level."""
  167. # To get actual light level call update() first.
  168. return self._get_adv_value("lightLevel")
  169. def is_reversed(self) -> bool:
  170. """Return True if curtain position is opposite from SB data."""
  171. return self._reverse
  172. def is_calibrated(self) -> Any:
  173. """Return True curtain is calibrated."""
  174. # To get actual light level call update() first.
  175. return self._get_adv_value("calibration")