roller_shade.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import logging
  4. import warnings
  5. from typing import Any
  6. from ..models import SwitchBotAdvertisement
  7. from .base_cover import CONTROL_SOURCE, ROLLERSHADE_COMMAND, SwitchbotBaseCover
  8. from .device import REQ_HEADER, SwitchbotSequenceDevice, update_after_operation
  9. _LOGGER = logging.getLogger(__name__)
  10. OPEN_KEYS = [
  11. f"{REQ_HEADER}{ROLLERSHADE_COMMAND}01{CONTROL_SOURCE}0100",
  12. f"{REQ_HEADER}{ROLLERSHADE_COMMAND}05{CONTROL_SOURCE}", # +mode + "00"
  13. ]
  14. CLOSE_KEYS = [
  15. f"{REQ_HEADER}{ROLLERSHADE_COMMAND}01{CONTROL_SOURCE}0164",
  16. f"{REQ_HEADER}{ROLLERSHADE_COMMAND}05{CONTROL_SOURCE}", # +mode + "64"
  17. ]
  18. POSITION_KEYS = [
  19. f"{REQ_HEADER}{ROLLERSHADE_COMMAND}01{CONTROL_SOURCE}01",
  20. f"{REQ_HEADER}{ROLLERSHADE_COMMAND}05{CONTROL_SOURCE}",
  21. ] # +actual_position
  22. STOP_KEYS = [f"{REQ_HEADER}{ROLLERSHADE_COMMAND}00{CONTROL_SOURCE}01"]
  23. class SwitchbotRollerShade(SwitchbotBaseCover, SwitchbotSequenceDevice):
  24. """Representation of a Switchbot Roller Shade."""
  25. def __init__(self, *args: Any, **kwargs: Any) -> None:
  26. """Switchbot roller shade constructor."""
  27. # The position of the roller shade is saved returned with 0 = open and 100 = closed.
  28. # the definition of position is the same as in Home Assistant.
  29. self._reverse: bool = kwargs.pop("reverse_mode", True)
  30. super().__init__(self._reverse, *args, **kwargs)
  31. def _set_parsed_data(
  32. self, advertisement: SwitchBotAdvertisement, data: dict[str, Any]
  33. ) -> None:
  34. """Set data."""
  35. in_motion = data["inMotion"]
  36. previous_position = self._get_adv_value("position")
  37. new_position = data["position"]
  38. self._update_motion_direction(in_motion, previous_position, new_position)
  39. super()._set_parsed_data(advertisement, data)
  40. @staticmethod
  41. def _validate_mode(mode: int) -> None:
  42. """Validate the motor mode (0 = performance, 1 = quiet)."""
  43. if mode not in (0, 1):
  44. raise ValueError(f"mode must be 0 (performance) or 1 (quiet), got {mode!r}")
  45. @update_after_operation
  46. async def open(self, mode: int = 0) -> bool:
  47. """Send open command. 0 - performance mode, 1 - quiet mode."""
  48. self._validate_mode(mode)
  49. if success := await self._send_multiple_commands(
  50. [OPEN_KEYS[0], f"{OPEN_KEYS[1]}{mode:02X}00"]
  51. ):
  52. self._is_opening = True
  53. self._is_closing = False
  54. return success
  55. @update_after_operation
  56. async def close(self, mode: int = 0, **kwargs: Any) -> bool:
  57. """
  58. Send close command. 0 - performance mode, 1 - quiet mode.
  59. ``speed`` is accepted as a deprecated alias for ``mode``: prior to
  60. the quietdrift work the ``speed`` parameter existed on this method
  61. but was a no-op. Callers passing ``speed=`` get a
  62. ``DeprecationWarning`` and the value is forwarded as ``mode``.
  63. """
  64. if "speed" in kwargs:
  65. warnings.warn(
  66. "`speed` kwarg on close() is deprecated; use `mode` instead",
  67. DeprecationWarning,
  68. stacklevel=2,
  69. )
  70. mode = kwargs.pop("speed")
  71. if kwargs:
  72. raise TypeError(
  73. f"close() got unexpected keyword arguments: {sorted(kwargs)}"
  74. )
  75. self._validate_mode(mode)
  76. if success := await self._send_multiple_commands(
  77. [CLOSE_KEYS[0], f"{CLOSE_KEYS[1]}{mode:02X}64"]
  78. ):
  79. self._is_closing = True
  80. self._is_opening = False
  81. return success
  82. @update_after_operation
  83. async def stop(self) -> bool:
  84. """Send stop command to device."""
  85. if success := await self._send_multiple_commands(STOP_KEYS):
  86. self._is_opening = self._is_closing = False
  87. return success
  88. @update_after_operation
  89. async def set_position(self, position: int, mode: int = 0) -> bool:
  90. """Send position command (0-100) to device. 0 - performance mode, 1 - quiet mode."""
  91. self._validate_mode(mode)
  92. position = (100 - position) if self._reverse else position
  93. if success := await self._send_multiple_commands(
  94. [
  95. f"{POSITION_KEYS[0]}{position:02X}",
  96. f"{POSITION_KEYS[1]}{mode:02X}{position:02X}",
  97. ]
  98. ):
  99. self._update_motion_direction(
  100. True, self._get_adv_value("position"), position
  101. )
  102. return success
  103. def get_position(self) -> Any:
  104. """Return cached position (0-100) of Curtain."""
  105. # To get actual position call update() first.
  106. return self._get_adv_value("position")
  107. async def get_basic_info(self) -> dict[str, Any] | None:
  108. """Get device basic settings."""
  109. if not (_data := await self._get_basic_info()):
  110. return None
  111. _position = max(min(_data[5], 100), 0)
  112. _direction_adjusted_position = (100 - _position) if self._reverse else _position
  113. _previous_position = self._get_adv_value("position")
  114. _in_motion = bool(_data[4] & 0b00000011)
  115. self._update_motion_direction(
  116. _in_motion, _previous_position, _direction_adjusted_position
  117. )
  118. return {
  119. "battery": _data[1],
  120. "firmware": _data[2] / 10.0,
  121. "chainLength": _data[3],
  122. "openDirection": (
  123. "clockwise" if _data[4] & 0b10000000 == 128 else "anticlockwise"
  124. ),
  125. "fault": bool(_data[4] & 0b00010000),
  126. "solarPanel": bool(_data[4] & 0b00001000),
  127. "calibration": bool(_data[4] & 0b00000100),
  128. "calibrated": bool(_data[4] & 0b00000100),
  129. "inMotion": _in_motion,
  130. "position": _direction_adjusted_position,
  131. "timers": _data[6],
  132. }
  133. def _update_motion_direction(
  134. self, in_motion: bool, previous_position: int | None, new_position: int
  135. ) -> None:
  136. """Update opening/closing status based on movement."""
  137. if previous_position is None:
  138. return
  139. if in_motion is False:
  140. self._is_closing = self._is_opening = False
  141. return
  142. if new_position != previous_position:
  143. self._is_opening = new_position > previous_position
  144. self._is_closing = new_position < previous_position