Browse Source

Fix device discovery when Bluetooth adapter is in passive scanning mode (#397)

J. Nick Koston 1 month ago
parent
commit
60c2725025
7 changed files with 688 additions and 25 deletions
  1. 2 0
      switchbot/__init__.py
  2. 12 0
      switchbot/adv_parser.py
  3. 155 24
      switchbot/devices/device.py
  4. 24 0
      switchbot/utils.py
  5. 75 1
      tests/test_adv_parser.py
  6. 379 0
      tests/test_device.py
  7. 41 0
      tests/test_utils.py

+ 2 - 0
switchbot/__init__.py

@@ -36,6 +36,7 @@ from .devices.device import (
     SwitchbotDevice,
     SwitchbotEncryptedDevice,
     SwitchbotOperationError,
+    fetch_cloud_devices,
 )
 from .devices.evaporative_humidifier import SwitchbotEvaporativeHumidifier
 from .devices.fan import SwitchbotFan
@@ -104,6 +105,7 @@ __all__ = [
     "SwitchbotVacuum",
     "close_stale_connections",
     "close_stale_connections_by_address",
+    "fetch_cloud_devices",
     "get_device",
     "parse_advertisement_data",
 ]

+ 12 - 0
switchbot/adv_parser.py

@@ -45,6 +45,7 @@ from .adv_parsers.roller_shade import process_worollershade
 from .adv_parsers.vacuum import process_vacuum, process_vacuum_k
 from .const import SwitchbotModel
 from .models import SwitchBotAdvertisement
+from .utils import format_mac_upper
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -54,6 +55,8 @@ SERVICE_DATA_ORDER = (
 )
 MFR_DATA_ORDER = (2409, 741, 89)
 
+_MODEL_TO_MAC_CACHE: dict[str, SwitchbotModel] = {}
+
 
 class SwitchbotSupportedType(TypedDict):
     """Supported type of Switchbot."""
@@ -383,6 +386,10 @@ def parse_advertisement_data(
     model: SwitchbotModel | None = None,
 ) -> SwitchBotAdvertisement | None:
     """Parse advertisement data."""
+    upper_mac = format_mac_upper(device.address)
+    if model is None and upper_mac in _MODEL_TO_MAC_CACHE:
+        model = _MODEL_TO_MAC_CACHE[upper_mac]
+
     service_data = advertisement_data.service_data
 
     _service_data = None
@@ -470,3 +477,8 @@ def _parse_data(
             )
 
     return data
+
+
+def populate_model_to_mac_cache(mac: str, model: SwitchbotModel) -> None:
+    """Populate the model to MAC address cache."""
+    _MODEL_TO_MAC_CACHE[mac] = model

+ 155 - 24
switchbot/devices/device.py

@@ -24,6 +24,7 @@ from bleak_retry_connector import (
 )
 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
 
+from ..adv_parser import populate_model_to_mac_cache
 from ..api_config import SWITCHBOT_APP_API_BASE_URL, SWITCHBOT_APP_CLIENT_ID
 from ..const import (
     DEFAULT_RETRY_COUNT,
@@ -37,9 +38,43 @@ from ..const import (
 from ..discovery import GetSwitchbotDevices
 from ..helpers import create_background_task
 from ..models import SwitchBotAdvertisement
+from ..utils import format_mac_upper
 
 _LOGGER = logging.getLogger(__name__)
 
+
+def _extract_region(userinfo: dict[str, Any]) -> str:
+    """Extract region from user info, defaulting to 'us'."""
+    if "botRegion" in userinfo and userinfo["botRegion"] != "":
+        return userinfo["botRegion"]
+    return "us"
+
+
+# Mapping from API model names to SwitchbotModel enum values
+API_MODEL_TO_ENUM: dict[str, SwitchbotModel] = {
+    "WoHand": SwitchbotModel.BOT,
+    "WoCurtain": SwitchbotModel.CURTAIN,
+    "WoHumi": SwitchbotModel.HUMIDIFIER,
+    "WoPlug": SwitchbotModel.PLUG_MINI,
+    "WoPlugUS": SwitchbotModel.PLUG_MINI,
+    "WoContact": SwitchbotModel.CONTACT_SENSOR,
+    "WoStrip": SwitchbotModel.LIGHT_STRIP,
+    "WoSensorTH": SwitchbotModel.METER,
+    "WoMeter": SwitchbotModel.METER,
+    "WoMeterPlus": SwitchbotModel.METER_PRO,
+    "WoPresence": SwitchbotModel.MOTION_SENSOR,
+    "WoBulb": SwitchbotModel.COLOR_BULB,
+    "WoCeiling": SwitchbotModel.CEILING_LIGHT,
+    "WoLock": SwitchbotModel.LOCK,
+    "WoBlindTilt": SwitchbotModel.BLIND_TILT,
+    "WoIOSensor": SwitchbotModel.IO_METER,  # Outdoor Meter
+    "WoButton": SwitchbotModel.REMOTE,  # Remote button
+    "WoLinkMini": SwitchbotModel.HUBMINI_MATTER,  # Hub Mini
+    "W1083002": SwitchbotModel.RELAY_SWITCH_1,  # Relay Switch 1
+    "W1079000": SwitchbotModel.METER_PRO,  # Meter Pro (another variant)
+    "W1102001": SwitchbotModel.STRIP_LIGHT_3,  # RGBWW Strip Light 3
+}
+
 REQ_HEADER = "570f"
 
 
@@ -164,6 +199,113 @@ class SwitchbotBaseDevice:
         self._last_full_update: float = -PASSIVE_POLL_INTERVAL
         self._timed_disconnect_task: asyncio.Task[None] | None = None
 
+    @classmethod
+    async def _async_get_user_info(
+        cls,
+        session: aiohttp.ClientSession,
+        auth_headers: dict[str, str],
+    ) -> dict[str, Any]:
+        try:
+            return await cls.api_request(
+                session, "account", "account/api/v1/user/userinfo", {}, auth_headers
+            )
+        except Exception as err:
+            raise SwitchbotAccountConnectionError(
+                f"Failed to retrieve SwitchBot Account user details: {err}"
+            ) from err
+
+    @classmethod
+    async def _get_auth_result(
+        cls,
+        session: aiohttp.ClientSession,
+        username: str,
+        password: str,
+    ) -> dict[str, Any]:
+        """Authenticate with SwitchBot API."""
+        try:
+            return await cls.api_request(
+                session,
+                "account",
+                "account/api/v1/user/login",
+                {
+                    "clientId": SWITCHBOT_APP_CLIENT_ID,
+                    "username": username,
+                    "password": password,
+                    "grantType": "password",
+                    "verifyCode": "",
+                },
+            )
+        except Exception as err:
+            raise SwitchbotAuthenticationError(f"Authentication failed: {err}") from err
+
+    @classmethod
+    async def get_devices(
+        cls,
+        session: aiohttp.ClientSession,
+        username: str,
+        password: str,
+    ) -> dict[str, SwitchbotModel]:
+        """Get devices from SwitchBot API and return formatted MAC to model mapping."""
+        try:
+            auth_result = await cls._get_auth_result(session, username, password)
+            auth_headers = {"authorization": auth_result["access_token"]}
+        except Exception as err:
+            raise SwitchbotAuthenticationError(f"Authentication failed: {err}") from err
+
+        userinfo = await cls._async_get_user_info(session, auth_headers)
+        region = _extract_region(userinfo)
+
+        try:
+            device_info = await cls.api_request(
+                session,
+                f"wonderlabs.{region}",
+                "wonder/device/v3/getdevice",
+                {
+                    "required_type": "All",
+                },
+                auth_headers,
+            )
+        except Exception as err:
+            raise SwitchbotAccountConnectionError(
+                f"Failed to retrieve devices from SwitchBot Account: {err}"
+            ) from err
+
+        items: list[dict[str, Any]] = device_info["Items"]
+        mac_to_model: dict[str, SwitchbotModel] = {}
+
+        for item in items:
+            if "device_mac" not in item:
+                continue
+
+            if (
+                "device_detail" not in item
+                or "device_type" not in item["device_detail"]
+            ):
+                continue
+
+            mac = item["device_mac"]
+            model_name = item["device_detail"]["device_type"]
+
+            # Format MAC to uppercase with colons
+            formatted_mac = format_mac_upper(mac)
+
+            # Map API model name to SwitchbotModel enum if possible
+            if model_name in API_MODEL_TO_ENUM:
+                model = API_MODEL_TO_ENUM[model_name]
+                mac_to_model[formatted_mac] = model
+                # Populate the cache
+                populate_model_to_mac_cache(formatted_mac, model)
+            else:
+                # Log the full item payload for unknown models
+                _LOGGER.debug(
+                    "Unknown model %s for device %s, full item: %s",
+                    model_name,
+                    formatted_mac,
+                    item,
+                )
+
+        return mac_to_model
+
     @classmethod
     async def api_request(
         cls,
@@ -809,34 +951,13 @@ class SwitchbotEncryptedDevice(SwitchbotDevice):
         device_mac = device_mac.replace(":", "").replace("-", "").upper()
 
         try:
-            auth_result = await cls.api_request(
-                session,
-                "account",
-                "account/api/v1/user/login",
-                {
-                    "clientId": SWITCHBOT_APP_CLIENT_ID,
-                    "username": username,
-                    "password": password,
-                    "grantType": "password",
-                    "verifyCode": "",
-                },
-            )
+            auth_result = await cls._get_auth_result(session, username, password)
             auth_headers = {"authorization": auth_result["access_token"]}
         except Exception as err:
             raise SwitchbotAuthenticationError(f"Authentication failed: {err}") from err
 
-        try:
-            userinfo = await cls.api_request(
-                session, "account", "account/api/v1/user/userinfo", {}, auth_headers
-            )
-            if "botRegion" in userinfo and userinfo["botRegion"] != "":
-                region = userinfo["botRegion"]
-            else:
-                region = "us"
-        except Exception as err:
-            raise SwitchbotAccountConnectionError(
-                f"Failed to retrieve SwitchBot Account user details: {err}"
-            ) from err
+        userinfo = await cls._async_get_user_info(session, auth_headers)
+        region = _extract_region(userinfo)
 
         try:
             device_info = await cls.api_request(
@@ -1023,3 +1144,13 @@ class SwitchbotSequenceDevice(SwitchbotDevice):
         )
         if current_state != new_state:
             create_background_task(self.update())
+
+
+async def fetch_cloud_devices(
+    session: aiohttp.ClientSession,
+    username: str,
+    password: str,
+) -> dict[str, SwitchbotModel]:
+    """Fetch devices from SwitchBot API and return MAC to model mapping."""
+    # Get devices from the API (which also populates the cache)
+    return await SwitchbotBaseDevice.get_devices(session, username, password)

+ 24 - 0
switchbot/utils.py

@@ -0,0 +1,24 @@
+"""Utility functions for switchbot."""
+
+from functools import lru_cache
+
+
+@lru_cache(maxsize=512)
+def format_mac_upper(mac: str) -> str:
+    """Format the mac address string to uppercase with colons."""
+    to_test = mac
+
+    if len(to_test) == 17 and to_test.count(":") == 5:
+        return to_test.upper()
+
+    if len(to_test) == 17 and to_test.count("-") == 5:
+        to_test = to_test.replace("-", "")
+    elif len(to_test) == 14 and to_test.count(".") == 2:
+        to_test = to_test.replace(".", "")
+
+    if len(to_test) == 12:
+        # no : included
+        return ":".join(to_test.upper()[i : i + 2] for i in range(0, 12, 2))
+
+    # Not sure how formatted, return original
+    return mac.upper()

+ 75 - 1
tests/test_adv_parser.py

@@ -8,7 +8,11 @@ from bleak.backends.device import BLEDevice
 from bleak.backends.scanner import AdvertisementData
 
 from switchbot import HumidifierMode, SwitchbotModel
-from switchbot.adv_parser import parse_advertisement_data
+from switchbot.adv_parser import (
+    _MODEL_TO_MAC_CACHE,
+    parse_advertisement_data,
+    populate_model_to_mac_cache,
+)
 from switchbot.const.lock import LockStatus
 from switchbot.models import SwitchBotAdvertisement
 
@@ -3790,3 +3794,73 @@ def test_adv_with_empty_data(test_case: AdvTestCase) -> None:
         rssi=-97,
         active=True,
     )
+
+
+def test_parse_advertisement_with_mac_cache() -> None:
+    """Test that populating the MAC cache helps identify unknown passive devices."""
+    # Clear the cache to ensure clean test
+    _MODEL_TO_MAC_CACHE.clear()
+
+    # Create a passive lock device with manufacturer data only (no service data)
+    # This would normally not be identifiable without the cache
+    mac_address = "C1:64:B8:7D:06:05"
+    ble_device = generate_ble_device(mac_address, "WoLock")
+
+    # Lock passive advertisement with manufacturer data
+    # This is real data from a WoLock device in passive mode
+    adv_data = generate_advertisement_data(
+        manufacturer_data={2409: b"\xc1d\xb8}\x06\x05\x00\x00\x00\x00\x00"},
+        service_data={},
+        rssi=-70,
+    )
+
+    # First attempt: Without cache, parser cannot identify the device model
+    result_without_cache = parse_advertisement_data(ble_device, adv_data)
+    assert result_without_cache is None, "Should not decode without model hint"
+
+    # Now populate the cache with the device's MAC and model
+    populate_model_to_mac_cache(mac_address, SwitchbotModel.LOCK)
+
+    # Second attempt: With cache, parser can now identify and decode the device
+    result_with_cache = parse_advertisement_data(ble_device, adv_data)
+    assert result_with_cache is not None, "Should decode with MAC cache"
+    assert result_with_cache.data["modelName"] == SwitchbotModel.LOCK
+    assert result_with_cache.data["modelFriendlyName"] == "Lock"
+    assert result_with_cache.active is False  # Passive advertisement
+
+    # Clean up
+    _MODEL_TO_MAC_CACHE.clear()
+
+
+def test_parse_advertisement_with_mac_cache_curtain() -> None:
+    """Test MAC cache with a passive curtain device."""
+    # Clear the cache
+    _MODEL_TO_MAC_CACHE.clear()
+
+    # Create a passive curtain device
+    mac_address = "CC:F4:C4:F9:AC:6C"
+    ble_device = generate_ble_device(mac_address, None)
+
+    # Curtain passive advertisement with only manufacturer data
+    adv_data = generate_advertisement_data(
+        manufacturer_data={2409: b"\xccOLG\x00c\x00\x00\x11\x00\x00"},
+        service_data={},
+        rssi=-85,
+    )
+
+    # Without cache, cannot identify
+    result_without_cache = parse_advertisement_data(ble_device, adv_data)
+    assert result_without_cache is None
+
+    # Populate cache
+    populate_model_to_mac_cache(mac_address, SwitchbotModel.CURTAIN)
+
+    # With cache, can identify and parse
+    result_with_cache = parse_advertisement_data(ble_device, adv_data)
+    assert result_with_cache is not None
+    assert result_with_cache.data["modelName"] == SwitchbotModel.CURTAIN
+    assert result_with_cache.data["modelFriendlyName"] == "Curtain"
+    assert result_with_cache.active is False
+
+    # Clean up
+    _MODEL_TO_MAC_CACHE.clear()

+ 379 - 0
tests/test_device.py

@@ -0,0 +1,379 @@
+"""Tests for device.py functionality."""
+
+from __future__ import annotations
+
+import logging
+from typing import Any
+from unittest.mock import MagicMock, patch
+
+import aiohttp
+import pytest
+
+from switchbot import fetch_cloud_devices
+from switchbot.adv_parser import _MODEL_TO_MAC_CACHE, populate_model_to_mac_cache
+from switchbot.const import (
+    SwitchbotAccountConnectionError,
+    SwitchbotAuthenticationError,
+    SwitchbotModel,
+)
+from switchbot.devices.device import SwitchbotBaseDevice, _extract_region
+
+
+@pytest.fixture
+def mock_auth_response() -> dict[str, Any]:
+    """Mock authentication response."""
+    return {
+        "access_token": "test_token_123",
+        "refresh_token": "refresh_token_123",
+        "expires_in": 3600,
+    }
+
+
+@pytest.fixture
+def mock_user_info() -> dict[str, Any]:
+    """Mock user info response."""
+    return {
+        "botRegion": "us",
+        "country": "us",
+        "email": "test@example.com",
+    }
+
+
+@pytest.fixture
+def mock_device_response() -> dict[str, Any]:
+    """Mock device list response."""
+    return {
+        "Items": [
+            {
+                "device_mac": "aabbccddeeff",
+                "device_name": "Test Bot",
+                "device_detail": {
+                    "device_type": "WoHand",
+                    "version": "1.0.0",
+                },
+            },
+            {
+                "device_mac": "112233445566",
+                "device_name": "Test Curtain",
+                "device_detail": {
+                    "device_type": "WoCurtain",
+                    "version": "2.0.0",
+                },
+            },
+            {
+                "device_mac": "778899aabbcc",
+                "device_name": "Test Lock",
+                "device_detail": {
+                    "device_type": "WoLock",
+                    "version": "1.5.0",
+                },
+            },
+            {
+                "device_mac": "ddeeff001122",
+                "device_name": "Unknown Device",
+                "device_detail": {
+                    "device_type": "WoUnknown",
+                    "version": "1.0.0",
+                    "extra_field": "extra_value",
+                },
+            },
+            {
+                "device_mac": "invalid_device",
+                # Missing device_detail
+            },
+            {
+                "device_mac": "another_invalid",
+                "device_detail": {
+                    # Missing device_type
+                    "version": "1.0.0",
+                },
+            },
+        ]
+    }
+
+
+@pytest.mark.asyncio
+async def test_get_devices(
+    mock_auth_response: dict[str, Any],
+    mock_user_info: dict[str, Any],
+    mock_device_response: dict[str, Any],
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """Test get_devices method."""
+    caplog.set_level(logging.DEBUG)
+
+    with (
+        patch.object(
+            SwitchbotBaseDevice, "_get_auth_result", return_value=mock_auth_response
+        ),
+        patch.object(
+            SwitchbotBaseDevice, "_async_get_user_info", return_value=mock_user_info
+        ),
+        patch.object(
+            SwitchbotBaseDevice, "api_request", return_value=mock_device_response
+        ) as mock_api_request,
+        patch(
+            "switchbot.devices.device.populate_model_to_mac_cache"
+        ) as mock_populate_cache,
+    ):
+        session = MagicMock(spec=aiohttp.ClientSession)
+        result = await SwitchbotBaseDevice.get_devices(
+            session, "test@example.com", "password123"
+        )
+
+        # Check that api_request was called with correct parameters
+        mock_api_request.assert_called_once_with(
+            session,
+            "wonderlabs.us",
+            "wonder/device/v3/getdevice",
+            {"required_type": "All"},
+            {"authorization": "test_token_123"},
+        )
+
+        # Check returned dictionary
+        assert len(result) == 3  # Only valid devices with known models
+        assert result["AA:BB:CC:DD:EE:FF"] == SwitchbotModel.BOT
+        assert result["11:22:33:44:55:66"] == SwitchbotModel.CURTAIN
+        assert result["77:88:99:AA:BB:CC"] == SwitchbotModel.LOCK
+
+        # Check that cache was populated
+        assert mock_populate_cache.call_count == 3
+        mock_populate_cache.assert_any_call("AA:BB:CC:DD:EE:FF", SwitchbotModel.BOT)
+        mock_populate_cache.assert_any_call("11:22:33:44:55:66", SwitchbotModel.CURTAIN)
+        mock_populate_cache.assert_any_call("77:88:99:AA:BB:CC", SwitchbotModel.LOCK)
+
+        # Check that unknown model was logged
+        assert "Unknown model WoUnknown for device DD:EE:FF:00:11:22" in caplog.text
+        assert "extra_field" in caplog.text  # Full item should be logged
+
+
+@pytest.mark.asyncio
+async def test_get_devices_with_region(
+    mock_auth_response: dict[str, Any],
+    mock_device_response: dict[str, Any],
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """Test get_devices with different region."""
+    mock_user_info_eu = {
+        "botRegion": "eu",
+        "country": "de",
+        "email": "test@example.com",
+    }
+
+    with (
+        patch.object(
+            SwitchbotBaseDevice, "_get_auth_result", return_value=mock_auth_response
+        ),
+        patch.object(
+            SwitchbotBaseDevice, "_async_get_user_info", return_value=mock_user_info_eu
+        ),
+        patch.object(
+            SwitchbotBaseDevice, "api_request", return_value=mock_device_response
+        ) as mock_api_request,
+        patch("switchbot.devices.device.populate_model_to_mac_cache"),
+    ):
+        session = MagicMock(spec=aiohttp.ClientSession)
+        await SwitchbotBaseDevice.get_devices(
+            session, "test@example.com", "password123"
+        )
+
+        # Check that EU region was used
+        mock_api_request.assert_called_once_with(
+            session,
+            "wonderlabs.eu",
+            "wonder/device/v3/getdevice",
+            {"required_type": "All"},
+            {"authorization": "test_token_123"},
+        )
+
+
+@pytest.mark.asyncio
+async def test_get_devices_no_region(
+    mock_auth_response: dict[str, Any],
+    mock_device_response: dict[str, Any],
+) -> None:
+    """Test get_devices with no region specified (defaults to us)."""
+    mock_user_info_no_region = {
+        "email": "test@example.com",
+    }
+
+    with (
+        patch.object(
+            SwitchbotBaseDevice, "_get_auth_result", return_value=mock_auth_response
+        ),
+        patch.object(
+            SwitchbotBaseDevice,
+            "_async_get_user_info",
+            return_value=mock_user_info_no_region,
+        ),
+        patch.object(
+            SwitchbotBaseDevice, "api_request", return_value=mock_device_response
+        ) as mock_api_request,
+        patch("switchbot.devices.device.populate_model_to_mac_cache"),
+    ):
+        session = MagicMock(spec=aiohttp.ClientSession)
+        await SwitchbotBaseDevice.get_devices(
+            session, "test@example.com", "password123"
+        )
+
+        # Check that default US region was used
+        mock_api_request.assert_called_once_with(
+            session,
+            "wonderlabs.us",
+            "wonder/device/v3/getdevice",
+            {"required_type": "All"},
+            {"authorization": "test_token_123"},
+        )
+
+
+@pytest.mark.asyncio
+async def test_get_devices_empty_region(
+    mock_auth_response: dict[str, Any],
+    mock_device_response: dict[str, Any],
+) -> None:
+    """Test get_devices with empty region string (defaults to us)."""
+    mock_user_info_empty_region = {
+        "botRegion": "",
+        "email": "test@example.com",
+    }
+
+    with (
+        patch.object(
+            SwitchbotBaseDevice, "_get_auth_result", return_value=mock_auth_response
+        ),
+        patch.object(
+            SwitchbotBaseDevice,
+            "_async_get_user_info",
+            return_value=mock_user_info_empty_region,
+        ),
+        patch.object(
+            SwitchbotBaseDevice, "api_request", return_value=mock_device_response
+        ) as mock_api_request,
+        patch("switchbot.devices.device.populate_model_to_mac_cache"),
+    ):
+        session = MagicMock(spec=aiohttp.ClientSession)
+        await SwitchbotBaseDevice.get_devices(
+            session, "test@example.com", "password123"
+        )
+
+        # Check that default US region was used
+        mock_api_request.assert_called_once_with(
+            session,
+            "wonderlabs.us",
+            "wonder/device/v3/getdevice",
+            {"required_type": "All"},
+            {"authorization": "test_token_123"},
+        )
+
+
+@pytest.mark.asyncio
+async def test_fetch_cloud_devices(
+    mock_auth_response: dict[str, Any],
+    mock_user_info: dict[str, Any],
+    mock_device_response: dict[str, Any],
+) -> None:
+    """Test fetch_cloud_devices wrapper function."""
+    with (
+        patch.object(
+            SwitchbotBaseDevice, "_get_auth_result", return_value=mock_auth_response
+        ),
+        patch.object(
+            SwitchbotBaseDevice, "_async_get_user_info", return_value=mock_user_info
+        ),
+        patch.object(
+            SwitchbotBaseDevice, "api_request", return_value=mock_device_response
+        ),
+        patch(
+            "switchbot.devices.device.populate_model_to_mac_cache"
+        ) as mock_populate_cache,
+    ):
+        session = MagicMock(spec=aiohttp.ClientSession)
+        result = await fetch_cloud_devices(session, "test@example.com", "password123")
+
+        # Check returned dictionary
+        assert len(result) == 3
+        assert result["AA:BB:CC:DD:EE:FF"] == SwitchbotModel.BOT
+        assert result["11:22:33:44:55:66"] == SwitchbotModel.CURTAIN
+        assert result["77:88:99:AA:BB:CC"] == SwitchbotModel.LOCK
+
+        # Check that cache was populated
+        assert mock_populate_cache.call_count == 3
+
+
+@pytest.mark.asyncio
+async def test_get_devices_authentication_error() -> None:
+    """Test get_devices with authentication error."""
+    with patch.object(
+        SwitchbotBaseDevice,
+        "_get_auth_result",
+        side_effect=Exception("Auth failed"),
+    ):
+        session = MagicMock(spec=aiohttp.ClientSession)
+        with pytest.raises(SwitchbotAuthenticationError) as exc_info:
+            await SwitchbotBaseDevice.get_devices(
+                session, "test@example.com", "wrong_password"
+            )
+        assert "Authentication failed" in str(exc_info.value)
+
+
+@pytest.mark.asyncio
+async def test_get_devices_connection_error(
+    mock_auth_response: dict[str, Any],
+    mock_user_info: dict[str, Any],
+) -> None:
+    """Test get_devices with connection error."""
+    with (
+        patch.object(
+            SwitchbotBaseDevice, "_get_auth_result", return_value=mock_auth_response
+        ),
+        patch.object(
+            SwitchbotBaseDevice, "_async_get_user_info", return_value=mock_user_info
+        ),
+        patch.object(
+            SwitchbotBaseDevice,
+            "api_request",
+            side_effect=Exception("Network error"),
+        ),
+    ):
+        session = MagicMock(spec=aiohttp.ClientSession)
+        with pytest.raises(SwitchbotAccountConnectionError) as exc_info:
+            await SwitchbotBaseDevice.get_devices(
+                session, "test@example.com", "password123"
+            )
+        assert "Failed to retrieve devices" in str(exc_info.value)
+
+
+@pytest.mark.asyncio
+async def test_populate_model_to_mac_cache() -> None:
+    """Test the populate_model_to_mac_cache helper function."""
+    # Clear the cache first
+    _MODEL_TO_MAC_CACHE.clear()
+
+    # Populate cache with test data
+    populate_model_to_mac_cache("AA:BB:CC:DD:EE:FF", SwitchbotModel.BOT)
+    populate_model_to_mac_cache("11:22:33:44:55:66", SwitchbotModel.CURTAIN)
+
+    # Check cache contents
+    assert _MODEL_TO_MAC_CACHE["AA:BB:CC:DD:EE:FF"] == SwitchbotModel.BOT
+    assert _MODEL_TO_MAC_CACHE["11:22:33:44:55:66"] == SwitchbotModel.CURTAIN
+    assert len(_MODEL_TO_MAC_CACHE) == 2
+
+    # Clear cache after test
+    _MODEL_TO_MAC_CACHE.clear()
+
+
+def test_extract_region() -> None:
+    """Test the _extract_region helper function."""
+    # Test with botRegion present and not empty
+    assert _extract_region({"botRegion": "eu", "country": "de"}) == "eu"
+    assert _extract_region({"botRegion": "us", "country": "us"}) == "us"
+    assert _extract_region({"botRegion": "jp", "country": "jp"}) == "jp"
+
+    # Test with botRegion empty string
+    assert _extract_region({"botRegion": "", "country": "de"}) == "us"
+
+    # Test with botRegion missing
+    assert _extract_region({"country": "de"}) == "us"
+
+    # Test with empty dict
+    assert _extract_region({}) == "us"

+ 41 - 0
tests/test_utils.py

@@ -0,0 +1,41 @@
+"""Tests for utils.py functionality."""
+
+from __future__ import annotations
+
+from switchbot.utils import format_mac_upper
+
+
+def test_format_mac_upper() -> None:
+    """Test the format_mac_upper utility function."""
+    # Test already formatted with colons (lowercase)
+    assert format_mac_upper("aa:bb:cc:dd:ee:ff") == "AA:BB:CC:DD:EE:FF"
+
+    # Test already formatted with colons (uppercase)
+    assert format_mac_upper("AA:BB:CC:DD:EE:FF") == "AA:BB:CC:DD:EE:FF"
+
+    # Test with dashes
+    assert format_mac_upper("aa-bb-cc-dd-ee-ff") == "AA:BB:CC:DD:EE:FF"
+    assert format_mac_upper("AA-BB-CC-DD-EE-FF") == "AA:BB:CC:DD:EE:FF"
+
+    # Test with dots (Cisco format)
+    assert format_mac_upper("aabb.ccdd.eeff") == "AA:BB:CC:DD:EE:FF"
+    assert format_mac_upper("AABB.CCDD.EEFF") == "AA:BB:CC:DD:EE:FF"
+
+    # Test without separators
+    assert format_mac_upper("aabbccddeeff") == "AA:BB:CC:DD:EE:FF"
+    assert format_mac_upper("AABBCCDDEEFF") == "AA:BB:CC:DD:EE:FF"
+
+    # Test mixed case without separators
+    assert format_mac_upper("AaBbCcDdEeFf") == "AA:BB:CC:DD:EE:FF"
+
+    # Test invalid formats (should return original in uppercase)
+    assert format_mac_upper("invalid") == "INVALID"
+    assert format_mac_upper("aa:bb:cc") == "AA:BB:CC"  # Too short
+    assert (
+        format_mac_upper("aa:bb:cc:dd:ee:ff:gg") == "AA:BB:CC:DD:EE:FF:GG"
+    )  # Too long
+
+    # Test edge cases
+    assert format_mac_upper("") == ""
+    assert format_mac_upper("123456789ABC") == "12:34:56:78:9A:BC"
+    assert format_mac_upper("12:34:56:78:9a:bc") == "12:34:56:78:9A:BC"