# air_purifier.py (4.6 KB)
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import logging
  4. import struct
  5. from typing import Any
  6. from bleak.backends.device import BLEDevice
  7. from ..adv_parsers.air_purifier import get_air_purifier_mode
  8. from ..const import SwitchbotModel
  9. from ..const.air_purifier import AirPurifierMode, AirQualityLevel
  10. from .device import (
  11. SwitchbotEncryptedDevice,
  12. SwitchbotSequenceDevice,
  13. update_after_operation,
  14. )
  15. _LOGGER = logging.getLogger(__name__)
  16. COMMAND_HEAD = "570f4c"
  17. COMMAND_TURN_OFF = f"{COMMAND_HEAD}010000"
  18. COMMAND_TURN_ON = f"{COMMAND_HEAD}010100"
  19. COMMAND_SET_MODE = {
  20. AirPurifierMode.LEVEL_1.name.lower(): f"{COMMAND_HEAD}01010100",
  21. AirPurifierMode.LEVEL_2.name.lower(): f"{COMMAND_HEAD}01010132",
  22. AirPurifierMode.LEVEL_3.name.lower(): f"{COMMAND_HEAD}01010164",
  23. AirPurifierMode.AUTO.name.lower(): f"{COMMAND_HEAD}01010200",
  24. AirPurifierMode.PET.name.lower(): f"{COMMAND_HEAD}01010300",
  25. AirPurifierMode.SLEEP.name.lower(): f"{COMMAND_HEAD}01010400",
  26. }
  27. DEVICE_GET_BASIC_SETTINGS_KEY = "570f4d81"
  28. class SwitchbotAirPurifier(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
  29. """Representation of a Switchbot Air Purifier."""
  30. def __init__(
  31. self,
  32. device: BLEDevice,
  33. key_id: str,
  34. encryption_key: str,
  35. interface: int = 0,
  36. model: SwitchbotModel = SwitchbotModel.AIR_PURIFIER,
  37. **kwargs: Any,
  38. ) -> None:
  39. super().__init__(device, key_id, encryption_key, model, interface, **kwargs)
  40. @classmethod
  41. async def verify_encryption_key(
  42. cls,
  43. device: BLEDevice,
  44. key_id: str,
  45. encryption_key: str,
  46. model: SwitchbotModel = SwitchbotModel.AIR_PURIFIER,
  47. **kwargs: Any,
  48. ) -> bool:
  49. return await super().verify_encryption_key(
  50. device, key_id, encryption_key, model, **kwargs
  51. )
  52. async def get_basic_info(self) -> dict[str, Any] | None:
  53. """Get device basic settings."""
  54. if not (_data := await self._get_basic_info()):
  55. return None
  56. _LOGGER.debug("data: %s", _data)
  57. isOn = bool(_data[2] & 0b10000000)
  58. version_info = (_data[2] & 0b00110000) >> 4
  59. _mode = _data[2] & 0b00000111
  60. isAqiValid = bool(_data[3] & 0b00000100)
  61. child_lock = bool(_data[3] & 0b00000010)
  62. _aqi_level = (_data[4] & 0b00000110) >> 1
  63. aqi_level = AirQualityLevel(_aqi_level).name.lower()
  64. speed = _data[6] & 0b01111111
  65. pm25 = struct.unpack("<H", _data[12:14])[0] & 0xFFF
  66. firmware = _data[15] / 10.0
  67. mode = get_air_purifier_mode(_mode, speed)
  68. return {
  69. "isOn": isOn,
  70. "version_info": version_info,
  71. "mode": mode,
  72. "isAqiValid": isAqiValid,
  73. "child_lock": child_lock,
  74. "aqi_level": aqi_level,
  75. "speed": speed,
  76. "pm25": pm25,
  77. "firmware": firmware,
  78. }
  79. async def _get_basic_info(self) -> bytes | None:
  80. """Return basic info of device."""
  81. _data = await self._send_command(
  82. key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
  83. )
  84. if _data in (b"\x07", b"\x00"):
  85. _LOGGER.error("Unsuccessful, please try again")
  86. return None
  87. return _data
  88. @update_after_operation
  89. async def set_preset_mode(self, preset_mode: str) -> bool:
  90. """Send command to set air purifier preset_mode."""
  91. result = await self._send_command(COMMAND_SET_MODE[preset_mode])
  92. return self._check_command_result(result, 0, {1})
  93. @update_after_operation
  94. async def turn_on(self) -> bool:
  95. """Turn on the air purifier."""
  96. result = await self._send_command(COMMAND_TURN_ON)
  97. return self._check_command_result(result, 0, {1})
  98. @update_after_operation
  99. async def turn_off(self) -> bool:
  100. """Turn off the air purifier."""
  101. result = await self._send_command(COMMAND_TURN_OFF)
  102. return self._check_command_result(result, 0, {1})
  103. def get_current_percentage(self) -> Any:
  104. """Return cached percentage."""
  105. return self._get_adv_value("speed")
  106. def is_on(self) -> bool | None:
  107. """Return air purifier state from cache."""
  108. return self._get_adv_value("isOn")
  109. def get_current_aqi_level(self) -> Any:
  110. """Return cached aqi level."""
  111. return self._get_adv_value("aqi_level")
  112. def get_current_pm25(self) -> Any:
  113. """Return cached pm25."""
  114. return self._get_adv_value("pm25")
  115. def get_current_mode(self) -> Any:
  116. """Return cached mode."""
  117. return self._get_adv_value("mode")