Pārlūkot izejas kodu

add support for vacuum

zerzhang 1 mēnesi atpakaļ
vecāks
revīzija
dc5f269451

+ 2 - 0
switchbot/__init__.py

@@ -32,6 +32,7 @@ from .devices.lock import SwitchbotLock
 from .devices.plug import SwitchbotPlugMini
 from .devices.relay_switch import SwitchbotRelaySwitch
 from .devices.roller_shade import SwitchbotRollerShade
+from .devices.vacuum import SwitchbotVacuum
 from .discovery import GetSwitchbotDevices
 from .models import SwitchBotAdvertisement
 
@@ -64,6 +65,7 @@ __all__ = [
     "SwitchbotPlugMini",
     "SwitchbotRelaySwitch",
     "SwitchbotRollerShade",
+    "SwitchbotVacuum",
     "SwitchbotSupportedType",
     "SwitchbotSupportedType",
     "close_stale_connections",

+ 31 - 0
switchbot/adv_parser.py

@@ -33,6 +33,7 @@ from .adv_parsers.relay_switch import (
 )
 from .adv_parsers.remote import process_woremote
 from .adv_parsers.roller_shade import process_worollershade
+from .adv_parsers.vacuum import process_vacuum, process_vacuum_k
 from .const import SwitchbotModel
 from .models import SwitchBotAdvertisement
 
@@ -237,6 +238,36 @@ SUPPORTED_TYPES: dict[str, SwitchbotSupportedType] = {
         "func": process_fan,
         "manufacturer_id": 2409,
     },
+    ".": {
+        "modelName": SwitchbotModel.K20_VACUUM,
+        "modelFriendlyName": "K20 Vacuum",
+        "func": process_vacuum,
+        "manufacturer_id": 2409,
+    },
+    "z": {
+        "modelName": SwitchbotModel.S10_VACUUM,
+        "modelFriendlyName": "S10 Vacuum",
+        "func": process_vacuum,
+        "manufacturer_id": 2409,
+    },
+    "3": {
+        "modelName": SwitchbotModel.K10_PRO_COMBO_VACUUM,
+        "modelFriendlyName": "K10+ Pro Combo",
+        "func": process_vacuum,
+        "manufacturer_id": 2409,
+    },
+    "}": {
+        "modelName": SwitchbotModel.K10_VACUUM,
+        "modelFriendlyName": "K10 Vacuum",
+        "func": process_vacuum_k,
+        "manufacturer_id": 2409,
+    },
+    "(": {
+        "modelName": SwitchbotModel.K10_PRO_VACUUM,
+        "modelFriendlyName": "K10+ Pro Vacuum",
+        "func": process_vacuum_k,
+        "manufacturer_id": 2409,
+    },        
 }
 
 _SWITCHBOT_MODEL_TO_CHAR = {

+ 58 - 0
switchbot/adv_parsers/vacuum.py

@@ -0,0 +1,58 @@
+"""Vacuum parser."""
+
+from __future__ import annotations
+
+import struct
+
+def process_vacuum(data: bytes | None, mfr_data: bytes | None) -> dict[str, bool | int | str]:
+    """Support for s10, k10+ pro combo, k20 process service data."""
+
+    if mfr_data is None:
+        return {}
+
+    _seq_num = mfr_data[6]
+    _soc_version = get_device_fw_version(mfr_data[8:11])
+    # Steps at the end of the last network configuration
+    _step = mfr_data[11] & 0b00001111
+    _mqtt_connected = bool(mfr_data[11] & 0b00010000)
+    _battery = mfr_data[12]
+    _work_status = mfr_data[13] & 0b00111111
+
+    return {
+        "sequence_number": _seq_num,
+        "soc_version": _soc_version,
+        "step": _step,
+        "mqtt_connected": _mqtt_connected,
+        "battery": _battery,
+        "work_status": _work_status,
+    }
+
+def get_device_fw_version(version_bytes: bytes) -> str | None:
+    version1 = version_bytes[0] & 0x0f
+    version2 = version_bytes[0] >> 4
+    version3 = struct.unpack('<H', version_bytes[1:])[0]
+    return f'{version1}.{version2}.{version3:>03d}'
+
+
+
+def process_vacuum_k(data: bytes | None, mfr_data: bytes | None) -> dict[str, bool | int | str]:
+    """Support for k10+, k10+ pro process service data."""
+
+    if mfr_data is None:
+        return {}
+
+    _seq_num = mfr_data[6]
+    _dustbin_bound = bool(mfr_data[7] & 0b10000000)
+    _dusbin_connected = bool(mfr_data[7] & 0b01000000)
+    _network_conncted = bool(mfr_data[7] & 0b00100000)
+    _work_status = (mfr_data[7] & 0b00010000) >> 4
+    _battery = mfr_data[8] & 0b01111111
+
+    return {
+        "sequence_number": _seq_num,
+        "dustbin_bound": _dustbin_bound,
+        "dusbin_connected": _dusbin_connected,
+        "network_conncted": _network_conncted,
+        "work_status": _work_status,
+        "battery": _battery
+    }

+ 5 - 0
switchbot/const/__init__.py

@@ -67,3 +67,8 @@ class SwitchbotModel(StrEnum):
     ROLLER_SHADE = "Roller Shade"
     HUBMINI_MATTER = "HubMini Matter"
     CIRCULATOR_FAN = "Circulator Fan"
+    K20_VACUUM = "K20 Vacuum"
+    S10_VACUUM = "S10 Vacuum"
+    K10_VACUUM = "K10 Vacuum"
+    K10_PRO_VACUUM = "K10+ Pro Vacuum"
+    K10_PRO_COMBO_VACUUM = "K10+ Pro Combo Vacuum"

+ 69 - 0
switchbot/devices/vacuum.py

@@ -0,0 +1,69 @@
+"""Library to handle connection with Switchbot."""
+
+from __future__ import annotations
+
+from .device import SwitchbotSequenceDevice, update_after_operation
+
+COMMMAND_HEAD = "5A40010101"
+COMMAND_RETURN_DOCK = F"{COMMMAND_HEAD}0225"
+
+COMMAND_CLEAN_UP = {
+    1: "570F5A00FFFF7001",
+    2: "5A400101010126",
+}
+COMMAND_RETURN_DOCK = {
+    1: "570F5A00FFFF7002",
+    2: "5A400101010225",    
+}
+
+class SwitchbotVacuum(SwitchbotSequenceDevice):
+    """Representation of a Switchbot Vacuum."""
+
+    def __init__(self, device, password=None, interface=0, **kwargs):
+        super().__init__(device, password, interface, **kwargs)
+
+    @update_after_operation
+    async def clean_up(self, protocol_version: int) -> bool:
+        """Send command to perform a spot clean-up."""
+        return await self._send_command(COMMAND_CLEAN_UP[protocol_version])
+    
+    @update_after_operation
+    async def return_to_dock(self, protocol_version: int) -> bool:
+        """Send command to return the dock."""
+        return await self._send_command(COMMAND_RETURN_DOCK[protocol_version])
+
+    def get_ble_version(self) -> int:
+        """Return device ble version."""
+        return self._get_adv_value("firmware")
+    
+    def get_soc_version(self) -> str:
+        """Return device soc version."""
+        return self._get_adv_value("soc_version")
+
+    def get_last_step(self) -> int:
+        """Return device last step after network configuration."""
+        return self._get_adv_value("step")
+    
+    def get_mqtt_connnect_status(self) -> bool:
+        """Return device mqtt connect status."""
+        return self._get_adv_value("mqtt_connected")
+    
+    def get_battery(self) -> int:
+        """Return device battey."""
+        return self._get_adv_value("battery")
+    
+    def get_work_status(self) -> int:
+        """Return device work status."""
+        return self._get_adv_value("work_status")
+
+    def get_dustbin_bound_status(self) -> bool:
+        """Return the dustbin bound status"""
+        return self._get_adv_value("dustbin_bound")
+    
+    def get_dustbin_connnected_status(self) -> bool:
+        """Return the dustbin connected status"""
+        return self._get_adv_value("dusbin_connected")
+
+    def get_network_connected_status(self) -> bool:
+        """Return the network conncted status"""
+        return self._get_adv_value("network_conncted")