Browse Source

Control datetime on SwitchBot Meter Pro CO2 Monitor (#433)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: J. Nick Koston <nick@koston.org>
Co-authored-by: J. Nick Koston <nick@home-assistant.io>
elgris 12 hours ago
parent
commit
9e62748b2c
3 changed files with 423 additions and 0 deletions
  1. 2 0
      switchbot/__init__.py
  2. 172 0
      switchbot/devices/meter_pro.py
  3. 249 0
      tests/test_meter_pro.py

+ 2 - 0
switchbot/__init__.py

@@ -52,6 +52,7 @@ from .devices.light_strip import (
     SwitchbotStripLight3,
 )
 from .devices.lock import SwitchbotLock
+from .devices.meter_pro import SwitchbotMeterProCO2
 from .devices.plug import SwitchbotPlugMini
 from .devices.relay_switch import (
     SwitchbotGarageDoorOpener,
@@ -101,6 +102,7 @@ __all__ = [
     "SwitchbotKeypadVision",
     "SwitchbotLightStrip",
     "SwitchbotLock",
+    "SwitchbotMeterProCO2",
     "SwitchbotModel",
     "SwitchbotModel",
     "SwitchbotOperationError",

+ 172 - 0
switchbot/devices/meter_pro.py

@@ -0,0 +1,172 @@
+from typing import Any
+
+from ..helpers import parse_uint24_be
+from .device import SwitchbotDevice, SwitchbotOperationError
+
+COMMAND_SET_TIME_OFFSET = "570f680506"
+COMMAND_GET_TIME_OFFSET = "570f690506"
+MAX_TIME_OFFSET = (1 << 24) - 1
+
+COMMAND_GET_DEVICE_DATETIME = "570f6901"
+COMMAND_SET_DEVICE_DATETIME = "57000503"
+COMMAND_SET_DISPLAY_FORMAT = "570f680505"
+
+
+class SwitchbotMeterProCO2(SwitchbotDevice):
+    """API to control Switchbot Meter Pro CO2."""
+
+    async def get_time_offset(self) -> int:
+        """
+        Get the current display time offset from the device.
+
+        Returns:
+            int: The time offset in seconds. Max 24 bits.
+
+        """
+        # Response Format: 5 bytes, where
+        # - byte 0: "01" (success)
+        # - byte 1: "00" (plus offset) or "80" (minus offset)
+        # - bytes 2-4: int24, number of seconds to offset.
+        # Example response: 01-80-00-10-00 -> subtract 4096 seconds.
+        result = await self._send_command(COMMAND_GET_TIME_OFFSET)
+        result = self._validate_result("get_time_offset", result, min_length=5)
+
+        is_negative = bool(result[1] & 0b10000000)
+        offset = parse_uint24_be(result, 2)
+        return -offset if is_negative else offset
+
+    async def set_time_offset(self, offset_seconds: int) -> None:
+        """
+        Set the display time offset on the device. This is what happens when
+        you adjust display time in the Switchbot app. The displayed time is
+        calculated as the internal device time (usually comes from the factory
+        settings or set by the Switchbot app upon syncing) + offset. The offset
+        is provided in seconds and can be positive or negative.
+
+        Args:
+            offset_seconds (int): 2^24 maximum, can be negative.
+
+        """
+        abs_offset = abs(offset_seconds)
+        if abs_offset > MAX_TIME_OFFSET:
+            raise SwitchbotOperationError(
+                f"{self.name}: Requested to set_time_offset of {offset_seconds} seconds, allowed +-{MAX_TIME_OFFSET} max."
+            )
+
+        sign_byte = "80" if offset_seconds < 0 else "00"
+
+        # Example: 57-0f-68-05-06-80-00-10-00 -> subtract 4096 seconds.
+        payload = f"{COMMAND_SET_TIME_OFFSET}{sign_byte}{abs_offset:06x}"
+        result = await self._send_command(payload)
+        self._validate_result("set_time_offset", result)
+
+    async def get_datetime(self) -> dict[str, Any]:
+        """
+        Get the current device time and settings as it is displayed. Contains
+        a time offset, if any was applied (see set_time_offset).
+        Doesn't include the current time zone.
+
+        Returns:
+            dict: Dictionary containing:
+                - 12h_mode (bool): True if 12h mode, False if 24h mode.
+                - year (int)
+                - month (int)
+                - day (int)
+                - hour (int)
+                - minute (int)
+                - second (int)
+
+        """
+        # Response Format: 13 bytes, where
+        # - byte 0: "01" (success)
+        # - bytes 1-4: temperature, ignored here.
+        # - byte 5: time display format:
+        #   - "80" - 12h (am/pm)
+        #   - "00" - 24h
+        # - bytes 6-12: yyyy-MM-dd-hh-mm-ss
+        # Example: 01-e4-02-94-23-00-07-e9-0c-1e-08-37-01 contains
+        # "year 2025, 30 December, 08:55:01, displayed in 24h format".
+        result = await self._send_command(COMMAND_GET_DEVICE_DATETIME)
+        result = self._validate_result("get_datetime", result, min_length=13)
+        return {
+            # Whether the time is displayed in 12h(am/pm) or 24h mode.
+            "12h_mode": bool(result[5] & 0b10000000),
+            "year": (result[6] << 8) + result[7],
+            "month": result[8],
+            "day": result[9],
+            "hour": result[10],
+            "minute": result[11],
+            "second": result[12],
+        }
+
+    async def set_datetime(
+        self, timestamp: int, utc_offset_hours: int = 0, utc_offset_minutes: int = 0
+    ) -> None:
+        """
+        Set the device internal time and timezone. Similar to how the
+        Switchbot app does it upon syncing with the device.
+        Pay attention to calculating UTC offset hours and minutes, see
+        examples below.
+
+        Args:
+            timestamp (int): Unix timestamp in seconds.
+            utc_offset_hours (int): UTC offset in hours, floor()'ed,
+                within [-12; 14] range.
+                Examples: -5 for UTC-05:00, -6 for UTC-05:30,
+                5 for UTC+05:00, 5 for UTC+5:30.
+            utc_offset_minutes (int): UTC offset minutes component, always
+                positive, complements utc_offset_hours.
+                Examples: 45 for UTC+05:45, 15 for UTC-5:45.
+
+        """
+        if not (-12 <= utc_offset_hours <= 14):
+            raise SwitchbotOperationError(
+                f"{self.name}: utc_offset_hours must be between -12 and +14 inclusive, got {utc_offset_hours}"
+            )
+        if not (0 <= utc_offset_minutes < 60):
+            raise SwitchbotOperationError(
+                f"{self.name}: utc_offset_minutes must be between 0 and 59 inclusive, got {utc_offset_minutes}"
+            )
+
+        # The device doesn't automatically add offset minutes, it expects them
+        # to come as a part of the timestamp.
+        adjusted_timestamp = timestamp + utc_offset_minutes * 60
+
+        # The timezone is encoded as 1 byte, where 00 stands for UTC-12.
+        # TZ with minute offset gets floor()ed: 4:30 yields 4, -4:30 yields -5.
+        utc_byte = utc_offset_hours + 12
+
+        payload = (
+            f"{COMMAND_SET_DEVICE_DATETIME}{utc_byte:02x}"
+            f"{adjusted_timestamp:016x}{utc_offset_minutes:02x}"
+        )
+
+        result = await self._send_command(payload)
+        self._validate_result("set_datetime", result)
+
+    async def set_time_display_format(self, is_12h_mode: bool = False) -> None:
+        """
+        Set the time display format on the device: 12h(AM/PM) or 24h.
+
+        Args:
+            is_12h_mode (bool): True for 12h (AM/PM) mode, False for 24h mode.
+
+        """
+        mode_byte = "80" if is_12h_mode else "00"
+        payload = f"{COMMAND_SET_DISPLAY_FORMAT}{mode_byte}"
+        result = await self._send_command(payload)
+        self._validate_result("set_time_display_format", result)
+
+    def _validate_result(
+        self, op_name: str, result: bytes | None, min_length: int | None = None
+    ) -> bytes:
+        if not self._check_command_result(result, 0, {1}):
+            raise SwitchbotOperationError(
+                f"{self.name}: Unexpected response code for {op_name} (result={result.hex() if result else 'None'} rssi={self.rssi})"
+            )
+        assert result is not None
+        if min_length is not None and len(result) < min_length:
+            raise SwitchbotOperationError(
+                f"{self.name}: Unexpected response len for {op_name}, wanted at least {min_length} (result={result.hex() if result else 'None'} rssi={self.rssi})"
+            )
+        return result

+ 249 - 0
tests/test_meter_pro.py

@@ -0,0 +1,249 @@
+from unittest.mock import AsyncMock
+
+import pytest
+from bleak.backends.device import BLEDevice
+
+from switchbot import SwitchbotOperationError
+from switchbot.devices.meter_pro import MAX_TIME_OFFSET, SwitchbotMeterProCO2
+
+
+def create_device():
+    ble_device = BLEDevice(
+        address="aa:bb:cc:dd:ee:ff", name="any", details={"rssi": -80}
+    )
+    device = SwitchbotMeterProCO2(ble_device)
+    device._send_command = AsyncMock()
+    return device
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    (
+        "device_response",
+        "expected_offset",
+    ),
+    [
+        ("0100101bc9", 1055689),  # 01 (success) 00 (plus offset) 10 1b c9 (1055689)
+        ("0180101bc9", -1055689),  # 01 (success) 80 (minus offset) 10 1b c9 (1055689)
+    ],
+)
+async def test_get_time_offset(device_response: str, expected_offset: int):
+    device = create_device()
+    device._send_command.return_value = bytes.fromhex(device_response)
+
+    offset = await device.get_time_offset()
+    device._send_command.assert_called_with("570f690506")
+    assert offset == expected_offset
+
+
+@pytest.mark.asyncio
+async def test_get_time_offset_failure():
+    device = create_device()
+    # Invalid 1st byte
+    device._send_command.return_value = bytes.fromhex("0080101bc9")
+
+    with pytest.raises(SwitchbotOperationError):
+        await device.get_time_offset()
+    device._send_command.assert_called_with("570f690506")
+
+
+@pytest.mark.asyncio
+async def test_get_time_offset_wrong_response():
+    device = create_device()
+    # Response too short (only status byte returned)
+    device._send_command.return_value = bytes.fromhex("01")
+
+    with pytest.raises(SwitchbotOperationError):
+        await device.get_time_offset()
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    (
+        "offset_sec",
+        "expected_payload",
+    ),
+    [
+        (1055689, "00101bc9"),  # "00" for positive offset, 101bc9 for 1055689
+        (-4096, "80001000"),  # "80" for negative offset, 001000 for 4096
+        (0, "00000000"),
+        (-0, "00000000"),  # -0 == 0 in Python
+    ],
+)
+async def test_set_time_offset(offset_sec: int, expected_payload: str):
+    device = create_device()
+    device._send_command.return_value = bytes.fromhex("01")
+
+    await device.set_time_offset(offset_sec)
+    device._send_command.assert_called_with("570f680506" + expected_payload)
+
+
+@pytest.mark.asyncio
+async def test_set_time_offset_too_large():
+    device = create_device()
+    with pytest.raises(SwitchbotOperationError):
+        await device.set_time_offset(MAX_TIME_OFFSET + 1)
+
+    with pytest.raises(SwitchbotOperationError):
+        await device.set_time_offset(-(MAX_TIME_OFFSET + 1))
+
+
+@pytest.mark.asyncio
+async def test_set_time_offset_failure():
+    device = create_device()
+    device._send_command.return_value = bytes.fromhex("00")
+
+    with pytest.raises(SwitchbotOperationError):
+        await device.set_time_offset(100)
+
+
+@pytest.mark.asyncio
+async def test_get_datetime_success():
+    device = create_device()
+    # Mock response:
+    # byte 0: 01 (success)
+    # bytes 1-4: e4 02 94 23 (temp, ignored)
+    # byte 5: 00 (24h mode)
+    # bytes 6-7: 07 e9 (year 2025)
+    # byte 8: 0c (Dec)
+    # byte 9: 1e (30)
+    # byte 10: 08 (Hour)
+    # byte 11: 37 (Minute = 55)
+    # byte 12: 01 (Second)
+    response_hex = "01e40294230007e90c1e083701"
+    device._send_command.return_value = bytes.fromhex(response_hex)
+
+    result = await device.get_datetime()
+    device._send_command.assert_called_with("570f6901")
+
+    assert result["12h_mode"] is False
+    assert result["year"] == 2025
+    assert result["month"] == 12
+    assert result["day"] == 30
+    assert result["hour"] == 8
+    assert result["minute"] == 55
+    assert result["second"] == 1
+
+
+@pytest.mark.asyncio
+async def test_get_datetime_12h_mode():
+    device = create_device()
+    # byte 5: 80 (12h mode)
+    # Time: 12:00:00
+    response_hex = "010000000080000001010c0000"
+    device._send_command.return_value = bytes.fromhex(response_hex)
+
+    result = await device.get_datetime()
+    device._send_command.assert_called_with("570f6901")
+
+    assert result["12h_mode"] is True
+    assert result["year"] == 0
+    assert result["month"] == 1
+    assert result["day"] == 1
+    assert result["hour"] == 12
+    assert result["minute"] == 0
+    assert result["second"] == 0
+
+
+@pytest.mark.asyncio
+async def test_get_datetime_failure():
+    device = create_device()
+    device._send_command.return_value = bytes.fromhex("00")
+
+    with pytest.raises(SwitchbotOperationError):
+        await device.get_datetime()
+
+
+@pytest.mark.asyncio
+async def test_get_datetime_wrong_response():
+    device = create_device()
+    device._send_command.return_value = bytes.fromhex("0100")
+
+    with pytest.raises(SwitchbotOperationError):
+        await device.get_datetime()
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    (
+        "timestamp",
+        "utc_offset_hours",
+        "utc_offset_minutes",
+        "expected_ts",
+        "expected_utc",
+        "expected_min",
+    ),
+    [
+        (1709251200, 0, 0, "65e11a80", "0c", "00"),  # 2024-03-01T00:00:00+00:00
+        (1709251200, 1, 0, "65e11a80", "0d", "00"),  # 2024-03-01T00:00:00+01:00
+        (1709251200, 5, 45, "65e1250c", "11", "2d"),  # 2024-03-01T00:00:00+05:45
+        (1709251200, -6, 15, "65e11e04", "06", "0f"),  # 2024-03-01T00:00:00-05:45
+    ],
+)
+async def test_set_datetime(  # noqa: PLR0913
+    timestamp: int,
+    utc_offset_hours: int,
+    utc_offset_minutes: int,
+    expected_ts: str,
+    expected_utc: str,
+    expected_min: str,
+):
+    device = create_device()
+    device._send_command.return_value = bytes.fromhex("01")
+
+    await device.set_datetime(
+        timestamp,
+        utc_offset_hours=utc_offset_hours,
+        utc_offset_minutes=utc_offset_minutes,
+    )
+
+    expected_ts = expected_ts.zfill(16)
+    expected_payload = "57000503" + expected_utc + expected_ts + expected_min
+    device._send_command.assert_called_with(expected_payload)
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "bad_hour",
+    [-13, 15],
+)
+async def test_set_datetime_invalid_utc_offset_hours(bad_hour: int):
+    device = create_device()
+    with pytest.raises(SwitchbotOperationError):
+        await device.set_datetime(1709251200, utc_offset_hours=bad_hour)
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "bad_min",
+    [-1, 60],
+)
+async def test_set_datetime_invalid_utc_offset_minutes(bad_min: int):
+    device = create_device()
+    with pytest.raises(SwitchbotOperationError):
+        await device.set_datetime(1709251200, utc_offset_minutes=bad_min)
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    ("is_12h_mode", "expected_payload"),
+    [
+        (True, "80"),
+        (False, "00"),
+    ],
+)
+async def test_set_time_display_format(is_12h_mode: bool, expected_payload: str):
+    device = create_device()
+    device._send_command.return_value = bytes.fromhex("01")
+
+    await device.set_time_display_format(is_12h_mode=is_12h_mode)
+    device._send_command.assert_called_with("570f680505" + expected_payload)
+
+
+@pytest.mark.asyncio
+async def test_set_time_display_format_failure():
+    device = create_device()
+    device._send_command.return_value = bytes.fromhex("00")
+
+    with pytest.raises(SwitchbotOperationError):
+        await device.set_time_display_format(is_12h_mode=True)