"""Library to handle connection with Switchbot."""

from __future__ import annotations

import logging
from typing import Any

from switchbot.models import SwitchBotAdvertisement

from .device import REQ_HEADER, SwitchbotDevice, update_after_operation

# Curtain keys
CURTAIN_COMMAND = "4501"

# For the second element of the open and close lists we must append two bytes,
# e.g. ff00.
# First byte [ff] stands for speed (00 or ff - normal, 01 - slow) *
# * Only for curtains 3. For other models use ff
# Second byte [00] is a command (00 - open, 64 - close)
OPEN_KEYS = [
    f"{REQ_HEADER}{CURTAIN_COMMAND}010100",
    f"{REQ_HEADER}{CURTAIN_COMMAND}05",  # +speed + "00"
]
CLOSE_KEYS = [
    f"{REQ_HEADER}{CURTAIN_COMMAND}010164",
    f"{REQ_HEADER}{CURTAIN_COMMAND}05",  # +speed + "64"
]
POSITION_KEYS = [
    f"{REQ_HEADER}{CURTAIN_COMMAND}0101",
    f"{REQ_HEADER}{CURTAIN_COMMAND}05",  # +speed
]  # +actual_position
STOP_KEYS = [f"{REQ_HEADER}{CURTAIN_COMMAND}0001", f"{REQ_HEADER}{CURTAIN_COMMAND}00ff"]

CURTAIN_EXT_SUM_KEY = f"{REQ_HEADER}460401"
CURTAIN_EXT_ADV_KEY = f"{REQ_HEADER}460402"
CURTAIN_EXT_CHAIN_INFO_KEY = f"{REQ_HEADER}468101"

_LOGGER = logging.getLogger(__name__)


class SwitchbotCurtain(SwitchbotDevice):
    """Representation of a Switchbot Curtain."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Switchbot Curtain/WoCurtain constructor.

        Accepts the same arguments as ``SwitchbotDevice`` plus the optional
        keyword ``reverse_mode`` (default ``True``).
        """
        # The position of the curtain is returned with 0 = open and 100 = closed.
        # This is independent of the calibration of the curtain bot (Open left to
        # right/Open right to left/Open from the middle).
        # The parameter 'reverse_mode' reverses these values:
        # if 'reverse_mode' = True, position = 0 equals close
        # and position = 100 equals open. The parameter defaults to True so that
        # the definition of position is the same as in Home Assistant.
        super().__init__(*args, **kwargs)
        self._reverse: bool = kwargs.pop("reverse_mode", True)
        self._settings: dict[str, Any] = {}
        self.ext_info_sum: dict[str, Any] = {}
        self.ext_info_adv: dict[str, Any] = {}
        self._is_opening: bool = False
        self._is_closing: bool = False

    def _set_parsed_data(
        self, advertisement: SwitchBotAdvertisement, data: dict[str, Any]
    ) -> None:
        """Set data.

        Refreshes the opening/closing flags from the ``inMotion`` and
        ``position`` entries of ``data`` before handing the advertisement to
        the base class.
        """
        in_motion = data["inMotion"]
        # Read the cached position *before* the base class overwrites it.
        previous_position = self._get_adv_value("position")
        new_position = data["position"]
        self._update_motion_direction(in_motion, previous_position, new_position)
        super()._set_parsed_data(advertisement, data)

    async def _send_multiple_commands(self, keys: list[str]) -> bool:
        """Send multiple commands to device.

        Since we currently have no way to tell which command the device
        needs we send both.

        Returns True if at least one of the commands passed
        ``_check_command_result(result, 0, {1})``.
        """
        final_result = False
        for key in keys:
            result = await self._send_command(key)
            final_result |= self._check_command_result(result, 0, {1})
        return final_result

    @update_after_operation
    async def open(self, speed: int = 255) -> bool:
        """Send open command. Speed 255 - normal, 1 - slow"""
        self._is_opening = True
        self._is_closing = False
        # speed is appended as a two-digit upper-case hex byte, then "00" (open).
        return await self._send_multiple_commands(
            [OPEN_KEYS[0], f"{OPEN_KEYS[1]}{speed:02X}00"]
        )

    @update_after_operation
    async def close(self, speed: int = 255) -> bool:
        """Send close command. Speed 255 - normal, 1 - slow"""
        self._is_closing = True
        self._is_opening = False
        # speed is appended as a two-digit upper-case hex byte, then "64" (close).
        return await self._send_multiple_commands(
            [CLOSE_KEYS[0], f"{CLOSE_KEYS[1]}{speed:02X}64"]
        )

    @update_after_operation
    async def stop(self) -> bool:
        """Send stop command to device."""
        self._is_opening = self._is_closing = False
        return await self._send_multiple_commands(STOP_KEYS)

    @update_after_operation
    async def set_position(self, position: int, speed: int = 255) -> bool:
        """Send position command (0-100) to device. Speed 255 - normal, 1 - slow"""
        # In reverse mode the requested position is mirrored before sending.
        position = (100 - position) if self._reverse else position
        self._update_motion_direction(True, self._get_adv_value("position"), position)
        return await self._send_multiple_commands(
            [
                f"{POSITION_KEYS[0]}{position:02X}",
                f"{POSITION_KEYS[1]}{speed:02X}{position:02X}",
            ]
        )

    def get_position(self) -> Any:
        """Return cached position (0-100) of Curtain."""
        # To get actual position call update() first.
        return self._get_adv_value("position")

    async def get_basic_info(self) -> dict[str, Any] | None:
        """Get device basic settings.

        Returns None when ``_get_basic_info()`` yields no data; otherwise a
        dict decoded from the individual response bytes. Also refreshes the
        opening/closing flags from the reported motion state and position.
        """
        if not (_data := await self._get_basic_info()):
            return None

        # Clamp the reported position into the 0-100 range.
        _position = max(min(_data[6], 100), 0)
        _direction_adjusted_position = (100 - _position) if self._reverse else _position
        _previous_position = self._get_adv_value("position")
        _in_motion = bool(_data[5] & 0b01000011)
        self._update_motion_direction(
            _in_motion, _previous_position, _direction_adjusted_position
        )

        return {
            "battery": _data[1],
            "firmware": _data[2] / 10.0,
            "chainLength": _data[3],
            "openDirection": (
                "right_to_left" if _data[4] & 0b10000000 == 128 else "left_to_right"
            ),
            "touchToOpen": bool(_data[4] & 0b01000000),
            "light": bool(_data[4] & 0b00100000),
            "fault": bool(_data[4] & 0b00001000),
            "solarPanel": bool(_data[5] & 0b00001000),
            # "calibration" and "calibrated" are both derived from the same bit.
            "calibration": bool(_data[5] & 0b00000100),
            "calibrated": bool(_data[5] & 0b00000100),
            "inMotion": _in_motion,
            "position": _direction_adjusted_position,
            "timers": _data[7],
        }

    def _update_motion_direction(
        self, in_motion: bool, previous_position: int | None, new_position: int
    ) -> None:
        """Update opening/closing status based on movement.

        Nothing changes when there is no previous position to compare with,
        or when the curtain is in motion but the position is unchanged.
        """
        if previous_position is None:
            return
        if in_motion is False:
            self._is_closing = self._is_opening = False
            return

        if new_position != previous_position:
            self._is_opening = new_position > previous_position
            self._is_closing = new_position < previous_position

    async def get_extended_info_summary(self) -> dict[str, Any] | None:
        """Get basic info for all devices in chain.

        Returns the updated ``ext_info_sum`` dict, or None (after logging an
        error) when the device gives no result or answers with a single
        ``0x07``/``0x00`` byte.
        """
        _data = await self._send_command(key=CURTAIN_EXT_SUM_KEY)

        if not _data:
            _LOGGER.error("%s: Unsuccessful, no result from device", self.name)
            return None

        if _data in (b"\x07", b"\x00"):
            _LOGGER.error("%s: Unsuccessful, please try again", self.name)
            return None

        self.ext_info_sum["device0"] = {
            "openDirectionDefault": not bool(_data[1] & 0b10000000),
            "touchToOpen": bool(_data[1] & 0b01000000),
            "light": bool(_data[1] & 0b00100000),
            # Test the masked bit for truthiness: "& 0b00010000 == 1" parses as
            # "(x & 0b00010000) == 1", which can never be true (result is 0 or 16).
            "openDirection": (
                "left_to_right" if _data[1] & 0b00010000 else "right_to_left"
            ),
        }

        # if grouped curtain device present.
        if _data[2] != 0:
            # NOTE(review): device1 is decoded from _data[1], the same byte as
            # device0, although its presence is signalled by _data[2] - confirm
            # against the protocol whether _data[2] was intended here.
            self.ext_info_sum["device1"] = {
                "openDirectionDefault": not bool(_data[1] & 0b10000000),
                "touchToOpen": bool(_data[1] & 0b01000000),
                "light": bool(_data[1] & 0b00100000),
                "openDirection": (
                    "left_to_right" if _data[1] & 0b00010000 else "right_to_left"
                ),
            }

        return self.ext_info_sum

    async def get_extended_info_adv(self) -> dict[str, Any] | None:
        """Get advance page info for device chain.

        Returns the updated ``ext_info_adv`` dict, or None (after logging an
        error) when the device gives no result or answers with a single
        ``0x07``/``0x00`` byte.
        """
        _data = await self._send_command(key=CURTAIN_EXT_ADV_KEY)
        if not _data:
            _LOGGER.error("%s: Unsuccessful, no result from device", self.name)
            return None

        if _data in (b"\x07", b"\x00"):
            _LOGGER.error("%s: Unsuccessful, please try again", self.name)
            return None

        # Indexed by the state-of-charge byte of the response.
        _state_of_charge = [
            "not_charging",
            "charging_by_adapter",
            "charging_by_solar",
            "fully_charged",
            "solar_not_charging",
            "charging_error",
        ]

        self.ext_info_adv["device0"] = {
            "battery": _data[1],
            "firmware": _data[2] / 10.0,
            "stateOfCharge": _state_of_charge[_data[3]],
        }

        # If grouped curtain device present.
        if _data[4]:
            self.ext_info_adv["device1"] = {
                "battery": _data[4],
                "firmware": _data[5] / 10.0,
                "stateOfCharge": _state_of_charge[_data[6]],
            }

        return self.ext_info_adv

    def get_light_level(self) -> Any:
        """Return cached light level."""
        # To get actual light level call update() first.
        return self._get_adv_value("lightLevel")

    def is_reversed(self) -> bool:
        """Return True if curtain position is opposite from SB data."""
        return self._reverse

    def is_calibrated(self) -> Any:
        """Return True if the curtain is calibrated."""
        # To get actual calibration state call update() first.
        return self._get_adv_value("calibration")

    def is_opening(self) -> bool:
        """Return True if the curtain is opening."""
        return self._is_opening

    def is_closing(self) -> bool:
        """Return True if the curtain is closing."""
        return self._is_closing