123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380 |
- """Library to handle connection with Switchbot Lock."""
- from __future__ import annotations
- import asyncio
- import logging
- import time
- from typing import Any
- import aiohttp
- from bleak.backends.device import BLEDevice
- from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
- from ..api_config import SWITCHBOT_APP_API_BASE_URL, SWITCHBOT_APP_CLIENT_ID
- from ..const import (
- LockStatus,
- SwitchbotAccountConnectionError,
- SwitchbotApiError,
- SwitchbotAuthenticationError,
- )
- from .device import SwitchbotDevice, SwitchbotOperationError
_LOGGER = logging.getLogger(__name__)

# All commands below are hex strings that share this leading header byte.
COMMAND_HEADER = "57"

# Key/IV handshake and status query.
COMMAND_GET_CK_IV = f"{COMMAND_HEADER}0f2103"
COMMAND_LOCK_INFO = f"{COMMAND_HEADER}0f4f8101"

# Lock / unlock actions.
COMMAND_LOCK = f"{COMMAND_HEADER}0f4e01011000"
COMMAND_UNLOCK = f"{COMMAND_HEADER}0f4e01011080"
COMMAND_UNLOCK_WITHOUT_UNLATCH = f"{COMMAND_HEADER}0f4e010110a0"

# Notification subscription control.
COMMAND_ENABLE_NOTIFICATIONS = f"{COMMAND_HEADER}0e01001e00008101"
COMMAND_DISABLE_NOTIFICATIONS = f"{COMMAND_HEADER}0e00"

# Groupings of LockStatus values.
MOVING_STATUSES = {
    LockStatus.LOCKING,
    LockStatus.UNLOCKING,
}
BLOCKED_STATUSES = {
    LockStatus.LOCKING_STOP,
    LockStatus.UNLOCKING_STOP,
}
REST_STATUSES = {
    LockStatus.LOCKED,
    LockStatus.UNLOCKED,
    LockStatus.NOT_FULLY_LOCKED,
}

# Result codes (first byte of a command response) that count as success:
#   1 - the command was successful.
#   6 - the command was successful but the battery is low.
COMMAND_RESULT_EXPECTED_VALUES = {1, 6}
class SwitchbotLock(SwitchbotDevice):
    """Representation of a Switchbot Lock.

    Commands are sent to the lock encrypted with AES-128 in CTR mode. The
    key and key id are supplied by the caller; the IV is fetched from the
    device on first use and discarded again on disconnect.
    """

    def __init__(
        self,
        device: BLEDevice,
        key_id: str,
        encryption_key: str,
        interface: int = 0,
        **kwargs: Any,
    ) -> None:
        """Validate the credentials and set up the device.

        Args:
            device: BLE device handle for the lock.
            key_id: Key identifier as a 2-character hex string.
            encryption_key: Encryption key as a 32-character hex string.
            interface: Forwarded to the base class constructor.
            **kwargs: Forwarded to the base class constructor.

        Raises:
            ValueError: If ``key_id`` or ``encryption_key`` is empty, has the
                wrong length, or ``encryption_key`` is not valid hex.
        """
        if not key_id:
            raise ValueError("key_id is missing")
        if len(key_id) != 2:
            raise ValueError("key_id is invalid")
        if not encryption_key:
            raise ValueError("encryption_key is missing")
        if len(encryption_key) != 32:
            raise ValueError("encryption_key is invalid")

        # IV and cipher are created lazily once the device has been asked
        # for its IV; see _ensure_encryption_initialized / _get_cipher.
        self._iv: bytes | None = None
        self._cipher: Cipher | None = None
        self._key_id = key_id
        self._encryption_key = bytearray.fromhex(encryption_key)
        self._notifications_enabled: bool = False
        super().__init__(device, None, interface, **kwargs)

    @staticmethod
    async def verify_encryption_key(
        device: BLEDevice, key_id: str, encryption_key: str
    ) -> bool:
        """Return True if the lock answers a basic-info query with these credentials.

        Returns False when the credentials are rejected by the constructor
        (``ValueError``) or when querying the device raises
        ``SwitchbotOperationError``.
        """
        try:
            lock = SwitchbotLock(
                device=device, key_id=key_id, encryption_key=encryption_key
            )
        except ValueError:
            return False
        try:
            lock_info = await lock.get_basic_info()
        except SwitchbotOperationError:
            return False
        return lock_info is not None

    @staticmethod
    async def api_request(
        session: aiohttp.ClientSession,
        subdomain: str,
        path: str,
        data: dict | None = None,
        headers: dict | None = None,
    ) -> dict:
        """POST a JSON request to the SwitchBot app API and return its body.

        Args:
            session: aiohttp session used to perform the request.
            subdomain: Subdomain prepended to ``SWITCHBOT_APP_API_BASE_URL``.
            path: Request path appended to the base URL.
            data: Optional JSON payload.
            headers: Optional HTTP headers.

        Returns:
            The ``"body"`` member of the JSON response.

        Raises:
            SwitchbotApiError: If the HTTP status is above 299 or the
                response's ``statusCode`` is not 100.
        """
        url = f"https://{subdomain}.{SWITCHBOT_APP_API_BASE_URL}/{path}"
        async with session.post(
            url,
            json=data,
            headers=headers,
            timeout=aiohttp.ClientTimeout(total=10),
        ) as result:
            if result.status > 299:
                raise SwitchbotApiError(
                    f"Unexpected status code returned by SwitchBot API: {result.status}"
                )
            response = await result.json()
            if response["statusCode"] != 100:
                raise SwitchbotApiError(
                    f"{response['message']}, status code: {response['statusCode']}"
                )
            return response["body"]

    # Old non-async method preserved for backwards compatibility
    @staticmethod
    def retrieve_encryption_key(device_mac: str, username: str, password: str):
        """Blocking wrapper around ``async_retrieve_encryption_key``.

        Creates its own aiohttp session and runs the coroutine with
        ``asyncio.run``.
        """

        async def async_fn():
            async with aiohttp.ClientSession() as session:
                return await SwitchbotLock.async_retrieve_encryption_key(
                    session, device_mac, username, password
                )

        return asyncio.run(async_fn())

    @staticmethod
    async def async_retrieve_encryption_key(
        session: aiohttp.ClientSession, device_mac: str, username: str, password: str
    ) -> dict:
        """Retrieve lock key from internal SwitchBot API.

        Performs three requests: login, user-info lookup (to find the
        account region, defaulting to ``"us"``), and the key request itself.

        Args:
            session: aiohttp session used for all requests.
            device_mac: MAC address of the lock; ``:`` and ``-`` separators
                are stripped and the value is upper-cased before sending.
            username: SwitchBot account user name.
            password: SwitchBot account password.

        Returns:
            A dict with ``"key_id"`` and ``"encryption_key"``.

        Raises:
            SwitchbotAuthenticationError: If the login step fails.
            SwitchbotAccountConnectionError: If the user-info or key
                retrieval step fails.
        """
        device_mac = device_mac.replace(":", "").replace("-", "").upper()

        try:
            auth_result = await SwitchbotLock.api_request(
                session,
                "account",
                "account/api/v1/user/login",
                {
                    "clientId": SWITCHBOT_APP_CLIENT_ID,
                    "username": username,
                    "password": password,
                    "grantType": "password",
                    "verifyCode": "",
                },
            )
            auth_headers = {"authorization": auth_result["access_token"]}
        except Exception as err:
            raise SwitchbotAuthenticationError(f"Authentication failed: {err}") from err

        try:
            userinfo = await SwitchbotLock.api_request(
                session, "account", "account/api/v1/user/userinfo", {}, auth_headers
            )
            if "botRegion" in userinfo and userinfo["botRegion"] != "":
                region = userinfo["botRegion"]
            else:
                region = "us"
        except Exception as err:
            raise SwitchbotAccountConnectionError(
                f"Failed to retrieve SwitchBot Account user details: {err}"
            ) from err

        try:
            device_info = await SwitchbotLock.api_request(
                session,
                f"wonderlabs.{region}",
                "wonder/keys/v1/communicate",
                {
                    "device_mac": device_mac,
                    "keyType": "user",
                },
                auth_headers,
            )
            return {
                "key_id": device_info["communicationKey"]["keyId"],
                "encryption_key": device_info["communicationKey"]["key"],
            }
        except Exception as err:
            raise SwitchbotAccountConnectionError(
                f"Failed to retrieve encryption key from SwitchBot Account: {err}"
            ) from err

    async def lock(self) -> bool:
        """Send lock command (skipped if already locked or locking)."""
        return await self._lock_unlock(
            COMMAND_LOCK, {LockStatus.LOCKED, LockStatus.LOCKING}
        )

    async def unlock(self) -> bool:
        """Send unlock command (skipped if already unlocked or unlocking).

        If unlatch feature is enabled in EU firmware, also unlatches door.
        """
        return await self._lock_unlock(
            COMMAND_UNLOCK, {LockStatus.UNLOCKED, LockStatus.UNLOCKING}
        )

    async def unlock_without_unlatch(self) -> bool:
        """Send unlock command. This command will not unlatch the door.

        Skipped if the lock is unlocked, unlocking, or not fully locked.
        """
        return await self._lock_unlock(
            COMMAND_UNLOCK_WITHOUT_UNLATCH,
            {LockStatus.UNLOCKED, LockStatus.UNLOCKING, LockStatus.NOT_FULLY_LOCKED},
        )

    def _parse_basic_data(self, basic_data: bytes) -> dict[str, Any]:
        """Parse basic data from lock.

        Byte 1 is reported as the battery value and byte 2 divided by 10
        as the firmware version; callers must supply at least 3 bytes.
        """
        return {
            "battery": basic_data[1],
            "firmware": basic_data[2] / 10.0,
        }

    async def _lock_unlock(
        self, command: str, ignore_statuses: set[LockStatus]
    ) -> bool:
        """Send ``command`` unless the lock is already in one of ``ignore_statuses``.

        Args:
            command: Hex command string to send.
            ignore_statuses: Statuses for which nothing is sent and True is
                returned immediately.

        Returns:
            True if the command was skipped or the device reported one of
            ``COMMAND_RESULT_EXPECTED_VALUES``; otherwise False.
        """
        status = self.get_lock_status()
        if status is None:
            # No status known yet; refresh before deciding.
            await self.update()
            status = self.get_lock_status()
        if status in ignore_statuses:
            return True

        await self._enable_notifications()
        result = await self._send_command(command)
        success = self._check_command_result(result, 0, COMMAND_RESULT_EXPECTED_VALUES)

        # Also update the battery and firmware version
        if basic_data := await self._get_basic_info():
            self._last_full_update = time.monotonic()
            if len(basic_data) >= 3:
                self._update_parsed_data(self._parse_basic_data(basic_data))
            else:
                _LOGGER.warning("Invalid basic data received: %s", basic_data)
            self._fire_callbacks()

        return success

    async def get_basic_info(self) -> dict[str, Any] | None:
        """Get device basic status.

        Returns the parsed lock data merged with the parsed basic data, or
        None if either query yields nothing.
        """
        lock_raw_data = await self._get_lock_info()
        if not lock_raw_data:
            return None

        basic_data = await self._get_basic_info()
        if not basic_data:
            return None

        # Skip the leading result byte of the lock-info response.
        return self._parse_lock_data(lock_raw_data[1:]) | self._parse_basic_data(
            basic_data
        )

    def is_calibrated(self) -> Any:
        """Return True if lock is calibrated."""
        return self._get_adv_value("calibration")

    def get_lock_status(self) -> LockStatus | None:
        """Return lock status (may be None; ``_lock_unlock`` handles that case)."""
        return self._get_adv_value("status")

    def is_door_open(self) -> bool:
        """Return True if door is open."""
        return self._get_adv_value("door_open")

    def is_unclosed_alarm_on(self) -> bool:
        """Return True if unclosed door alarm is on."""
        return self._get_adv_value("unclosed_alarm")

    def is_unlocked_alarm_on(self) -> bool:
        """Return True if lock unlocked alarm is on."""
        return self._get_adv_value("unlocked_alarm")

    def is_auto_lock_paused(self) -> bool:
        """Return True if auto lock is paused."""
        return self._get_adv_value("auto_lock_paused")

    def is_night_latch_enabled(self) -> bool:
        """Return True if Night Latch is enabled on EU firmware."""
        return self._get_adv_value("night_latch")

    async def _get_lock_info(self) -> bytes | None:
        """Return lock info of device, or None if the result code is unexpected."""
        _data = await self._send_command(key=COMMAND_LOCK_INFO, retry=self._retry_count)

        if not self._check_command_result(_data, 0, COMMAND_RESULT_EXPECTED_VALUES):
            _LOGGER.error("Unsuccessful, please try again")
            return None

        return _data

    async def _enable_notifications(self) -> bool:
        """Enable status notifications once; return whether they are enabled."""
        if self._notifications_enabled:
            return True
        result = await self._send_command(COMMAND_ENABLE_NOTIFICATIONS)
        if self._check_command_result(result, 0, COMMAND_RESULT_EXPECTED_VALUES):
            self._notifications_enabled = True
        return self._notifications_enabled

    async def _disable_notifications(self) -> bool:
        """Disable status notifications; return whether they are now disabled."""
        if not self._notifications_enabled:
            return True
        result = await self._send_command(COMMAND_DISABLE_NOTIFICATIONS)
        if self._check_command_result(result, 0, COMMAND_RESULT_EXPECTED_VALUES):
            self._notifications_enabled = False
        return not self._notifications_enabled

    def _notification_handler(self, _sender: int, data: bytearray) -> None:
        """Route incoming notifications.

        While notifications are enabled, packets whose first byte is 0xF are
        treated as lock status updates; everything else goes to the base
        class handler.
        """
        if self._notifications_enabled and self._check_command_result(data, 0, {0xF}):
            self._update_lock_status(data)
        else:
            super()._notification_handler(_sender, data)

    def _update_lock_status(self, data: bytearray) -> None:
        """Decrypt a status notification and publish it if anything changed."""
        # The encrypted payload starts at offset 4 of the packet.
        lock_data = self._parse_lock_data(self._decrypt(data[4:]))
        if self._update_parsed_data(lock_data):
            # We leave notifications enabled in case
            # the lock is operated manually before we
            # disconnect.
            self._reset_disconnect_timer()
            self._fire_callbacks()

    @staticmethod
    def _parse_lock_data(data: bytes) -> dict[str, Any]:
        """Decode the status bit fields from the first two bytes of ``data``.

        Byte 0: bit 7 calibration, bits 4-6 status, bit 2 door open.
        Byte 1: bit 5 unclosed alarm, bit 4 unlocked alarm.
        """
        return {
            "calibration": bool(data[0] & 0b10000000),
            "status": LockStatus((data[0] & 0b01110000) >> 4),
            "door_open": bool(data[0] & 0b00000100),
            "unclosed_alarm": bool(data[1] & 0b00100000),
            "unlocked_alarm": bool(data[1] & 0b00010000),
        }

    async def _send_command(
        self, key: str, retry: int | None = None, encrypt: bool = True
    ) -> bytes | None:
        """Send a command, encrypting the payload unless ``encrypt`` is False.

        Args:
            key: Hex command string; its first two characters are the header.
            retry: Forwarded to the base class ``_send_command``.
            encrypt: When False the command is sent in the clear with
                ``"000000"`` inserted after the header.

        Returns:
            For encrypted commands: the first response byte followed by the
            decrypted bytes from offset 4 onward. None if encryption could
            not be initialized or no response was received.
        """
        if not encrypt:
            return await super()._send_command(key[:2] + "000000" + key[2:], retry)

        if not await self._ensure_encryption_initialized():
            _LOGGER.error("Failed to initialize encryption")
            return None

        # Wire format: header + key id + first two IV bytes + encrypted payload.
        encrypted = (
            key[:2] + self._key_id + self._iv[0:2].hex() + self._encrypt(key[2:])
        )
        result = await super()._send_command(encrypted, retry)
        if result is None:
            # Guard against a missing response (base implementation is not
            # visible here); slicing None would raise TypeError instead of
            # honouring this method's ``bytes | None`` contract.
            return None
        return result[:1] + self._decrypt(result[4:])

    async def _ensure_encryption_initialized(self) -> bool:
        """Fetch the IV from the device if it is not known yet.

        Returns True when an IV is available afterwards.
        """
        if self._iv is not None:
            return True

        # The IV request itself must go out unencrypted.
        result = await self._send_command(
            COMMAND_GET_CK_IV + self._key_id, encrypt=False
        )
        ok = self._check_command_result(result, 0, COMMAND_RESULT_EXPECTED_VALUES)
        if ok:
            self._iv = result[4:]

        return ok

    async def _execute_disconnect(self) -> None:
        """Disconnect and drop all per-connection state (IV, cipher, notifications)."""
        await super()._execute_disconnect()
        self._iv = None
        self._cipher = None
        self._notifications_enabled = False

    def _get_cipher(self) -> Cipher:
        """Return the cached AES-128/CTR cipher, creating it from the current IV."""
        if self._cipher is None:
            self._cipher = Cipher(
                algorithms.AES128(self._encryption_key), modes.CTR(self._iv)
            )
        return self._cipher

    def _encrypt(self, data: str) -> str:
        """Encrypt a hex string and return the ciphertext as a hex string."""
        if len(data) == 0:
            return ""
        # A fresh encryptor context is created for every message.
        encryptor = self._get_cipher().encryptor()
        return (encryptor.update(bytearray.fromhex(data)) + encryptor.finalize()).hex()

    def _decrypt(self, data: bytearray) -> bytes:
        """Decrypt raw bytes received from the device."""
        if len(data) == 0:
            return b""
        # A fresh decryptor context is created for every message.
        decryptor = self._get_cipher().decryptor()
        return decryptor.update(data) + decryptor.finalize()
|