123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- """Library to handle connection with Switchbot."""
- from __future__ import annotations
- import logging
- from typing import Any
- from switchbot.models import SwitchBotAdvertisement
- from .device import REQ_HEADER, SwitchbotDevice, update_after_operation
- # Curtain keys
- CURTAIN_COMMAND = "4501"
- # For second element of open and close arrs we should add two bytes i.e. ff00
- # First byte [ff] stands for speed (00 or ff - normal, 01 - slow) *
- # * Only for curtains 3. For other models use ff
- # Second byte [00] is a command (00 - open, 64 - close)
- OPEN_KEYS = [
- f"{REQ_HEADER}{CURTAIN_COMMAND}010100",
- f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed + "00"
- ]
- CLOSE_KEYS = [
- f"{REQ_HEADER}{CURTAIN_COMMAND}010164",
- f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed + "64"
- ]
- POSITION_KEYS = [
- f"{REQ_HEADER}{CURTAIN_COMMAND}0101",
- f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed
- ] # +actual_position
- STOP_KEYS = [f"{REQ_HEADER}{CURTAIN_COMMAND}0001", f"{REQ_HEADER}{CURTAIN_COMMAND}00ff"]
- CURTAIN_EXT_SUM_KEY = f"{REQ_HEADER}460401"
- CURTAIN_EXT_ADV_KEY = f"{REQ_HEADER}460402"
- CURTAIN_EXT_CHAIN_INFO_KEY = f"{REQ_HEADER}468101"
- _LOGGER = logging.getLogger(__name__)
- class SwitchbotCurtain(SwitchbotDevice):
- """Representation of a Switchbot Curtain."""
- def __init__(self, *args: Any, **kwargs: Any) -> None:
- """Switchbot Curtain/WoCurtain constructor."""
- # The position of the curtain is saved returned with 0 = open and 100 = closed.
- # This is independent of the calibration of the curtain bot (Open left to right/
- # Open right to left/Open from the middle).
- # The parameter 'reverse_mode' reverse these values,
- # if 'reverse_mode' = True, position = 0 equals close
- # and position = 100 equals open. The parameter is default set to True so that
- # the definition of position is the same as in Home Assistant.
- super().__init__(*args, **kwargs)
- self._reverse: bool = kwargs.pop("reverse_mode", True)
- self._settings: dict[str, Any] = {}
- self.ext_info_sum: dict[str, Any] = {}
- self.ext_info_adv: dict[str, Any] = {}
- self._is_opening: bool = False
- self._is_closing: bool = False
- def _set_parsed_data(
- self, advertisement: SwitchBotAdvertisement, data: dict[str, Any]
- ) -> None:
- """Set data."""
- in_motion = data["inMotion"]
- previous_position = self._get_adv_value("position")
- new_position = data["position"]
- self._update_motion_direction(in_motion, previous_position, new_position)
- super()._set_parsed_data(advertisement, data)
- async def _send_multiple_commands(self, keys: list[str]) -> bool:
- """Send multiple commands to device.
- Since we current have no way to tell which command the device
- needs we send both.
- """
- final_result = False
- for key in keys:
- result = await self._send_command(key)
- final_result |= self._check_command_result(result, 0, {1})
- return final_result
- @update_after_operation
- async def open(self, speed: int = 255) -> bool:
- """Send open command. Speed 255 - normal, 1 - slow"""
- self._is_opening = True
- self._is_closing = False
- return await self._send_multiple_commands(
- [OPEN_KEYS[0], f"{OPEN_KEYS[1]}{speed:02X}00"]
- )
- @update_after_operation
- async def close(self, speed: int = 255) -> bool:
- """Send close command. Speed 255 - normal, 1 - slow"""
- self._is_closing = True
- self._is_opening = False
- return await self._send_multiple_commands(
- [CLOSE_KEYS[0], f"{CLOSE_KEYS[1]}{speed:02X}64"]
- )
- @update_after_operation
- async def stop(self) -> bool:
- """Send stop command to device."""
- self._is_opening = self._is_closing = False
- return await self._send_multiple_commands(STOP_KEYS)
- @update_after_operation
- async def set_position(self, position: int, speed: int = 255) -> bool:
- """Send position command (0-100) to device. Speed 255 - normal, 1 - slow"""
- position = (100 - position) if self._reverse else position
- self._update_motion_direction(True, self._get_adv_value("position"), position)
- return await self._send_multiple_commands(
- [
- f"{POSITION_KEYS[0]}{position:02X}",
- f"{POSITION_KEYS[1]}{speed:02X}{position:02X}",
- ]
- )
- def get_position(self) -> Any:
- """Return cached position (0-100) of Curtain."""
- # To get actual position call update() first.
- return self._get_adv_value("position")
- async def get_basic_info(self) -> dict[str, Any] | None:
- """Get device basic settings."""
- if not (_data := await self._get_basic_info()):
- return None
- _position = max(min(_data[6], 100), 0)
- _direction_adjusted_position = (100 - _position) if self._reverse else _position
- _previous_position = self._get_adv_value("position")
- _in_motion = bool(_data[5] & 0b01000011)
- self._update_motion_direction(
- _in_motion, _previous_position, _direction_adjusted_position
- )
- return {
- "battery": _data[1],
- "firmware": _data[2] / 10.0,
- "chainLength": _data[3],
- "openDirection": (
- "right_to_left" if _data[4] & 0b10000000 == 128 else "left_to_right"
- ),
- "touchToOpen": bool(_data[4] & 0b01000000),
- "light": bool(_data[4] & 0b00100000),
- "fault": bool(_data[4] & 0b00001000),
- "solarPanel": bool(_data[5] & 0b00001000),
- "calibration": bool(_data[5] & 0b00000100),
- "calibrated": bool(_data[5] & 0b00000100),
- "inMotion": _in_motion,
- "position": _direction_adjusted_position,
- "timers": _data[7],
- }
- def _update_motion_direction(
- self, in_motion: bool, previous_position: int | None, new_position: int
- ) -> None:
- """Update opening/closing status based on movement."""
- if previous_position is None:
- return
- if in_motion is False:
- self._is_closing = self._is_opening = False
- return
- if new_position != previous_position:
- self._is_opening = new_position > previous_position
- self._is_closing = new_position < previous_position
- async def get_extended_info_summary(self) -> dict[str, Any] | None:
- """Get basic info for all devices in chain."""
- _data = await self._send_command(key=CURTAIN_EXT_SUM_KEY)
- if not _data:
- _LOGGER.error("%s: Unsuccessful, no result from device", self.name)
- return None
- if _data in (b"\x07", b"\x00"):
- _LOGGER.error("%s: Unsuccessful, please try again", self.name)
- return None
- self.ext_info_sum["device0"] = {
- "openDirectionDefault": not bool(_data[1] & 0b10000000),
- "touchToOpen": bool(_data[1] & 0b01000000),
- "light": bool(_data[1] & 0b00100000),
- "openDirection": (
- "left_to_right" if _data[1] & 0b00010000 == 1 else "right_to_left"
- ),
- }
- # if grouped curtain device present.
- if _data[2] != 0:
- self.ext_info_sum["device1"] = {
- "openDirectionDefault": not bool(_data[1] & 0b10000000),
- "touchToOpen": bool(_data[1] & 0b01000000),
- "light": bool(_data[1] & 0b00100000),
- "openDirection": (
- "left_to_right" if _data[1] & 0b00010000 else "right_to_left"
- ),
- }
- return self.ext_info_sum
- async def get_extended_info_adv(self) -> dict[str, Any] | None:
- """Get advance page info for device chain."""
- _data = await self._send_command(key=CURTAIN_EXT_ADV_KEY)
- if not _data:
- _LOGGER.error("%s: Unsuccessful, no result from device", self.name)
- return None
- if _data in (b"\x07", b"\x00"):
- _LOGGER.error("%s: Unsuccessful, please try again", self.name)
- return None
- _state_of_charge = [
- "not_charging",
- "charging_by_adapter",
- "charging_by_solar",
- "fully_charged",
- "solar_not_charging",
- "charging_error",
- ]
- self.ext_info_adv["device0"] = {
- "battery": _data[1],
- "firmware": _data[2] / 10.0,
- "stateOfCharge": _state_of_charge[_data[3]],
- }
- # If grouped curtain device present.
- if _data[4]:
- self.ext_info_adv["device1"] = {
- "battery": _data[4],
- "firmware": _data[5] / 10.0,
- "stateOfCharge": _state_of_charge[_data[6]],
- }
- return self.ext_info_adv
- def get_light_level(self) -> Any:
- """Return cached light level."""
- # To get actual light level call update() first.
- return self._get_adv_value("lightLevel")
- def is_reversed(self) -> bool:
- """Return True if curtain position is opposite from SB data."""
- return self._reverse
- def is_calibrated(self) -> Any:
- """Return True curtain is calibrated."""
- # To get actual light level call update() first.
- return self._get_adv_value("calibration")
- def is_opening(self) -> bool:
- """Return True if the curtain is opening."""
- return self._is_opening
- def is_closing(self) -> bool:
- """Return True if the curtain is closing."""
- return self._is_closing
|