123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- """Library to handle connection with Switchbot."""
- from __future__ import annotations
- import asyncio
- import binascii
- import logging
- from typing import Any
- from uuid import UUID
- import async_timeout
- import bleak
- from bleak.backends.device import BLEDevice
- from bleak.backends.service import BleakGATTServiceCollection
- from bleak_retry_connector import (
- BleakClientWithServiceCache,
- ble_device_has_changed,
- establish_connection,
- )
- from ..const import DEFAULT_RETRY_COUNT, DEFAULT_SCAN_TIMEOUT
- from ..discovery import GetSwitchbotDevices
- from ..models import SwitchBotAdvertisement
- _LOGGER = logging.getLogger(__name__)
- # Keys common to all device types
- DEVICE_GET_BASIC_SETTINGS_KEY = "5702"
- DEVICE_SET_MODE_KEY = "5703"
- DEVICE_SET_EXTENDED_KEY = "570f"
- # Base key when encryption is set
- KEY_PASSWORD_PREFIX = "571"
- def _sb_uuid(comms_type: str = "service") -> UUID | str:
- """Return Switchbot UUID."""
- _uuid = {"tx": "002", "rx": "003", "service": "d00"}
- if comms_type in _uuid:
- return UUID(f"cba20{_uuid[comms_type]}-224d-11e6-9fb8-0002a5d5c51b")
- return "Incorrect type, choose between: tx, rx or service"
- class SwitchbotDevice:
- """Base Representation of a Switchbot Device."""
- def __init__(
- self,
- device: BLEDevice,
- password: str | None = None,
- interface: int = 0,
- **kwargs: Any,
- ) -> None:
- """Switchbot base class constructor."""
- self._interface = f"hci{interface}"
- self._device = device
- self._sb_adv_data: SwitchBotAdvertisement | None = None
- self._scan_timeout: int = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT)
- self._retry_count: int = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
- self._connect_lock = asyncio.Lock()
- if password is None or password == "":
- self._password_encoded = None
- else:
- self._password_encoded = "%08x" % (
- binascii.crc32(password.encode("ascii")) & 0xFFFFFFFF
- )
- self._cached_services: BleakGATTServiceCollection | None = None
- def _commandkey(self, key: str) -> str:
- """Add password to key if set."""
- if self._password_encoded is None:
- return key
- key_action = key[3]
- key_suffix = key[4:]
- return KEY_PASSWORD_PREFIX + key_action + self._password_encoded + key_suffix
- async def _sendcommand(self, key: str, retry: int) -> bytes:
- """Send command to device and read response."""
- command = bytearray.fromhex(self._commandkey(key))
- _LOGGER.debug("Sending command to switchbot %s", command)
- max_attempts = retry + 1
- async with self._connect_lock:
- for attempt in range(max_attempts):
- try:
- return await self._send_command_locked(key, command)
- except (bleak.BleakError, asyncio.exceptions.TimeoutError):
- if attempt == retry:
- _LOGGER.error(
- "Switchbot communication failed. Stopping trying",
- exc_info=True,
- )
- return b"\x00"
- _LOGGER.debug("Switchbot communication failed with:", exc_info=True)
- raise RuntimeError("Unreachable")
- @property
- def name(self) -> str:
- """Return device name."""
- return f"{self._device.name} ({self._device.address})"
- async def _send_command_locked(self, key: str, command: bytes) -> bytes:
- """Send command to device and read response."""
- client: BleakClientWithServiceCache | None = None
- try:
- _LOGGER.debug("%s: Connnecting to switchbot", self.name)
- client = await establish_connection(
- BleakClientWithServiceCache,
- self._device,
- self.name,
- max_attempts=1,
- cached_services=self._cached_services,
- )
- self._cached_services = client.services
- _LOGGER.debug(
- "%s: Connnected to switchbot: %s", self.name, client.is_connected
- )
- read_char = client.services.get_characteristic(_sb_uuid(comms_type="rx"))
- write_char = client.services.get_characteristic(_sb_uuid(comms_type="tx"))
- future: asyncio.Future[bytearray] = asyncio.Future()
- def _notification_handler(_sender: int, data: bytearray) -> None:
- """Handle notification responses."""
- if future.done():
- _LOGGER.debug("%s: Notification handler already done", self.name)
- return
- future.set_result(data)
- _LOGGER.debug("%s: Subscribe to notifications", self.name)
- await client.start_notify(read_char, _notification_handler)
- _LOGGER.debug("%s: Sending command, %s", self.name, key)
- await client.write_gatt_char(write_char, command, False)
- async with async_timeout.timeout(5):
- notify_msg = await future
- _LOGGER.info("%s: Notification received: %s", self.name, notify_msg)
- _LOGGER.debug("%s: UnSubscribe to notifications", self.name)
- await client.stop_notify(read_char)
- finally:
- if client:
- await client.disconnect()
- if notify_msg == b"\x07":
- _LOGGER.error("Password required")
- elif notify_msg == b"\t":
- _LOGGER.error("Password incorrect")
- return notify_msg
- def get_address(self) -> str:
- """Return address of device."""
- return self._device.address
- def _get_adv_value(self, key: str) -> Any:
- """Return value from advertisement data."""
- if not self._sb_adv_data:
- return None
- return self._sb_adv_data.data["data"][key]
- def get_battery_percent(self) -> Any:
- """Return device battery level in percent."""
- return self._get_adv_value("battery")
- def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
- """Update device data from advertisement."""
- self._sb_adv_data = advertisement
- if self._device and ble_device_has_changed(self._device, advertisement.device):
- self._cached_services = None
- self._device = advertisement.device
- async def get_device_data(
- self, retry: int = DEFAULT_RETRY_COUNT, interface: int | None = None
- ) -> dict | None:
- """Find switchbot devices and their advertisement data."""
- if interface:
- _interface: int = interface
- else:
- _interface = int(self._interface.replace("hci", ""))
- _data = await GetSwitchbotDevices(interface=_interface).discover(
- retry=retry, scan_timeout=self._scan_timeout
- )
- if self._device.address in _data:
- self._sb_adv_data = _data[self._device.address]
- return self._sb_adv_data
- async def _get_basic_info(self) -> dict | None:
- """Return basic info of device."""
- _data = await self._sendcommand(
- key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
- )
- if _data in (b"\x07", b"\x00"):
- _LOGGER.error("Unsuccessful, please try again")
- return None
- return _data
|