# base_cover.py (4.7 KB)
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import logging
  4. from abc import abstractmethod
  5. from typing import Any
  6. from .device import REQ_HEADER, SwitchbotDevice, update_after_operation
  7. # Cover keys
  8. COVER_COMMAND = "4501"
  9. ROLLERSHADE_COMMAND = "4701"
  10. CONTROL_SOURCE = "00"
  11. # For second element of open and close arrs we should add two bytes i.e. ff00
  12. # First byte [ff] stands for speed (00 or ff - normal, 01 - slow) *
  13. # * Only for curtains 3. For other models use ff
  14. # Second byte [00] is a command (00 - open, 64 - close)
  15. POSITION_KEYS = [
  16. f"{REQ_HEADER}{COVER_COMMAND}0101",
  17. f"{REQ_HEADER}{COVER_COMMAND}05", # +speed
  18. ] # +actual_position
  19. STOP_KEYS = [f"{REQ_HEADER}{COVER_COMMAND}0001", f"{REQ_HEADER}{COVER_COMMAND}00ff"]
  20. COVER_EXT_SUM_KEY = f"{REQ_HEADER}460401"
  21. COVER_EXT_ADV_KEY = f"{REQ_HEADER}460402"
  22. _LOGGER = logging.getLogger(__name__)
  23. class SwitchbotBaseCover(SwitchbotDevice):
  24. """Representation of a Switchbot Cover devices for both curtains and tilt blinds."""
  25. def __init__(self, reverse: bool, *args: Any, **kwargs: Any) -> None:
  26. """Switchbot Cover device constructor."""
  27. super().__init__(*args, **kwargs)
  28. self._reverse = reverse
  29. self._settings: dict[str, Any] = {}
  30. self.ext_info_sum: dict[str, Any] = {}
  31. self.ext_info_adv: dict[str, Any] = {}
  32. self._is_opening: bool = False
  33. self._is_closing: bool = False
  34. async def _send_multiple_commands(self, keys: list[str]) -> bool:
  35. """
  36. Send multiple commands to device.
  37. Since we current have no way to tell which command the device
  38. needs we send both.
  39. """
  40. final_result = False
  41. for key in keys:
  42. result = await self._send_command(key)
  43. final_result |= self._check_command_result(result, 0, {1})
  44. return final_result
  45. @update_after_operation
  46. async def stop(self) -> bool:
  47. """Send stop command to device."""
  48. return await self._send_multiple_commands(STOP_KEYS)
  49. @update_after_operation
  50. async def set_position(self, position: int, speed: int = 255) -> bool:
  51. """Send position command (0-100) to device. Speed 255 - normal, 1 - slow"""
  52. position = (100 - position) if self._reverse else position
  53. return await self._send_multiple_commands(
  54. [
  55. f"{POSITION_KEYS[0]}{position:02X}",
  56. f"{POSITION_KEYS[1]}{speed:02X}{position:02X}",
  57. ]
  58. )
  59. @abstractmethod
  60. def get_position(self) -> Any:
  61. """Return current device position."""
  62. @abstractmethod
  63. async def get_basic_info(self) -> dict[str, Any] | None:
  64. """Get device basic settings."""
  65. @abstractmethod
  66. async def get_extended_info_summary(self) -> dict[str, Any] | None:
  67. """Get extended info for all devices in chain."""
  68. async def get_extended_info_adv(self) -> dict[str, Any] | None:
  69. """Get advance page info for device chain."""
  70. _data = await self._send_command(key=COVER_EXT_ADV_KEY)
  71. if not _data:
  72. _LOGGER.error("%s: Unsuccessful, no result from device", self.name)
  73. return None
  74. if _data in (b"\x07", b"\x00"):
  75. _LOGGER.error("%s: Unsuccessful, please try again", self.name)
  76. return None
  77. _state_of_charge = [
  78. "not_charging",
  79. "charging_by_adapter",
  80. "charging_by_solar",
  81. "fully_charged",
  82. "solar_not_charging",
  83. "charging_error",
  84. ]
  85. self.ext_info_adv["device0"] = {
  86. "battery": _data[1],
  87. "firmware": _data[2] / 10.0,
  88. "stateOfCharge": _state_of_charge[_data[3]],
  89. }
  90. # If grouped curtain device present.
  91. if _data[4]:
  92. self.ext_info_adv["device1"] = {
  93. "battery": _data[4],
  94. "firmware": _data[5] / 10.0,
  95. "stateOfCharge": _state_of_charge[_data[6]],
  96. }
  97. return self.ext_info_adv
  98. def get_light_level(self) -> Any:
  99. """Return cached light level."""
  100. # To get actual light level call update() first.
  101. return self._get_adv_value("lightLevel")
  102. def is_reversed(self) -> bool:
  103. """Return True if curtain position is opposite from SB data."""
  104. return self._reverse
  105. def is_calibrated(self) -> Any:
  106. """Return True curtain is calibrated."""
  107. # To get actual light level call update() first.
  108. return self._get_adv_value("calibration")
  109. def is_opening(self) -> bool:
  110. """Return True if the curtain is opening."""
  111. return self._is_opening
  112. def is_closing(self) -> bool:
  113. """Return True if the curtain is closing."""
  114. return self._is_closing