123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367 |
- """Tests for SwitchbotEncryptedDevice base class."""
- from __future__ import annotations
- import asyncio
- from typing import Any
- from unittest.mock import AsyncMock, patch
- import pytest
- from bleak.exc import BleakDBusError
- from switchbot import SwitchbotModel
- from switchbot.devices.device import (
- SwitchbotEncryptedDevice,
- )
- from .test_adv_parser import generate_ble_device
class MockEncryptedDevice(SwitchbotEncryptedDevice):
    """Concrete ``SwitchbotEncryptedDevice`` subclass used as a test double.

    The only behavior added on top of the base class is a counter that
    records how many times :meth:`update` has been awaited.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Forward every argument to the base class and zero the counter."""
        super().__init__(*args, **kwargs)
        # Number of completed ``update`` calls on this instance.
        self.update_count: int = 0

    async def update(self, interface: int | None = None) -> None:
        """Count the call; ``interface`` is accepted but not used."""
        self.update_count = self.update_count + 1
def create_encrypted_device(
    model: SwitchbotModel = SwitchbotModel.LOCK,
) -> MockEncryptedDevice:
    """Build a ``MockEncryptedDevice`` with fixed address and key material.

    Args:
        model: Device model forwarded to the constructor as ``model=``.

    Returns:
        A new mock device for address ``aa:bb:cc:dd:ee:ff`` using key id
        ``"01"`` and a fixed 32-hex-character encryption key.
    """
    key_id = "01"
    encryption_key = "0123456789abcdef0123456789abcdef"
    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "Test Device")
    device = MockEncryptedDevice(ble_device, key_id, encryption_key, model=model)
    return device
@pytest.mark.asyncio
async def test_encrypted_device_init() -> None:
    """A freshly built device stores its key material and has no IV/cipher."""
    expected_key = bytearray.fromhex("0123456789abcdef0123456789abcdef")

    device = create_encrypted_device()

    # Key material is taken from the constructor arguments.
    assert device._key_id == "01"
    assert device._encryption_key == expected_key
    # Encryption state starts out unset.
    assert device._iv is None
    assert device._cipher is None
@pytest.mark.asyncio
async def test_encrypted_device_init_validation() -> None:
    """Constructing a device with bad key material raises ``ValueError``.

    Each case is ``(key_id, encryption_key, expected message pattern)`` and
    is checked in order: empty key id, wrong-length key id, empty key,
    wrong-length key.
    """
    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "Test Device")
    good_key_id = "01"
    good_key = "0123456789abcdef0123456789abcdef"

    invalid_cases = (
        ("", good_key, "key_id is missing"),
        ("1", good_key, "key_id is invalid"),
        (good_key_id, "", "encryption_key is missing"),
        (good_key_id, "0123456789abcdef", "encryption_key is invalid"),
    )

    for key_id, encryption_key, message in invalid_cases:
        with pytest.raises(ValueError, match=message):
            MockEncryptedDevice(
                ble_device, key_id, encryption_key, SwitchbotModel.LOCK
            )
@pytest.mark.asyncio
async def test_send_command_unencrypted() -> None:
    """An unencrypted command is forwarded once with a zero-padded key."""
    reply = b"\x01\x00\x00\x00"
    device = create_encrypted_device()

    with patch.object(
        device, "_send_command_locked_with_retry", return_value=reply
    ) as send_mock:
        result = await device._send_command("570200", encrypt=False)

    # The raw reply is handed back unchanged.
    assert result == reply
    send_mock.assert_called_once()

    # The first positional argument is the key; for an unencrypted command
    # it is expected to carry the inserted zero bytes.
    positional_args = send_mock.call_args[0]
    assert positional_args[0] == "570000000200"
@pytest.mark.asyncio
async def test_send_command_encrypted_success() -> None:
    """Test successful encrypted command.

    The device starts without an IV, so the first mocked transport call
    answers the IV request and the second answers the actual command.
    ``_decrypt`` is mocked; ``_encrypt`` runs for real with the received IV.
    """
    device = create_encrypted_device()
    # 16-byte IV carried after the 4-byte header of the first response.
    iv = b"\x12\x34\x56\x78\x9a\xbc\xde\xf0\x12\x34\x56\x78\x9a\xbc\xde\xf0"

    # Mock the connection and command execution
    with (
        patch.object(device, "_send_command_locked_with_retry") as mock_send,
        patch.object(device, "_decrypt") as mock_decrypt,
    ):
        mock_decrypt.return_value = b"decrypted_response"
        # First call is for IV initialization, second is for the actual command
        mock_send.side_effect = [
            b"\x01\x00\x00\x00" + iv,  # IV response
            b"\x01\x00\x00\x00encrypted_response",  # Command response
        ]

        result = await device._send_command("570200", encrypt=True)

        # Status byte followed by the decrypted payload -- the same shape the
        # already-initialized-IV test in this module asserts for these mocks.
        assert result == b"\x01decrypted_response"
        assert mock_send.call_count == 2
        # Verify the IV was initialized from the first response's payload.
        assert device._iv == iv
@pytest.mark.asyncio
async def test_send_command_iv_already_initialized() -> None:
    """With an IV already set, an encrypted command needs one transport call."""
    device = create_encrypted_device()
    # Pre-seed the IV so no IV request has to be sent.
    device._iv = b"\x12\x34\x56\x78\x9a\xbc\xde\xf0" * 2

    with (
        patch.object(
            device,
            "_send_command_locked_with_retry",
            return_value=b"\x01\x00\x00\x00encrypted_response",
        ) as send_mock,
        patch.object(
            device,
            "_encrypt",
            return_value="656e637279707465645f64617461",  # "encrypted_data" in hex
        ) as encrypt_mock,
        patch.object(
            device, "_decrypt", return_value=b"decrypted_response"
        ) as decrypt_mock,
    ):
        result = await device._send_command("570200", encrypt=True)

    # Status byte followed by whatever ``_decrypt`` returned.
    assert result == b"\x01decrypted_response"
    # Exactly one call each: no extra round trip for IV initialization.
    send_mock.assert_called_once()
    encrypt_mock.assert_called_once()
    decrypt_mock.assert_called_once()
@pytest.mark.asyncio
async def test_iv_race_condition_during_disconnect() -> None:
    """Test that commands during disconnect are handled properly.

    A command and a delayed disconnect are scheduled as concurrent tasks.
    The command must finish with ``bytes`` or ``None`` (not an exception),
    and the IV must be ``None`` once both tasks are done.
    """
    device = create_encrypted_device()

    # Pre-set the IV (16 bytes, the same length every other test in this
    # module uses; encrypt/decrypt are mocked so the value is never consumed).
    device._iv = b"\x12\x34\x56\x78\x9a\xbc\xde\xf0\x12\x34\x56\x78\x9a\xbc\xde\xf0"

    # Mock the connection
    mock_client = AsyncMock()
    mock_client.is_connected = True
    device._client = mock_client

    async def simulate_disconnect() -> None:
        """Simulate disconnect happening during command execution."""
        await asyncio.sleep(0.01)  # Small delay
        await device._execute_disconnect()

    with (
        patch.object(device, "_send_command_locked_with_retry") as mock_send,
        patch.object(device, "_ensure_connected"),
        patch.object(device, "_encrypt") as mock_encrypt,
        patch.object(device, "_decrypt") as mock_decrypt,
    ):
        mock_encrypt.return_value = "656e63727970746564"  # "encrypted" in hex
        mock_decrypt.return_value = b"response"
        mock_send.return_value = b"\x01\x00\x00\x00response"

        # Start command and disconnect concurrently
        command_task = asyncio.create_task(device._send_command("570200"))
        disconnect_task = asyncio.create_task(simulate_disconnect())

        # return_exceptions=True keeps gather from raising, so a failure in
        # the command shows up as an exception object in ``result``.
        result, _ = await asyncio.gather(
            command_task, disconnect_task, return_exceptions=True
        )

        # Command should have completed successfully
        assert isinstance(result, bytes) or result is None

        # IV should be cleared after disconnect
        assert device._iv is None
@pytest.mark.asyncio
async def test_ensure_encryption_initialized_with_lock_held() -> None:
    """``_ensure_encryption_initialized`` works while the operation lock is held.

    The mocked transport returns a 4-byte header followed by 16 bytes; the
    test expects those 16 bytes to become the IV and the cipher to be unset.
    """
    expected_iv = b"\x12\x34\x56\x78\x9a\xbc\xde\xf0" * 2
    device = create_encrypted_device()

    # Hold the operation lock for the duration of the call.
    async with device._operation_lock:
        with patch.object(
            device,
            "_send_command_locked_with_retry",
            return_value=b"\x01\x00\x00\x00" + expected_iv,
        ):
            initialized = await device._ensure_encryption_initialized()

            assert initialized is True
            assert device._iv == expected_iv
            # The cipher should be reset whenever the IV changes.
            assert device._cipher is None
@pytest.mark.asyncio
async def test_ensure_encryption_initialized_failure() -> None:
    """A failure reply to the IV request yields ``False`` and leaves no IV."""
    failure_reply = b"\x00"
    device = create_encrypted_device()

    async with device._operation_lock:
        with patch.object(
            device, "_send_command_locked_with_retry", return_value=failure_reply
        ):
            initialized = await device._ensure_encryption_initialized()

            assert initialized is False
            # No IV may be stored when initialization did not succeed.
            assert device._iv is None
@pytest.mark.asyncio
async def test_encrypt_decrypt_with_valid_iv() -> None:
    """Encrypting then decrypting with a fixed IV returns the original data."""
    plaintext_hex = "48656c6c6f"  # "Hello" in hex
    device = create_encrypted_device()
    device._iv = bytes(16)  # All-zero IV keeps the test predictable.

    # Encryption yields a non-empty hex string.
    ciphertext_hex = device._encrypt(plaintext_hex)
    assert isinstance(ciphertext_hex, str)
    assert len(ciphertext_hex) > 0

    # Decrypting the ciphertext bytes gives back the plaintext.
    ciphertext = bytearray.fromhex(ciphertext_hex)
    roundtrip = device._decrypt(ciphertext)
    assert roundtrip.hex() == plaintext_hex
@pytest.mark.asyncio
async def test_encrypt_with_none_iv() -> None:
    """``_encrypt`` raises ``RuntimeError`` when no IV is set."""
    payload_hex = "48656c6c6f"
    device = create_encrypted_device()
    device._iv = None  # Make the missing-IV precondition explicit.

    expected_error = pytest.raises(
        RuntimeError, match="Cannot encrypt: IV is None"
    )
    with expected_error:
        device._encrypt(payload_hex)
@pytest.mark.asyncio
async def test_decrypt_with_none_iv() -> None:
    """``_decrypt`` raises ``RuntimeError`` when no IV is set."""
    payload = bytearray.fromhex("48656c6c6f")
    device = create_encrypted_device()
    device._iv = None  # Make the missing-IV precondition explicit.

    expected_error = pytest.raises(
        RuntimeError, match="Cannot decrypt: IV is None"
    )
    with expected_error:
        device._decrypt(payload)
@pytest.mark.asyncio
async def test_get_cipher_with_none_iv() -> None:
    """``_get_cipher`` raises ``RuntimeError`` when no IV is set."""
    device = create_encrypted_device()
    device._iv = None  # Make the missing-IV precondition explicit.

    expected_error = pytest.raises(
        RuntimeError, match="Cannot create cipher: IV is None"
    )
    with expected_error:
        device._get_cipher()
@pytest.mark.asyncio
async def test_execute_disconnect_clears_encryption_state() -> None:
    """Test that disconnect properly clears encryption state.

    Both the IV and the cached cipher start out non-None so that the
    post-disconnect assertions actually detect a missing reset.
    """
    device = create_encrypted_device()
    device._iv = b"\x12\x34\x56\x78\x9a\xbc\xde\xf0\x12\x34\x56\x78\x9a\xbc\xde\xf0"
    # Any non-None placeholder works here: the test only checks that the
    # attribute is reset, and an initial ``None`` would make that check
    # pass trivially.
    device._cipher = object()  # type: ignore[assignment]

    # Mock client
    mock_client = AsyncMock()
    device._client = mock_client

    with patch.object(device, "_execute_disconnect_with_lock") as mock_disconnect:
        await device._execute_disconnect()

        assert device._iv is None
        assert device._cipher is None
        mock_disconnect.assert_called_once()
@pytest.mark.asyncio
async def test_concurrent_commands_with_same_device() -> None:
    """Three commands gathered on one device all succeed, one send each."""
    command_keys = ("570200", "570201", "570202")
    device = create_encrypted_device()
    # Pre-initialize the IV (16 bytes for AES CTR mode).
    device._iv = b"\x12\x34\x56\x78\x9a\xbc\xde\xf0" * 2

    with (
        patch.object(
            device,
            "_send_command_locked_with_retry",
            return_value=b"\x01\x00\x00\x00data",
        ) as send_mock,
        patch.object(
            device, "_encrypt", return_value="656e63727970746564"  # "encrypted" in hex
        ),
        patch.object(device, "_decrypt", return_value=b"response"),
    ):
        # Create all coroutines first, then run them concurrently.
        pending = [device._send_command(key) for key in command_keys]
        results = await asyncio.gather(*pending)

    # Every command returns the status byte plus the mocked plaintext.
    for outcome in results:
        assert outcome == b"\x01response"
    assert send_mock.call_count == 3
@pytest.mark.asyncio
async def test_command_retry_with_encryption() -> None:
    """An encrypted command succeeds on the second attempt after a D-Bus error.

    ``_send_command_locked`` is mocked to raise ``BleakDBusError`` once and
    then return data; the test expects two calls and a decrypted result.
    """
    device = create_encrypted_device()
    device._retry_count = 2
    # Pre-initialize the IV (16 bytes for AES CTR mode).
    device._iv = b"\x12\x34\x56\x78\x9a\xbc\xde\xf0" * 2

    # First attempt fails, second succeeds.
    attempt_outcomes = [
        BleakDBusError("org.bluez.Error", []),
        b"\x01\x00\x00\x00data",
    ]

    with (
        patch.object(
            device, "_send_command_locked", side_effect=attempt_outcomes
        ) as send_locked_mock,
        patch.object(device, "_ensure_connected"),
        patch.object(
            device, "_encrypt", return_value="656e63727970746564"  # "encrypted" in hex
        ),
        patch.object(device, "_decrypt", return_value=b"response"),
    ):
        result = await device._send_command("570200")

    assert result == b"\x01response"
    assert send_locked_mock.call_count == 2
@pytest.mark.asyncio
async def test_empty_data_encryption_decryption() -> None:
    """Empty input passes through ``_encrypt`` and ``_decrypt`` as empty output."""
    device = create_encrypted_device()
    device._iv = bytes(16)  # All-zero 16-byte IV.

    # An empty hex string encrypts to an empty hex string.
    empty_ciphertext = device._encrypt("")
    assert empty_ciphertext == ""

    # An empty buffer decrypts to empty bytes.
    empty_plaintext = device._decrypt(bytearray())
    assert empty_plaintext == b""
|