air_purifier.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import logging
  4. import struct
  5. from typing import Any
  6. from bleak.backends.device import BLEDevice
  7. from ..adv_parsers.air_purifier import get_air_purifier_mode
  8. from ..const import SwitchbotModel
  9. from ..const.air_purifier import AirPurifierMode, AirQualityLevel
  10. from .device import (
  11. SwitchbotEncryptedDevice,
  12. SwitchbotSequenceDevice,
  13. update_after_operation,
  14. )
  15. _LOGGER = logging.getLogger(__name__)
  16. COMMAND_HEAD = "570f4c"
  17. COMMAND_SET_MODE = {
  18. AirPurifierMode.LEVEL_1.name.lower(): f"{COMMAND_HEAD}01010100",
  19. AirPurifierMode.LEVEL_2.name.lower(): f"{COMMAND_HEAD}01010132",
  20. AirPurifierMode.LEVEL_3.name.lower(): f"{COMMAND_HEAD}01010164",
  21. AirPurifierMode.AUTO.name.lower(): f"{COMMAND_HEAD}01010200",
  22. AirPurifierMode.SLEEP.name.lower(): f"{COMMAND_HEAD}01010300",
  23. AirPurifierMode.PET.name.lower(): f"{COMMAND_HEAD}01010400",
  24. }
  25. DEVICE_GET_BASIC_SETTINGS_KEY = "570f4d81"
  26. class SwitchbotAirPurifier(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
  27. """Representation of a Switchbot Air Purifier."""
  28. _turn_on_command = f"{COMMAND_HEAD}010100"
  29. _turn_off_command = f"{COMMAND_HEAD}010000"
  30. def __init__(
  31. self,
  32. device: BLEDevice,
  33. key_id: str,
  34. encryption_key: str,
  35. interface: int = 0,
  36. model: SwitchbotModel = SwitchbotModel.AIR_PURIFIER,
  37. **kwargs: Any,
  38. ) -> None:
  39. super().__init__(device, key_id, encryption_key, model, interface, **kwargs)
  40. @classmethod
  41. async def verify_encryption_key(
  42. cls,
  43. device: BLEDevice,
  44. key_id: str,
  45. encryption_key: str,
  46. model: SwitchbotModel = SwitchbotModel.AIR_PURIFIER,
  47. **kwargs: Any,
  48. ) -> bool:
  49. return await super().verify_encryption_key(
  50. device, key_id, encryption_key, model, **kwargs
  51. )
  52. async def get_basic_info(self) -> dict[str, Any] | None:
  53. """Get device basic settings."""
  54. if not (_data := await self._get_basic_info()):
  55. return None
  56. _LOGGER.debug("data: %s", _data)
  57. isOn = bool(_data[2] & 0b10000000)
  58. version_info = (_data[2] & 0b00110000) >> 4
  59. _mode = _data[2] & 0b00000111
  60. isAqiValid = bool(_data[3] & 0b00000100)
  61. child_lock = bool(_data[3] & 0b00000010)
  62. _aqi_level = (_data[4] & 0b00000110) >> 1
  63. aqi_level = AirQualityLevel(_aqi_level).name.lower()
  64. speed = _data[6] & 0b01111111
  65. pm25 = struct.unpack("<H", _data[12:14])[0] & 0xFFF
  66. firmware = _data[15] / 10.0
  67. mode = get_air_purifier_mode(_mode, speed)
  68. return {
  69. "isOn": isOn,
  70. "version_info": version_info,
  71. "mode": mode,
  72. "isAqiValid": isAqiValid,
  73. "child_lock": child_lock,
  74. "aqi_level": aqi_level,
  75. "speed": speed,
  76. "pm25": pm25,
  77. "firmware": firmware,
  78. }
  79. async def _get_basic_info(self) -> bytes | None:
  80. """Return basic info of device."""
  81. _data = await self._send_command(
  82. key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
  83. )
  84. if _data in (b"\x07", b"\x00"):
  85. _LOGGER.error("Unsuccessful, please try again")
  86. return None
  87. return _data
  88. @update_after_operation
  89. async def set_preset_mode(self, preset_mode: str) -> bool:
  90. """Send command to set air purifier preset_mode."""
  91. result = await self._send_command(COMMAND_SET_MODE[preset_mode])
  92. return self._check_command_result(result, 0, {1})
  93. def get_current_percentage(self) -> Any:
  94. """Return cached percentage."""
  95. return self._get_adv_value("speed")
  96. def is_on(self) -> bool | None:
  97. """Return air purifier state from cache."""
  98. return self._get_adv_value("isOn")
  99. def get_current_aqi_level(self) -> Any:
  100. """Return cached aqi level."""
  101. return self._get_adv_value("aqi_level")
  102. def get_current_pm25(self) -> Any:
  103. """Return cached pm25."""
  104. return self._get_adv_value("pm25")
  105. def get_current_mode(self) -> Any:
  106. """Return cached mode."""
  107. return self._get_adv_value("mode")