|
@@ -0,0 +1,116 @@
|
|
|
+"""Library to handle connection with Switchbot."""
|
|
|
+from __future__ import annotations
|
|
|
+
|
|
|
+import logging
|
|
|
+from typing import Any
|
|
|
+
|
|
|
+from switchbot.devices.device import REQ_HEADER, update_after_operation
|
|
|
+
|
|
|
+from .curtain import CURTAIN_EXT_SUM_KEY, SwitchbotCurtain
|
|
|
+
|
|
|
+_LOGGER = logging.getLogger(__name__)
|
|
|
+
|
|
|
+
|
|
|
+BLIND_COMMAND = "4501"
|
|
|
+OPEN_KEYS = [
|
|
|
+ f"{REQ_HEADER}{BLIND_COMMAND}010132",
|
|
|
+ f"{REQ_HEADER}{BLIND_COMMAND}05ff32",
|
|
|
+]
|
|
|
+CLOSE_DOWN_KEYS = [
|
|
|
+ f"{REQ_HEADER}{BLIND_COMMAND}010100",
|
|
|
+ f"{REQ_HEADER}{BLIND_COMMAND}05ff00",
|
|
|
+]
|
|
|
+CLOSE_UP_KEYS = [
|
|
|
+ f"{REQ_HEADER}{BLIND_COMMAND}010164",
|
|
|
+ f"{REQ_HEADER}{BLIND_COMMAND}05ff64",
|
|
|
+]
|
|
|
+
|
|
|
+
|
|
|
+class SwitchbotBlindTilt(SwitchbotCurtain):
|
|
|
+ """Representation of a Switchbot Blind Tilt."""
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
|
|
|
+ """Switchbot Blind Tilt/woBlindTilt constructor."""
|
|
|
+ super().__init__(*args, **kwargs)
|
|
|
+
|
|
|
+ self._reverse: bool = kwargs.pop("reverse_mode", False)
|
|
|
+
|
|
|
+ @update_after_operation
|
|
|
+ async def open(self) -> bool:
|
|
|
+ """Send open command."""
|
|
|
+ return await self._send_multiple_commands(OPEN_KEYS)
|
|
|
+
|
|
|
+ @update_after_operation
|
|
|
+ async def close_up(self) -> bool:
|
|
|
+ """Send close up command."""
|
|
|
+ return await self._send_multiple_commands(CLOSE_UP_KEYS)
|
|
|
+
|
|
|
+ @update_after_operation
|
|
|
+ async def close_down(self) -> bool:
|
|
|
+ """Send close down command."""
|
|
|
+ return await self._send_multiple_commands(CLOSE_DOWN_KEYS)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ @update_after_operation
|
|
|
+ async def close(self) -> bool:
|
|
|
+ """Send close command."""
|
|
|
+ if self.get_position() > 50:
|
|
|
+ return await self.close_up()
|
|
|
+ else:
|
|
|
+ return await self.close_down()
|
|
|
+
|
|
|
+ def get_position(self) -> Any:
|
|
|
+ """Return cached tilt (0-100) of Blind Tilt."""
|
|
|
+
|
|
|
+ return self._get_adv_value("tilt")
|
|
|
+
|
|
|
+ async def get_basic_info(self) -> dict[str, Any] | None:
|
|
|
+ """Get device basic settings."""
|
|
|
+ if not (_data := await self._get_basic_info()):
|
|
|
+ return None
|
|
|
+
|
|
|
+ _tilt = max(min(_data[6], 100), 0)
|
|
|
+ return {
|
|
|
+ "battery": _data[1],
|
|
|
+ "firmware": _data[2] / 10.0,
|
|
|
+ "light": bool(_data[4] & 0b00100000),
|
|
|
+ "fault": bool(_data[4] & 0b00001000),
|
|
|
+ "solarPanel": bool(_data[5] & 0b00001000),
|
|
|
+ "calibration": bool(_data[5] & 0b00000100),
|
|
|
+ "calibrated": bool(_data[5] & 0b00000100),
|
|
|
+ "inMotion": bool(_data[5] & 0b00000011),
|
|
|
+ "motionDirection": {
|
|
|
+ "up": bool(_data[5] & (0b00000010 if self._reverse else 0b00000001)),
|
|
|
+ "down": bool(_data[5] & (0b00000001 if self._reverse else 0b00000010)),
|
|
|
+ },
|
|
|
+ "tilt": (100 - _tilt) if self._reverse else _tilt,
|
|
|
+ "timers": _data[7],
|
|
|
+ }
|
|
|
+
|
|
|
+ async def get_extended_info_summary(self) -> dict[str, Any] | None:
|
|
|
+ """Get basic info for all devices in chain."""
|
|
|
+ _data = await self._send_command(key=CURTAIN_EXT_SUM_KEY)
|
|
|
+
|
|
|
+ if not _data:
|
|
|
+ _LOGGER.error("%s: Unsuccessful, no result from device", self.name)
|
|
|
+ return None
|
|
|
+
|
|
|
+ if _data in (b"\x07", b"\x00"):
|
|
|
+ _LOGGER.error("%s: Unsuccessful, please try again", self.name)
|
|
|
+ return None
|
|
|
+
|
|
|
+ self.ext_info_sum["device0"] = {
|
|
|
+ "light": bool(_data[1] & 0b00100000),
|
|
|
+ }
|
|
|
+
|
|
|
+ return self.ext_info_sum
|