Browse Source

fix: guard remaining adv_parsers against short payloads (#496)

Bluetooth Devices Bot 4 ngày trước cách đây
mục cha
commit
ac01f2cd1f

+ 1 - 1
switchbot/adv_parsers/air_purifier.py

@@ -11,7 +11,7 @@ def process_air_purifier(
     data: bytes | None, mfr_data: bytes | None
 ) -> dict[str, bool | int]:
     """Process air purifier services data."""
-    if mfr_data is None:
+    if mfr_data is None or len(mfr_data) < 14:
         return {}
     device_data = mfr_data[6:]
 

+ 1 - 1
switchbot/adv_parsers/art_frame.py

@@ -9,7 +9,7 @@ def process_art_frame(
     data: bytes | None, mfr_data: bytes | None
 ) -> dict[str, bool | int | str]:
     """Process Art Frame data."""
-    if mfr_data is None:
+    if mfr_data is None or len(mfr_data) < 10:
         return {}
 
     _seq_num = mfr_data[6]

+ 2 - 2
switchbot/adv_parsers/blind_tilt.py

@@ -7,7 +7,7 @@ def process_woblindtilt(
     data: bytes | None, mfr_data: bytes | None, reverse: bool = False
 ) -> dict[str, bool | int]:
     """Process woBlindTilt services data."""
-    if mfr_data is None:
+    if mfr_data is None or len(mfr_data) < 10:
         return {}
 
     device_data = mfr_data[6:]
@@ -19,7 +19,7 @@ def process_woblindtilt(
 
     return {
         "calibration": _calibrated,
-        "battery": data[2] & 0b01111111 if data else None,
+        "battery": data[2] & 0b01111111 if data and len(data) >= 3 else None,
         "inMotion": _in_motion,
         "tilt": (100 - _tilt) if reverse else _tilt,
         "lightLevel": _light_level,

+ 1 - 1
switchbot/adv_parsers/bot.py

@@ -8,7 +8,7 @@ def process_wohand(data: bytes | None, mfr_data: bytes | None) -> dict[str, bool
     if data is None and mfr_data is None:
         return {}
 
-    if data is None:
+    if data is None or len(data) < 3:
         return {
             "switchMode": None,
             "isOn": None,

+ 1 - 1
switchbot/adv_parsers/bulb.py

@@ -7,7 +7,7 @@ def process_color_bulb(
     data: bytes | None, mfr_data: bytes | None
 ) -> dict[str, bool | int]:
     """Process WoBulb services data."""
-    if mfr_data is None:
+    if mfr_data is None or len(mfr_data) < 11:
         return {}
     return {
         "sequence_number": mfr_data[6],

+ 1 - 1
switchbot/adv_parsers/ceiling_light.py

@@ -15,7 +15,7 @@ _LOGGER = logging.getLogger(__name__)
 
 def process_woceiling(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
     """Process WoCeiling services data."""
-    if mfr_data is None:
+    if mfr_data is None or len(mfr_data) < 11:
         return {}
     return {
         "sequence_number": mfr_data[6],

+ 1 - 1
switchbot/adv_parsers/climate_panel.py

@@ -9,7 +9,7 @@ def process_climate_panel(
     data: bytes | None, mfr_data: bytes | None
 ) -> dict[str, bool | int | str]:
     """Process Climate Panel data."""
-    if mfr_data is None:
+    if mfr_data is None or len(mfr_data) < 16:
         return {}
 
     seq_number = mfr_data[6]

+ 3 - 3
switchbot/adv_parsers/curtain.py

@@ -12,8 +12,8 @@ def process_wocurtain(
         battery_data = mfr_data[12]
     elif mfr_data and len(mfr_data) >= 11:
         device_data = mfr_data[8:11]
-        battery_data = data[2] if data else None
-    elif data:
+        battery_data = data[2] if data and len(data) >= 3 else None
+    elif data and len(data) >= 6:
         device_data = data[3:6]
         battery_data = data[2]
     else:
@@ -25,7 +25,7 @@ def process_wocurtain(
     _device_chain = device_data[1] & 0b00000111
 
     return {
-        "calibration": bool(data[1] & 0b01000000) if data else None,
+        "calibration": bool(data[1] & 0b01000000) if data and len(data) >= 2 else None,
         "battery": battery_data & 0b01111111 if battery_data is not None else None,
         "inMotion": _in_motion,
         "position": (100 - _position) if reverse else _position,

+ 1 - 1
switchbot/adv_parsers/fan.py

@@ -14,7 +14,7 @@ def _parse_fan(
     mfr_data: bytes | None, mode_map: dict[int, str]
 ) -> dict[str, bool | int | str | None]:
     """Shared fan advertisement parse, parameterized on the mode map."""
-    if mfr_data is None:
+    if mfr_data is None or len(mfr_data) < 10:
         return {}
 
     device_data = mfr_data[6:]

+ 1 - 1
switchbot/adv_parsers/hub2.py

@@ -12,7 +12,7 @@ def process_wohub2(data: bytes | None, mfr_data: bytes | None) -> dict[str, Any]
     """Process woHub2 sensor manufacturer data."""
     temp_data = None
 
-    if mfr_data:
+    if mfr_data and len(mfr_data) >= 16:
         status = mfr_data[12]
         temp_data = mfr_data[13:16]
 

+ 1 - 1
switchbot/adv_parsers/hub3.py

@@ -10,7 +10,7 @@ from ..helpers import celsius_to_fahrenheit
 
 def process_hub3(data: bytes | None, mfr_data: bytes | None) -> dict[str, Any]:
     """Process hub3 sensor manufacturer data."""
-    if mfr_data is None:
+    if mfr_data is None or len(mfr_data) < 17:
         return {}
     device_data = mfr_data[6:]
 

+ 1 - 1
switchbot/adv_parsers/hubmini_matter.py

@@ -13,7 +13,7 @@ def process_hubmini_matter(
     """Process Hubmini matter sensor manufacturer data."""
     temp_data = None
 
-    if mfr_data:
+    if mfr_data and len(mfr_data) >= 16:
         temp_data = mfr_data[13:16]
 
     if not temp_data:

+ 2 - 2
switchbot/adv_parsers/humidifier.py

@@ -46,7 +46,7 @@ def process_wohumidifier(
     data: bytes | None, mfr_data: bytes | None
 ) -> dict[str, bool | int]:
     """Process WoHumi services data."""
-    if data is None:
+    if data is None or len(data) < 5:
         return {
             "isOn": None,
             "level": None,
@@ -64,7 +64,7 @@ def process_evaporative_humidifier(
     data: bytes | None, mfr_data: bytes | None
 ) -> dict[str, bool | int]:
     """Process WoHumi services data."""
-    if mfr_data is None:
+    if mfr_data is None or len(mfr_data) < 17:
         return {}
 
     seq_number = mfr_data[6]

+ 1 - 1
switchbot/adv_parsers/keypad.py

@@ -12,7 +12,7 @@ def process_wokeypad(
     mfr_data: bytes | None,
 ) -> dict[str, bool | int | None]:
     """Process woKeypad services data."""
-    if data is None or mfr_data is None:
+    if data is None or mfr_data is None or len(data) < 3 or len(mfr_data) < 7:
         return {"battery": None, "attempt_state": None}
 
     _LOGGER.debug("mfr_data: %s", mfr_data.hex())

+ 3 - 3
switchbot/adv_parsers/keypad_vision.py

@@ -7,7 +7,7 @@ _LOGGER = logging.getLogger(__name__)
 
 def process_common_mfr_data(mfr_data: bytes | None) -> dict[str, bool | int]:
     """Process common Keypad Vision (Pro) manufacturer data."""
-    if mfr_data is None:
+    if mfr_data is None or len(mfr_data) < 13:
         return {}
 
     sequence_number = mfr_data[6]
@@ -41,7 +41,7 @@ def process_keypad_vision(
     """Process Keypad Vision data."""
     result = process_common_mfr_data(mfr_data)
 
-    if not result:
+    if not result or len(mfr_data) < 14:
         return {}
 
     pir_triggered_level = mfr_data[13] & 0x03
@@ -63,7 +63,7 @@ def process_keypad_vision_pro(
     """Process Keypad Vision Pro data."""
     result = process_common_mfr_data(mfr_data)
 
-    if not result:
+    if not result or len(mfr_data) < 14:
         return {}
 
     radar_triggered_level = mfr_data[13] & 0x03

+ 3 - 3
switchbot/adv_parsers/light_strip.py

@@ -9,7 +9,7 @@ def process_wostrip(
     data: bytes | None, mfr_data: bytes | None
 ) -> dict[str, bool | int]:
     """Process WoStrip services data."""
-    if mfr_data is None:
+    if mfr_data is None or len(mfr_data) < 9:
         return {}
     return {
         "sequence_number": mfr_data[6],
@@ -25,7 +25,7 @@ def process_candle_warmer_lamp(
     data: bytes | None, mfr_data: bytes | None
 ) -> dict[str, bool | int]:
     """Process Candle Warmer Lamp services data."""
-    if mfr_data is None:
+    if mfr_data is None or len(mfr_data) < 9:
         return {}
     return {
         "sequence_number": mfr_data[6],
@@ -41,7 +41,7 @@ def process_light(
 ) -> dict[str, bool | int]:
     """Support for strip light 3 and floor lamp."""
     common_data = process_wostrip(data, mfr_data)
-    if not common_data:
+    if not common_data or len(mfr_data) < cw_offset + 2:
         return {}
 
     light_data = {"cw": _UNPACK_UINT16_BE(mfr_data, cw_offset)[0]}

+ 4 - 4
switchbot/adv_parsers/lock.py

@@ -12,7 +12,7 @@ _LOGGER = logging.getLogger(__name__)
 def process_wolock(data: bytes | None, mfr_data: bytes | None) -> dict[str, bool | int]:
     """Support for lock and lock lite process data."""
     common_data = process_locklite(data, mfr_data)
-    if not common_data:
+    if not common_data or len(mfr_data) < 9:
         return {}
 
     common_data["door_open"] = bool(mfr_data[7] & 0b00000100)
@@ -26,7 +26,7 @@ def process_locklite(
     data: bytes | None, mfr_data: bytes | None
 ) -> dict[str, bool | int]:
     """Support for lock lite process data."""
-    if mfr_data is None:
+    if mfr_data is None or len(mfr_data) < 9:
         return {}
 
     _LOGGER.debug("mfr_data: %s", mfr_data.hex())
@@ -35,7 +35,7 @@ def process_locklite(
 
     return {
         "sequence_number": mfr_data[6],
-        "battery": data[2] & 0b01111111 if data else None,
+        "battery": data[2] & 0b01111111 if data and len(data) >= 3 else None,
         "calibration": bool(mfr_data[7] & 0b10000000),
         "status": LockStatus((mfr_data[7] & 0b01110000) >> 4),
         "update_from_secondary_lock": bool(mfr_data[7] & 0b00001000),
@@ -46,7 +46,7 @@ def process_locklite(
 
 
 def parse_common_data(mfr_data: bytes | None) -> dict[str, bool | int]:
-    if mfr_data is None:
+    if mfr_data is None or len(mfr_data) < 12:
         return {}
 
     _LOGGER.debug("mfr_data: %s", mfr_data.hex())

+ 3 - 3
switchbot/adv_parsers/meter.py

@@ -19,11 +19,11 @@ def process_wosensorth(data: bytes | None, mfr_data: bytes | None) -> dict[str,
     temp_data: bytes | None = None
     battery: int | None = None
 
-    if mfr_data:
+    if mfr_data and len(mfr_data) >= 11:
         temp_data = mfr_data[8:11]
 
-    if data:
-        if not temp_data:
+    if data and len(data) >= 3:
+        if not temp_data and len(data) >= 6:
             temp_data = data[3:6]
         battery = data[2] & 0b01111111
 

+ 2 - 1
switchbot/adv_parsers/motion.py

@@ -16,8 +16,9 @@ def process_wopresence(
     sense_distance = None
     light_intensity = None
     is_light = None
+    motion_detected = None
 
-    if data:
+    if data and len(data) >= 6:
         tested = bool(data[1] & 0b10000000)
         motion_detected = bool(data[1] & 0b01000000)
         battery = data[2] & 0b01111111

+ 1 - 1
switchbot/adv_parsers/plug.py

@@ -9,7 +9,7 @@ def process_woplugmini(
     data: bytes | None, mfr_data: bytes | None
 ) -> dict[str, bool | int]:
     """Process plug mini."""
-    if mfr_data is None:
+    if mfr_data is None or len(mfr_data) < 12:
         return {}
     return {
         "switchMode": True,

+ 1 - 1
switchbot/adv_parsers/remote.py

@@ -11,7 +11,7 @@ def process_woremote(
     data: bytes | None, mfr_data: bytes | None
 ) -> dict[str, int | None]:
     """Process WoRemote adv data."""
-    if data is None:
+    if data is None or len(data) < 3:
         return {
             "battery": None,
         }

+ 2 - 2
switchbot/adv_parsers/roller_shade.py

@@ -7,7 +7,7 @@ def process_worollershade(
     data: bytes | None, mfr_data: bytes | None, reverse: bool = True
 ) -> dict[str, bool | int]:
     """Process woRollerShade services data."""
-    if mfr_data is None:
+    if mfr_data is None or len(mfr_data) < 10:
         return {}
 
     device_data = mfr_data[6:]
@@ -20,7 +20,7 @@ def process_worollershade(
 
     return {
         "calibration": _calibrated,
-        "battery": data[2] & 0b01111111 if data else None,
+        "battery": data[2] & 0b01111111 if data and len(data) >= 3 else None,
         "inMotion": _in_motion,
         "position": (100 - _position) if reverse else _position,
         "lightLevel": _light_level,

+ 1 - 1
switchbot/adv_parsers/smart_thermostat_radiator.py

@@ -11,7 +11,7 @@ def process_smart_thermostat_radiator(
     data: bytes | None, mfr_data: bytes | None
 ) -> dict[str, bool | int | str]:
     """Process Smart Thermostat Radiator data."""
-    if mfr_data is None:
+    if mfr_data is None or len(mfr_data) < 13:
         return {}
 
     _seq_num = mfr_data[6]

+ 2 - 2
switchbot/adv_parsers/vacuum.py

@@ -12,7 +12,7 @@ def process_vacuum(
     data: bytes | None, mfr_data: bytes | None
 ) -> dict[str, bool | int | str]:
     """Support for s10, k10+ pro combo, k20 process service data."""
-    if mfr_data is None:
+    if mfr_data is None or len(mfr_data) < 14:
         return {}
 
     _seq_num = mfr_data[6]
@@ -48,7 +48,7 @@ def process_vacuum_k(
     data: bytes | None, mfr_data: bytes | None
 ) -> dict[str, bool | int | str]:
     """Support for k10+, k10+ pro process service data."""
-    if mfr_data is None:
+    if mfr_data is None or len(mfr_data) < 9:
         return {}
 
     _seq_num = mfr_data[6]

+ 202 - 0
tests/test_short_payload_guards.py

@@ -0,0 +1,202 @@
+"""
+Regression tests: parsers must not raise on short payloads.
+
+The dispatcher in `switchbot/adv_parser.py` does not pre-validate
+`mfr_data` / `data` length before invoking parsers. A malformed BLE
+advertisement (untrusted but range-limited input) should degrade to
+an empty dict / None values, not raise an `IndexError` / `ValueError`
+that bubbles up to callers.
+"""
+
+from __future__ import annotations
+
+import pytest
+
+from switchbot.adv_parsers.air_purifier import process_air_purifier
+from switchbot.adv_parsers.art_frame import process_art_frame
+from switchbot.adv_parsers.blind_tilt import process_woblindtilt
+from switchbot.adv_parsers.bot import process_wohand
+from switchbot.adv_parsers.bulb import process_color_bulb
+from switchbot.adv_parsers.ceiling_light import process_woceiling
+from switchbot.adv_parsers.climate_panel import process_climate_panel
+from switchbot.adv_parsers.curtain import process_wocurtain
+from switchbot.adv_parsers.fan import process_fan, process_standing_fan
+from switchbot.adv_parsers.hub2 import process_wohub2
+from switchbot.adv_parsers.hub3 import process_hub3
+from switchbot.adv_parsers.hubmini_matter import process_hubmini_matter
+from switchbot.adv_parsers.humidifier import (
+    process_evaporative_humidifier,
+    process_wohumidifier,
+)
+from switchbot.adv_parsers.keypad import process_wokeypad
+from switchbot.adv_parsers.keypad_vision import (
+    process_keypad_vision,
+    process_keypad_vision_pro,
+)
+from switchbot.adv_parsers.light_strip import (
+    process_candle_warmer_lamp,
+    process_light,
+    process_rgbic_light,
+    process_wostrip,
+)
+from switchbot.adv_parsers.lock import (
+    parse_common_data,
+    process_lock2,
+    process_locklite,
+    process_wolock,
+    process_wolock_pro,
+)
+from switchbot.adv_parsers.meter import process_wosensorth
+from switchbot.adv_parsers.motion import process_wopresence
+from switchbot.adv_parsers.plug import process_woplugmini
+from switchbot.adv_parsers.remote import process_woremote
+from switchbot.adv_parsers.roller_shade import process_worollershade
+from switchbot.adv_parsers.smart_thermostat_radiator import (
+    process_smart_thermostat_radiator,
+)
+from switchbot.adv_parsers.vacuum import process_vacuum, process_vacuum_k
+
+EMPTY = b""
+SHORT = b"\x00" * 4
+
+
+@pytest.mark.parametrize(
+    "parser",
+    [
+        process_air_purifier,
+        process_art_frame,
+        process_woblindtilt,
+        process_color_bulb,
+        process_woceiling,
+        process_climate_panel,
+        process_wocurtain,
+        process_fan,
+        process_standing_fan,
+        process_wohub2,
+        process_hub3,
+        process_hubmini_matter,
+        process_evaporative_humidifier,
+        process_keypad_vision,
+        process_keypad_vision_pro,
+        process_wostrip,
+        process_candle_warmer_lamp,
+        process_woplugmini,
+        process_worollershade,
+        process_smart_thermostat_radiator,
+        process_vacuum,
+        process_vacuum_k,
+    ],
+)
+@pytest.mark.parametrize("payload", [None, EMPTY, SHORT])
+def test_mfr_only_parsers_return_empty_on_short(parser, payload):
+    """Parsers that read only mfr_data must return {} for short payloads."""
+    assert parser(None, payload) == {}
+
+
+@pytest.mark.parametrize("payload", [None, EMPTY, SHORT, b"\x00" * 17])
+def test_process_light_short_payload(payload):
+    """process_light needs cw_offset + 2 bytes (default 18)."""
+    assert process_light(None, payload) == {}
+
+
+@pytest.mark.parametrize("payload", [None, EMPTY, SHORT, b"\x00" * 11])
+def test_process_rgbic_light_short_payload(payload):
+    """process_rgbic_light uses cw_offset=10, so needs >= 12 bytes."""
+    assert process_rgbic_light(None, payload) == {}
+
+
+@pytest.mark.parametrize(
+    ("data", "mfr_data"),
+    [
+        (None, None),
+        (EMPTY, None),
+        (b"\x00\x00", None),
+    ],
+)
+def test_process_wohand_short_data(data, mfr_data):
+    """process_wohand must not crash on short `data`."""
+    out = process_wohand(data, mfr_data)
+    # Either an empty dict (both None) or all-None values
+    assert "isOn" in out or out == {}
+    assert out.get("battery") in (None, *out.values())
+
+
+@pytest.mark.parametrize("data", [None, EMPTY, b"\x00\x00"])
+def test_process_woremote_short_data(data):
+    out = process_woremote(data, None)
+    assert out == {"battery": None}
+
+
+@pytest.mark.parametrize(
+    ("data", "mfr_data"),
+    [
+        (None, None),
+        (b"\x00", None),
+        (None, b"\x00"),
+        (b"\x00\x00", b"\x00\x00"),
+    ],
+)
+def test_process_wokeypad_short(data, mfr_data):
+    out = process_wokeypad(data, mfr_data)
+    assert out == {"battery": None, "attempt_state": None}
+
+
+@pytest.mark.parametrize("data", [None, EMPTY, b"\x00\x00\x00"])
+def test_process_wohumidifier_short_data(data):
+    out = process_wohumidifier(data, None)
+    assert out == {"isOn": None, "level": None, "switchMode": True}
+
+
+@pytest.mark.parametrize("data", [None, EMPTY, b"\x00\x00\x00"])
+@pytest.mark.parametrize("mfr_data", [None, EMPTY, b"\x00\x00\x00"])
+def test_process_wosensorth_short(data, mfr_data):
+    """process_wosensorth must not crash; returns {} when no usable payload."""
+    out = process_wosensorth(data, mfr_data)
+    assert isinstance(out, dict)
+
+
+@pytest.mark.parametrize("data", [None, EMPTY, b"\x00\x00\x00"])
+@pytest.mark.parametrize("mfr_data", [None, EMPTY, b"\x00\x00\x00"])
+def test_process_wopresence_short(data, mfr_data):
+    """process_wopresence must not crash even when both inputs are short."""
+    out = process_wopresence(data, mfr_data)
+    assert isinstance(out, dict)
+
+
+@pytest.mark.parametrize("mfr_data", [None, EMPTY, SHORT])
+def test_lock_parsers_short_mfr(mfr_data):
+    assert process_locklite(None, mfr_data) == {}
+    assert process_wolock(None, mfr_data) == {}
+    assert parse_common_data(mfr_data) == {}
+    assert process_wolock_pro(None, mfr_data) == {}
+    assert process_lock2(None, mfr_data) == {}
+
+
+@pytest.mark.parametrize("data", [None, EMPTY, b"\x00\x00"])
+def test_blind_tilt_short_data(data):
+    """blind_tilt with full mfr but short data must not crash on data[2]."""
+    mfr = b"\x00" * 10
+    out = process_woblindtilt(data, mfr)
+    assert out["battery"] is None
+
+
+@pytest.mark.parametrize("data", [None, EMPTY, b"\x00\x00"])
+def test_curtain_short_data_with_long_mfr(data):
+    """Curtain >=11 path uses data[2] for battery; must not crash on short data."""
+    mfr = b"\x00" * 11
+    out = process_wocurtain(data, mfr)
+    assert out["battery"] is None
+
+
+@pytest.mark.parametrize("data", [b"\x00\x00\x00\x00", b"\x00" * 5])
+def test_curtain_short_data_only(data):
+    """Curtain data-only path needs len >= 6; shorter -> {}."""
+    assert process_wocurtain(data, None) == {}
+
+
+@pytest.mark.parametrize("data", [None, EMPTY, b"\x00\x00"])
+def test_roller_shade_short_data(data):
+    """roller_shade with short data must not crash on data[2]."""
+    mfr = b"\x00" * 10
+    out = process_worollershade(data, mfr)
+    assert out["battery"] is None