|
@@ -1,9 +1,12 @@
|
|
|
"""Library to handle connection with Switchbot."""
|
|
|
from __future__ import annotations
|
|
|
+from dataclasses import replace
|
|
|
|
|
|
import logging
|
|
|
from typing import Any
|
|
|
|
|
|
+from switchbot.models import SwitchBotAdvertisement
|
|
|
+
|
|
|
from .device import REQ_HEADER, SwitchbotDevice, update_after_operation
|
|
|
|
|
|
# Curtain keys
|
|
@@ -54,6 +57,22 @@ class SwitchbotCurtain(SwitchbotDevice):
|
|
|
self._settings: dict[str, Any] = {}
|
|
|
self.ext_info_sum: dict[str, Any] = {}
|
|
|
self.ext_info_adv: dict[str, Any] = {}
|
|
|
+ self._is_opening: bool = False
|
|
|
+ self._is_closing: bool = False
|
|
|
+
|
|
|
+ def _set_parsed_data(
|
|
|
+ self, advertisement: SwitchBotAdvertisement, data: dict[str, Any]
|
|
|
+ ) -> None:
|
|
|
+ """Set data."""
|
|
|
+ _new_data = replace(
|
|
|
+ advertisement, data=self._sb_adv_data.data | {"data": data}
|
|
|
+ )
|
|
|
+ self._update_motion_direction(_new_data["inMotion"], self._get_adv_value("position"), _new_data["position"])
|
|
|
+ if _new_data["inMotion"] == True:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ self._is_closing = self._is_opening = False
|
|
|
+ super()._set_parsed_data(advertisement, data)
|
|
|
|
|
|
async def _send_multiple_commands(self, keys: list[str]) -> bool:
|
|
|
"""Send multiple commands to device.
|
|
@@ -70,6 +89,8 @@ class SwitchbotCurtain(SwitchbotDevice):
|
|
|
@update_after_operation
|
|
|
async def open(self, speed: int = 255) -> bool:
|
|
|
"""Send open command. Speed 255 - normal, 1 - slow"""
|
|
|
+ self._is_opening = True
|
|
|
+ self._is_closing = False
|
|
|
return await self._send_multiple_commands(
|
|
|
[OPEN_KEYS[0], f"{OPEN_KEYS[1]}{speed:02X}00"]
|
|
|
)
|
|
@@ -77,6 +98,8 @@ class SwitchbotCurtain(SwitchbotDevice):
|
|
|
@update_after_operation
|
|
|
async def close(self, speed: int = 255) -> bool:
|
|
|
"""Send close command. Speed 255 - normal, 1 - slow"""
|
|
|
+ self._is_closing = True
|
|
|
+ self._is_opening = False
|
|
|
return await self._send_multiple_commands(
|
|
|
[CLOSE_KEYS[0], f"{CLOSE_KEYS[1]}{speed:02X}64"]
|
|
|
)
|
|
@@ -84,12 +107,14 @@ class SwitchbotCurtain(SwitchbotDevice):
|
|
|
@update_after_operation
|
|
|
async def stop(self) -> bool:
|
|
|
"""Send stop command to device."""
|
|
|
+ self._is_opening = self._is_closing = False
|
|
|
return await self._send_multiple_commands(STOP_KEYS)
|
|
|
|
|
|
@update_after_operation
|
|
|
async def set_position(self, position: int, speed: int = 255) -> bool:
|
|
|
"""Send position command (0-100) to device. Speed 255 - normal, 1 - slow"""
|
|
|
position = (100 - position) if self._reverse else position
|
|
|
+ self._update_motion_direction(True, self._get_adv_value("position"), position)
|
|
|
return await self._send_multiple_commands(
|
|
|
[
|
|
|
f"{POSITION_KEYS[0]}{position:02X}",
|
|
@@ -108,6 +133,11 @@ class SwitchbotCurtain(SwitchbotDevice):
|
|
|
return None
|
|
|
|
|
|
_position = max(min(_data[6], 100), 0)
|
|
|
+ _direction_adjusted_position = (100 - _position) if self._reverse else _position
|
|
|
+ _previous_position = self._get_adv_value("position")
|
|
|
+ _in_motion = bool(_data[5] & 0b01000011)
|
|
|
+ self._update_motion_direction(_in_motion, _previous_position, _direction_adjusted_position)
|
|
|
+
|
|
|
return {
|
|
|
"battery": _data[1],
|
|
|
"firmware": _data[2] / 10.0,
|
|
@@ -121,10 +151,19 @@ class SwitchbotCurtain(SwitchbotDevice):
|
|
|
"solarPanel": bool(_data[5] & 0b00001000),
|
|
|
"calibration": bool(_data[5] & 0b00000100),
|
|
|
"calibrated": bool(_data[5] & 0b00000100),
|
|
|
- "inMotion": bool(_data[5] & 0b01000011),
|
|
|
- "position": (100 - _position) if self._reverse else _position,
|
|
|
+ "inMotion": _in_motion,
|
|
|
+ "position": _direction_adjusted_position,
|
|
|
"timers": _data[7],
|
|
|
}
|
|
|
+
|
|
|
+ def _update_motion_direction(self, in_motion: bool, previous_position: int, new_position: int) -> None:
|
|
|
+ """Update opening/closing status based on movement."""
|
|
|
+ if in_motion == True:
|
|
|
+ if new_position != previous_position:
|
|
|
+ self._is_opening = new_position > previous_position
|
|
|
+ self._is_closing = new_position < previous_position
|
|
|
+ else:
|
|
|
+ self._is_closing = self._is_opening = False
|
|
|
|
|
|
async def get_extended_info_summary(self) -> dict[str, Any] | None:
|
|
|
"""Get basic info for all devices in chain."""
|
|
@@ -210,3 +249,11 @@ class SwitchbotCurtain(SwitchbotDevice):
|
|
|
"""Return True curtain is calibrated."""
|
|
|
# To get actual light level call update() first.
|
|
|
return self._get_adv_value("calibration")
|
|
|
+
|
|
|
+ def is_opening(self) -> Any:
|
|
|
+ """Return True if the curtain is opening."""
|
|
|
+ return self._is_opening
|
|
|
+
|
|
|
+ def is_closing(self) -> Any:
|
|
|
+ """Return True if the curtain is closing."""
|
|
|
+ return self._is_closing
|