Browse Source

Improve handling of passive only data when active scans are not available (#153)

J. Nick Koston 1 year ago
parent
commit
1451c615d2

+ 71 - 12
switchbot/adv_parser.py

@@ -29,6 +29,7 @@ SERVICE_DATA_ORDER = (
     "0000fd3d-0000-1000-8000-00805f9b34fb",
     "00000d00-0000-1000-8000-00805f9b34fb",
 )
+MFR_DATA_ORDER = (2409, 89)
 
 
 class SwitchbotSupportedType(TypedDict):
@@ -44,11 +45,15 @@ SUPPORTED_TYPES: dict[str, SwitchbotSupportedType] = {
         "modelName": SwitchbotModel.CONTACT_SENSOR,
         "modelFriendlyName": "Contact Sensor",
         "func": process_wocontact,
+        "manufacturer_id": 2409,
+        "manufacturer_data_length": 13,
     },
     "H": {
         "modelName": SwitchbotModel.BOT,
         "modelFriendlyName": "Bot",
         "func": process_wohand,
+        "service_uuids": {"cba20d00-224d-11e6-9fb8-0002a5d5c51b"},
+        "manufacturer_id": 89,
     },
     "s": {
         "modelName": SwitchbotModel.MOTION_SENSOR,
@@ -64,6 +69,12 @@ SUPPORTED_TYPES: dict[str, SwitchbotSupportedType] = {
         "modelName": SwitchbotModel.CURTAIN,
         "modelFriendlyName": "Curtain",
         "func": process_wocurtain,
+        "service_uuids": {
+            "00001800-0000-1000-8000-00805f9b34fb",
+            "00001801-0000-1000-8000-00805f9b34fb",
+            "cba20d00-224d-11e6-9fb8-0002a5d5c51b",
+        },
+        "manufacturer_id": 89,
     },
     "T": {
         "modelName": SwitchbotModel.METER,
@@ -107,50 +118,98 @@ SUPPORTED_TYPES: dict[str, SwitchbotSupportedType] = {
     },
 }
 
+_SWITCHBOT_MODEL_TO_CHAR = {
+    model_data["modelName"]: model_chr
+    for model_chr, model_data in SUPPORTED_TYPES.items()
+}
+
+MODELS_BY_MANUFACTURER_DATA: dict[int, list[tuple[str, SwitchbotSupportedType]]] = {
+    mfr_id: [] for mfr_id in MFR_DATA_ORDER
+}
+for model_chr, model in SUPPORTED_TYPES.items():
+    if "manufacturer_id" in model:
+        mfr_id = model["manufacturer_id"]
+        MODELS_BY_MANUFACTURER_DATA[mfr_id].append((model_chr, model))
+
 
 def parse_advertisement_data(
-    device: BLEDevice, advertisement_data: AdvertisementData
+    device: BLEDevice,
+    advertisement_data: AdvertisementData,
+    model: SwitchbotModel | None = None,
 ) -> SwitchBotAdvertisement | None:
     """Parse advertisement data."""
-    _mgr_datas = list(advertisement_data.manufacturer_data.values())
     service_data = advertisement_data.service_data
 
-    if not service_data:
-        return None
-
     _service_data = None
     for uuid in SERVICE_DATA_ORDER:
         if uuid in service_data:
             _service_data = service_data[uuid]
             break
 
-    if not _service_data:
-        return None
+    _mfr_data = None
+    _mfr_id = None
+    for mfr_id in MFR_DATA_ORDER:
+        if mfr_id in advertisement_data.manufacturer_data:
+            _mfr_id = mfr_id
+            _mfr_data = advertisement_data.manufacturer_data[mfr_id]
+            break
 
-    _mfr_data = _mgr_datas[0] if _mgr_datas else None
+    if _mfr_data is None and _service_data is None:
+        return None
 
     try:
-        data = _parse_data(_service_data, _mfr_data)
+        data = _parse_data(
+            "".join(sorted(advertisement_data.service_uuids)),
+            _service_data,
+            _mfr_data,
+            _mfr_id,
+            model,
+        )
     except Exception as err:  # pylint: disable=broad-except
         _LOGGER.exception(
             "Failed to parse advertisement data: %s: %s", advertisement_data, err
         )
         return None
 
+    if not data:
+        return None
+
     return SwitchBotAdvertisement(device.address, data, device, advertisement_data.rssi)
 
 
 @lru_cache(maxsize=128)
 def _parse_data(
-    _service_data: bytes, _mfr_data: bytes | None
+    _service_uuids_str: str,
+    _service_data: bytes | None,
+    _mfr_data: bytes | None,
+    _mfr_id: int | None = None,
+    _switchbot_model: SwitchbotModel | None = None,
 ) -> SwitchBotAdvertisement | None:
     """Parse advertisement data."""
-    _model = chr(_service_data[0] & 0b01111111)
+    _model = chr(_service_data[0] & 0b01111111) if _service_data else None
+
+    if not _model and _mfr_id and _mfr_id in MODELS_BY_MANUFACTURER_DATA:
+        _service_uuids = set(_service_uuids_str.split(","))
+        for model_chr, model_data in MODELS_BY_MANUFACTURER_DATA[_mfr_id]:
+            if model_data.get("manufacturer_data_length") == len(_mfr_data):
+                _model = model_chr
+                break
+            if model_data.get("service_uuids", set()).intersection(_service_uuids):
+                _model = model_chr
+                break
+
+    if not _model and _switchbot_model and _switchbot_model in _SWITCHBOT_MODEL_TO_CHAR:
+        _model = _SWITCHBOT_MODEL_TO_CHAR[_switchbot_model]
+
+    if not _model:
+        return None
+
+    _isEncrypted = bool(_service_data[0] & 0b10000000) if _service_data else False
     data = {
         "rawAdvData": _service_data,
         "data": {},
         "model": _model,
-        "isEncrypted": bool(_service_data[0] & 0b10000000),
+        "isEncrypted": _isEncrypted,
     }
 
     type_data = SUPPORTED_TYPES.get(_model)

+ 11 - 1
switchbot/adv_parsers/bot.py

@@ -2,8 +2,18 @@
 from __future__ import annotations
 
 
-def process_wohand(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
+def process_wohand(data: bytes | None, mfr_data: bytes | None) -> dict[str, bool | int]:
     """Process woHand/Bot services data."""
+    if data is None and mfr_data is None:
+        return {}
+
+    if data is None:
+        return {
+            "switchMode": None,
+            "isOn": None,
+            "battery": None,
+        }
+
     _switch_mode = bool(data[1] & 0b10000000)
 
     return {

+ 3 - 1
switchbot/adv_parsers/bulb.py

@@ -2,7 +2,9 @@
 from __future__ import annotations
 
 
-def process_color_bulb(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
+def process_color_bulb(
+    data: bytes | None, mfr_data: bytes | None
+) -> dict[str, bool | int]:
     """Process WoBulb services data."""
     if mfr_data is None:
         return {}

+ 9 - 4
switchbot/adv_parsers/contact.py

@@ -2,11 +2,16 @@
 from __future__ import annotations
 
 
-def process_wocontact(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
+def process_wocontact(
+    data: bytes | None, mfr_data: bytes | None
+) -> dict[str, bool | int]:
     """Process woContact Sensor services data."""
-    battery = data[2] & 0b01111111
-    tested = bool(data[1] & 0b10000000)
-    contact_timeout = data[3] & 0b00000100 == 0b00000100
+    if data is None and mfr_data is None:
+        return {}
+
+    battery = data[2] & 0b01111111 if data else None
+    tested = bool(data[1] & 0b10000000) if data else None
+    contact_timeout = data[3] & 0b00000100 == 0b00000100 if data else False
 
     if mfr_data and len(mfr_data) >= 13:
         motion_detected = bool(mfr_data[7] & 0b10000000)

+ 3 - 1
switchbot/adv_parsers/curtain.py

@@ -3,9 +3,11 @@ from __future__ import annotations
 
 
 def process_wocurtain(
-    data: bytes, mfr_data: bytes | None, reverse: bool = True
+    data: bytes | None, mfr_data: bytes | None, reverse: bool = True
 ) -> dict[str, bool | int]:
     """Process woCurtain/Curtain services data."""
+    if data is None:
+        return {}
 
     _position = max(min(data[3] & 0b01111111, 100), 0)
 

+ 4 - 2
switchbot/adv_parsers/humidifier.py

@@ -13,9 +13,11 @@ _LOGGER = logging.getLogger(__name__)
 # Low:  658000c5222b6300
 # Med:  658000c5432b6300
 # High: 658000c5642b6300
-def process_wohumidifier(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
+def process_wohumidifier(
+    data: bytes | None, mfr_data: bytes | None
+) -> dict[str, bool | int]:
     """Process WoHumi services data."""
-    if mfr_data is None:
+    if data is None:
         return {}
     _LOGGER.debug("mfr_data: %s", mfr_data.hex())
     _LOGGER.debug("data: %s", data.hex())

+ 3 - 1
switchbot/adv_parsers/light_strip.py

@@ -2,7 +2,9 @@
 from __future__ import annotations
 
 
-def process_wostrip(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
+def process_wostrip(
+    data: bytes | None, mfr_data: bytes | None
+) -> dict[str, bool | int]:
     """Process WoStrip services data."""
     if mfr_data is None:
         return {}

+ 2 - 2
switchbot/adv_parsers/lock.py

@@ -8,7 +8,7 @@ from ..const import LockStatus
 _LOGGER = logging.getLogger(__name__)
 
 
-def process_wolock(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
+def process_wolock(data: bytes | None, mfr_data: bytes | None) -> dict[str, bool | int]:
     """Process woLock services data."""
     if mfr_data is None:
         return {}
@@ -17,7 +17,7 @@ def process_wolock(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]
     _LOGGER.debug("data: %s", data.hex())
 
     return {
-        "battery": data[2] & 0b01111111,
+        "battery": data[2] & 0b01111111 if data else None,
         "calibration": bool(mfr_data[7] & 0b10000000),
         "status": LockStatus(mfr_data[7] & 0b01110000),
         "update_from_secondary_lock": bool(mfr_data[7] & 0b00001000),

+ 3 - 1
switchbot/adv_parsers/meter.py

@@ -4,8 +4,10 @@ from __future__ import annotations
 from typing import Any
 
 
-def process_wosensorth(data: bytes, mfr_data: bytes | None) -> dict[str, Any]:
+def process_wosensorth(data: bytes | None, mfr_data: bytes | None) -> dict[str, Any]:
     """Process woSensorTH/Temp sensor services data."""
+    if data is None:
+        return {}
 
     _temp_sign = 1 if data[4] & 0b10000000 else -1
     _temp_c = _temp_sign * ((data[4] & 0b01111111) + ((data[3] & 0b00001111) / 10))

+ 5 - 1
switchbot/adv_parsers/motion.py

@@ -2,8 +2,12 @@
 from __future__ import annotations
 
 
-def process_wopresence(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
+def process_wopresence(
+    data: bytes | None, mfr_data: bytes | None
+) -> dict[str, bool | int]:
     """Process WoPresence Sensor services data."""
+    if data is None:
+        return {}
     return {
         "tested": bool(data[1] & 0b10000000),
         "motion_detected": bool(data[1] & 0b01000000),

+ 3 - 1
switchbot/adv_parsers/plug.py

@@ -2,7 +2,9 @@
 from __future__ import annotations
 
 
-def process_woplugmini(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
+def process_woplugmini(
+    data: bytes | None, mfr_data: bytes | None
+) -> dict[str, bool | int]:
     """Process plug mini."""
     if mfr_data is None:
         return {}

+ 62 - 0
tests/test_adv_parser.py

@@ -445,6 +445,39 @@ def test_contact_sensor_mfr():
     )
 
 
+def test_contact_sensor_mfr_no_service_data():
+    """Test contact sensor with passive data only."""
+    ble_device = BLEDevice("aa:bb:cc:dd:ee:ff", "any")
+    adv_data = generate_advertisement_data(
+        manufacturer_data={2409: b"\xcb9\xcd\xc4=FA,\x00F\x01\x8f\xc4"},
+        service_data={},
+        tx_power=-127,
+        rssi=-70,
+    )
+    result = parse_advertisement_data(ble_device, adv_data)
+    assert result == SwitchBotAdvertisement(
+        address="aa:bb:cc:dd:ee:ff",
+        data={
+            "data": {
+                "battery": None,
+                "button_count": 4,
+                "contact_open": False,
+                "contact_timeout": False,
+                "is_light": False,
+                "motion_detected": False,
+                "tested": None,
+            },
+            "isEncrypted": False,
+            "model": "d",
+            "modelFriendlyName": "Contact Sensor",
+            "modelName": SwitchbotModel.CONTACT_SENSOR,
+            "rawAdvData": None,
+        },
+        device=ble_device,
+        rssi=-70,
+    )
+
+
 def test_contact_sensor_srv():
     """Test parsing adv data from new bot firmware."""
     ble_device = BLEDevice("aa:bb:cc:dd:ee:ff", "any")
@@ -547,3 +580,32 @@ def test_contact_sensor_closed():
         device=ble_device,
         rssi=-50,
     )
+
+
+def test_switchbot_passive():
+    """Test parsing switchbot as passive."""
+    ble_device = BLEDevice("aa:bb:cc:dd:ee:ff", "any")
+    adv_data = generate_advertisement_data(
+        manufacturer_data={89: bytes.fromhex("d51cfb397856")},
+        service_data={},
+        tx_power=-127,
+        rssi=-50,
+    )
+    result = parse_advertisement_data(ble_device, adv_data, SwitchbotModel.BOT)
+    assert result == SwitchBotAdvertisement(
+        address="aa:bb:cc:dd:ee:ff",
+        data={
+            "data": {
+                "battery": None,
+                "switchMode": None,
+                "isOn": None,
+            },
+            "isEncrypted": False,
+            "model": "H",
+            "modelFriendlyName": "Bot",
+            "modelName": SwitchbotModel.BOT,
+            "rawAdvData": None,
+        },
+        device=ble_device,
+        rssi=-50,
+    )