| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315 | 
							- """Library to handle connection with Switchbot Lock."""
 
- from __future__ import annotations
 
- import asyncio
 
- import base64
 
- import hashlib
 
- import hmac
 
- import json
 
- import logging
 
- from typing import Any
 
- import boto3
 
- import requests
 
- from bleak.backends.device import BLEDevice
 
- from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
 
- from ..api_config import SWITCHBOT_APP_API_BASE_URL, SWITCHBOT_APP_COGNITO_POOL
 
- from ..const import LockStatus
 
- from .device import SwitchbotDevice, SwitchbotOperationError
 
# Every command string below is hex text that starts with this header byte.
COMMAND_HEADER = "57"

# Hex command strings; each is the header followed by a fixed payload.
COMMAND_GET_CK_IV = COMMAND_HEADER + "0f2103"
COMMAND_LOCK_INFO = COMMAND_HEADER + "0f4f8101"
COMMAND_UNLOCK = COMMAND_HEADER + "0f4e01011080"
COMMAND_LOCK = COMMAND_HEADER + "0f4e01011000"
COMMAND_ENABLE_NOTIFICATIONS = COMMAND_HEADER + "0e01001e00008101"
COMMAND_DISABLE_NOTIFICATIONS = COMMAND_HEADER + "0e00"

# Groupings of LockStatus values used when reacting to status notifications.
MOVING_STATUSES = {
    LockStatus.LOCKING,
    LockStatus.UNLOCKING,
}
BLOCKED_STATUSES = {
    LockStatus.LOCKING_STOP,
    LockStatus.UNLOCKING_STOP,
}
REST_STATUSES = {
    LockStatus.LOCKED,
    LockStatus.UNLOCKED,
    LockStatus.NOT_FULLY_LOCKED,
}

# Module-level logger.
_LOGGER = logging.getLogger(__name__)
 
class SwitchbotLock(SwitchbotDevice):
    """Representation of a Switchbot Lock.

    Commands are sent encrypted with AES-128 in CTR mode. The key is the
    ``encryption_key`` given to the constructor; the IV is taken from the
    reply to ``COMMAND_GET_CK_IV`` and is reset on disconnect.
    """

    def __init__(
        self,
        device: BLEDevice,
        key_id: str,
        encryption_key: str,
        interface: int = 0,
        **kwargs: Any,
    ) -> None:
        """Validate the key material and initialise the device.

        Args:
            device: BLE device handle forwarded to the base class.
            key_id: Key identifier; must be exactly 2 characters long.
            encryption_key: Key as a hex string; must be exactly 32
                characters long.
            interface: Forwarded to the base class.
            **kwargs: Forwarded to the base class.

        Raises:
            ValueError: If ``key_id`` or ``encryption_key`` is empty, has the
                wrong length, or ``encryption_key`` is not valid hex.
        """
        if not key_id:
            raise ValueError("key_id is missing")
        if len(key_id) != 2:
            raise ValueError("key_id is invalid")
        if not encryption_key:
            raise ValueError("encryption_key is missing")
        if len(encryption_key) != 32:
            raise ValueError("encryption_key is invalid")
        self._iv = None
        self._cipher = None
        self._key_id = key_id
        self._encryption_key = bytearray.fromhex(encryption_key)
        self._notifications_enabled: bool = False
        # Strong references to fire-and-forget tasks; the event loop itself
        # only keeps weak references, so an unreferenced task may be
        # garbage-collected before it runs.
        self._background_tasks: set[asyncio.Task[Any]] = set()
        super().__init__(device, None, interface, **kwargs)

    @staticmethod
    async def verify_encryption_key(
        device: BLEDevice, key_id: str, encryption_key: str
    ) -> bool:
        """Return True if basic info can be read with the given key.

        Returns False when the key material fails constructor validation or
        when reading the info raises ``SwitchbotOperationError``.
        """
        try:
            lock = SwitchbotLock(
                device=device, key_id=key_id, encryption_key=encryption_key
            )
        except ValueError:
            return False
        try:
            lock_info = await lock.get_basic_info()
        except SwitchbotOperationError:
            return False

        return lock_info is not None

    @staticmethod
    def retrieve_encryption_key(
        device_mac: str, username: str, password: str
    ) -> dict[str, Any]:
        """Retrieve lock key from internal SwitchBot API.

        Authenticates against the Cognito pool configured in
        ``SWITCHBOT_APP_COGNITO_POOL`` and then requests the communication
        key for ``device_mac``. This call is blocking.

        Args:
            device_mac: MAC address; ``:`` and ``-`` separators are removed
                and the value is upper-cased before use.
            username: SwitchBot account user name.
            password: SwitchBot account password.

        Returns:
            A dict with the keys ``key_id`` and ``encryption_key``.

        Raises:
            RuntimeError: If authentication fails, the authentication
                response lacks an access token, or the key API reports a
                status code other than 100.
        """
        device_mac = device_mac.replace(":", "").replace("-", "").upper()

        # SECRET_HASH is the base64 HMAC-SHA256 of username + client id,
        # keyed with the app client secret.
        msg = bytes(username + SWITCHBOT_APP_COGNITO_POOL["AppClientId"], "utf-8")
        secret_hash = base64.b64encode(
            hmac.new(
                SWITCHBOT_APP_COGNITO_POOL["AppClientSecret"].encode(),
                msg,
                digestmod=hashlib.sha256,
            ).digest()
        ).decode()

        cognito_idp_client = boto3.client(
            "cognito-idp", region_name=SWITCHBOT_APP_COGNITO_POOL["Region"]
        )
        try:
            auth_response = cognito_idp_client.initiate_auth(
                ClientId=SWITCHBOT_APP_COGNITO_POOL["AppClientId"],
                AuthFlow="USER_PASSWORD_AUTH",
                AuthParameters={
                    "USERNAME": username,
                    "PASSWORD": password,
                    "SECRET_HASH": secret_hash,
                },
            )
        except cognito_idp_client.exceptions.NotAuthorizedException as err:
            raise RuntimeError("Failed to authenticate") from err
        except Exception as err:
            # Exception rather than BaseException, so KeyboardInterrupt and
            # SystemExit propagate instead of becoming RuntimeError.
            raise RuntimeError("Unexpected error during authentication") from err

        if (
            auth_response is None
            or "AuthenticationResult" not in auth_response
            or "AccessToken" not in auth_response["AuthenticationResult"]
        ):
            raise RuntimeError("Unexpected authentication response")

        access_token = auth_response["AuthenticationResult"]["AccessToken"]
        key_response = requests.post(
            url=SWITCHBOT_APP_API_BASE_URL + "/developStage/keys/v1/communicate",
            headers={"authorization": access_token},
            json={
                "device_mac": device_mac,
                "keyType": "user",
            },
            timeout=10,
        )
        key_response_content = json.loads(key_response.content)
        if key_response_content["statusCode"] != 100:
            raise RuntimeError(
                f"Unexpected status code returned by SwitchBot API: {key_response_content['statusCode']}"
            )

        communication_key = key_response_content["body"]["communicationKey"]
        return {
            "key_id": communication_key["keyId"],
            "encryption_key": communication_key["key"],
        }

    async def lock(self) -> bool:
        """Send lock command."""
        return await self._lock_unlock(
            COMMAND_LOCK, {LockStatus.LOCKED, LockStatus.LOCKING}
        )

    async def unlock(self) -> bool:
        """Send unlock command."""
        return await self._lock_unlock(
            COMMAND_UNLOCK, {LockStatus.UNLOCKED, LockStatus.UNLOCKING}
        )

    async def _lock_unlock(
        self, command: str, ignore_statuses: set[LockStatus]
    ) -> bool:
        """Send ``command`` unless the lock is already in a target status.

        Args:
            command: Hex command string to send.
            ignore_statuses: Statuses for which nothing is sent and True is
                returned immediately.

        Returns:
            True if the command was skipped or its result check passed,
            False otherwise.
        """
        status = self.get_lock_status()
        if status is None:
            # No status known yet: refresh once and read it again.
            await self.update()
            status = self.get_lock_status()
        if status in ignore_statuses:
            return True

        await self._enable_notifications()
        result = await self._send_command(command)
        return bool(self._check_command_result(result, 0, {1}))

    async def get_basic_info(self) -> dict[str, Any] | None:
        """Get device basic status.

        Returns:
            The parsed lock data extended with ``battery`` and ``firmware``,
            or None if either the lock info or the basic info is unavailable.
        """
        lock_raw_data = await self._get_lock_info()
        if not lock_raw_data:
            return None

        basic_data = await self._get_basic_info()
        if not basic_data:
            return None

        # Byte 0 of the lock info is the result code; the payload follows.
        lock_data = self._parse_lock_data(lock_raw_data[1:])
        lock_data.update(battery=basic_data[1], firmware=basic_data[2] / 10.0)

        return lock_data

    def is_calibrated(self) -> Any:
        """Return True if lock is calibrated."""
        return self._get_adv_value("calibration")

    def get_lock_status(self) -> LockStatus | None:
        """Return lock status, or None if no status is available yet."""
        return self._get_adv_value("status")

    def is_door_open(self) -> bool:
        """Return True if door is open."""
        return self._get_adv_value("door_open")

    def is_unclosed_alarm_on(self) -> bool:
        """Return True if unclosed door alarm is on."""
        return self._get_adv_value("unclosed_alarm")

    def is_unlocked_alarm_on(self) -> bool:
        """Return True if lock unlocked alarm is on."""
        return self._get_adv_value("unlocked_alarm")

    def is_auto_lock_paused(self) -> bool:
        """Return True if auto lock is paused."""
        return self._get_adv_value("auto_lock_paused")

    async def _get_lock_info(self) -> bytes | None:
        """Return lock info of device."""
        _data = await self._send_command(key=COMMAND_LOCK_INFO, retry=self._retry_count)

        if not self._check_command_result(_data, 0, {1}):
            _LOGGER.error("Unsuccessful, please try again")
            return None

        return _data

    async def _enable_notifications(self) -> bool:
        """Enable status notifications; return True if they are enabled."""
        if self._notifications_enabled:
            return True
        result = await self._send_command(COMMAND_ENABLE_NOTIFICATIONS)
        if self._check_command_result(result, 0, {1}):
            self._notifications_enabled = True
        return self._notifications_enabled

    async def _disable_notifications(self) -> bool:
        """Disable status notifications; return True if they are disabled."""
        if not self._notifications_enabled:
            return True
        result = await self._send_command(COMMAND_DISABLE_NOTIFICATIONS)
        if self._check_command_result(result, 0, {1}):
            self._notifications_enabled = False
        return not self._notifications_enabled

    def _notification_handler(self, _sender: int, data: bytearray) -> None:
        """Route a notification to the status update or to the base handler.

        While notifications are enabled, data whose first byte is 0xF is
        handled as a lock status update; everything else is passed to the
        base class handler.
        """
        if self._notifications_enabled and self._check_command_result(data, 0, {0xF}):
            self._update_lock_status(data)
        else:
            super()._notification_handler(_sender, data)

    def _update_lock_status(self, data: bytearray) -> None:
        """Decrypt a status notification, store it and fire callbacks.

        Notifications are switched off again once the reported status is a
        rest or blocked status and either differs from the current status or
        the current status is not a rest status.
        """
        decrypted = self._decrypt(data[4:])
        lock_data = self._parse_lock_data(decrypted)
        current_status = self.get_lock_status()
        new_status = lock_data["status"]
        if (
            new_status != current_status or current_status not in REST_STATUSES
        ) and (new_status in REST_STATUSES or new_status in BLOCKED_STATUSES):
            # Keep a strong reference until the task completes so it cannot
            # be garbage-collected mid-flight.
            task = asyncio.create_task(self._disable_notifications())
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

        self._update_parsed_data(lock_data)
        self._fire_callbacks()

    @staticmethod
    def _parse_lock_data(data: bytes) -> dict[str, Any]:
        """Decode the flag bits in the first two bytes of ``data``.

        Byte 0 carries calibration (bit 7), status (bits 4-6) and door-open
        (bit 2); byte 1 carries the unclosed alarm (bit 5) and the unlocked
        alarm (bit 4).
        """
        return {
            "calibration": bool(data[0] & 0b10000000),
            "status": LockStatus((data[0] & 0b01110000) >> 4),
            "door_open": bool(data[0] & 0b00000100),
            "unclosed_alarm": bool(data[1] & 0b00100000),
            "unlocked_alarm": bool(data[1] & 0b00010000),
        }

    async def _send_command(
        self, key: str, retry: int | None = None, encrypt: bool = True
    ) -> bytes | None:
        """Send a command, encrypting the payload unless ``encrypt`` is False.

        Args:
            key: Hex command string; its first two characters are the header.
            retry: Forwarded to the base class implementation.
            encrypt: When False, ``"000000"`` is inserted after the header
                and the command is sent as-is.

        Returns:
            For unencrypted commands, the base class result unchanged. For
            encrypted commands, the first response byte followed by the
            decrypted bytes from offset 4 onward, or None if encryption could
            not be initialised or no response was received.
        """
        if not encrypt:
            return await super()._send_command(key[:2] + "000000" + key[2:], retry)

        if not await self._ensure_encryption_initialized():
            _LOGGER.error("Failed to initialize encryption")
            return None

        # header + key id + first two IV bytes (hex) + encrypted payload
        encrypted = (
            key[:2] + self._key_id + self._iv[0:2].hex() + self._encrypt(key[2:])
        )
        result = await super()._send_command(encrypted, retry)
        if result is None:
            # Nothing to decrypt; slicing None would raise TypeError.
            return None
        return result[:1] + self._decrypt(result[4:])

    async def _ensure_encryption_initialized(self) -> bool:
        """Fetch the IV if it is not known yet; return True when it is set."""
        if self._iv is not None:
            return True

        # The IV request itself is sent unencrypted.
        result = await self._send_command(
            COMMAND_GET_CK_IV + self._key_id, encrypt=False
        )
        ok = self._check_command_result(result, 0, {0x01})
        if ok:
            self._iv = result[4:]

        return ok

    async def _execute_disconnect(self) -> None:
        """Disconnect and reset IV, cipher and notification state."""
        await super()._execute_disconnect()
        self._iv = None
        self._cipher = None
        self._notifications_enabled = False

    def _get_cipher(self) -> Cipher:
        """Return the cached AES-128/CTR cipher, creating it on first use."""
        if self._cipher is None:
            self._cipher = Cipher(
                algorithms.AES128(self._encryption_key), modes.CTR(self._iv)
            )
        return self._cipher

    def _encrypt(self, data: str) -> str:
        """Encrypt a hex string and return the ciphertext as a hex string."""
        if not data:
            return ""
        encryptor = self._get_cipher().encryptor()
        return (encryptor.update(bytearray.fromhex(data)) + encryptor.finalize()).hex()

    def _decrypt(self, data: bytearray) -> bytes:
        """Decrypt ``data`` and return the plaintext bytes."""
        if not data:
            return b""
        decryptor = self._get_cipher().decryptor()
        return decryptor.update(data) + decryptor.finalize()
 
 
  |