roller_shade.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import logging
  4. from typing import Any
  5. from ..models import SwitchBotAdvertisement
  6. from .base_cover import CONTROL_SOURCE, ROLLERSHADE_COMMAND, SwitchbotBaseCover
  7. from .device import REQ_HEADER, SwitchbotSequenceDevice, update_after_operation
  8. _LOGGER = logging.getLogger(__name__)
  9. OPEN_KEYS = [
  10. f"{REQ_HEADER}{ROLLERSHADE_COMMAND}01{CONTROL_SOURCE}0100",
  11. f"{REQ_HEADER}{ROLLERSHADE_COMMAND}05{CONTROL_SOURCE}0000",
  12. ]
  13. CLOSE_KEYS = [
  14. f"{REQ_HEADER}{ROLLERSHADE_COMMAND}01{CONTROL_SOURCE}0164",
  15. f"{REQ_HEADER}{ROLLERSHADE_COMMAND}05{CONTROL_SOURCE}0064",
  16. ]
  17. POSITION_KEYS = [
  18. f"{REQ_HEADER}{ROLLERSHADE_COMMAND}01{CONTROL_SOURCE}01",
  19. f"{REQ_HEADER}{ROLLERSHADE_COMMAND}05{CONTROL_SOURCE}",
  20. ] # +actual_position
  21. STOP_KEYS = [f"{REQ_HEADER}{ROLLERSHADE_COMMAND}00{CONTROL_SOURCE}01"]
  22. class SwitchbotRollerShade(SwitchbotBaseCover, SwitchbotSequenceDevice):
  23. """Representation of a Switchbot Roller Shade."""
  24. def __init__(self, *args: Any, **kwargs: Any) -> None:
  25. """Switchbot roller shade constructor."""
  26. # The position of the roller shade is saved returned with 0 = open and 100 = closed.
  27. # the definition of position is the same as in Home Assistant.
  28. self._reverse: bool = kwargs.pop("reverse_mode", True)
  29. super().__init__(self._reverse, *args, **kwargs)
  30. def _set_parsed_data(
  31. self, advertisement: SwitchBotAdvertisement, data: dict[str, Any]
  32. ) -> None:
  33. """Set data."""
  34. in_motion = data["inMotion"]
  35. previous_position = self._get_adv_value("position")
  36. new_position = data["position"]
  37. self._update_motion_direction(in_motion, previous_position, new_position)
  38. super()._set_parsed_data(advertisement, data)
  39. @update_after_operation
  40. async def open(self, mode: int = 0) -> bool:
  41. """Send open command. 0 - performance mode, 1 - unfelt mode."""
  42. self._is_opening = True
  43. self._is_closing = False
  44. return await self._send_multiple_commands(OPEN_KEYS)
  45. @update_after_operation
  46. async def close(self, speed: int = 0) -> bool:
  47. """Send close command. 0 - performance mode, 1 - unfelt mode."""
  48. self._is_closing = True
  49. self._is_opening = False
  50. return await self._send_multiple_commands(CLOSE_KEYS)
  51. @update_after_operation
  52. async def stop(self) -> bool:
  53. """Send stop command to device."""
  54. self._is_opening = self._is_closing = False
  55. return await self._send_multiple_commands(STOP_KEYS)
  56. @update_after_operation
  57. async def set_position(self, position: int, mode: int = 0) -> bool:
  58. """Send position command (0-100) to device. 0 - performance mode, 1 - unfelt mode."""
  59. position = (100 - position) if self._reverse else position
  60. self._update_motion_direction(True, self._get_adv_value("position"), position)
  61. return await self._send_multiple_commands(
  62. [
  63. f"{POSITION_KEYS[0]}{position:02X}",
  64. f"{POSITION_KEYS[1]}{mode:02X}{position:02X}",
  65. ]
  66. )
  67. def get_position(self) -> Any:
  68. """Return cached position (0-100) of Curtain."""
  69. # To get actual position call update() first.
  70. return self._get_adv_value("position")
  71. async def get_basic_info(self) -> dict[str, Any] | None:
  72. """Get device basic settings."""
  73. if not (_data := await self._get_basic_info()):
  74. return None
  75. _position = max(min(_data[5], 100), 0)
  76. _direction_adjusted_position = (100 - _position) if self._reverse else _position
  77. _previous_position = self._get_adv_value("position")
  78. _in_motion = bool(_data[4] & 0b00000011)
  79. self._update_motion_direction(
  80. _in_motion, _previous_position, _direction_adjusted_position
  81. )
  82. return {
  83. "battery": _data[1],
  84. "firmware": _data[2] / 10.0,
  85. "chainLength": _data[3],
  86. "openDirection": (
  87. "clockwise" if _data[4] & 0b10000000 == 128 else "anticlockwise"
  88. ),
  89. "fault": bool(_data[4] & 0b00010000),
  90. "solarPanel": bool(_data[4] & 0b00001000),
  91. "calibration": bool(_data[4] & 0b00000100),
  92. "calibrated": bool(_data[4] & 0b00000100),
  93. "inMotion": _in_motion,
  94. "position": _direction_adjusted_position,
  95. "timers": _data[6],
  96. }
  97. def _update_motion_direction(
  98. self, in_motion: bool, previous_position: int | None, new_position: int
  99. ) -> None:
  100. """Update opening/closing status based on movement."""
  101. if previous_position is None:
  102. return
  103. if in_motion is False:
  104. self._is_closing = self._is_opening = False
  105. return
  106. if new_position != previous_position:
  107. self._is_opening = new_position > previous_position
  108. self._is_closing = new_position < previous_position