|
@@ -4,8 +4,10 @@ from __future__ import annotations
|
|
|
import asyncio
|
|
|
import binascii
|
|
|
import logging
|
|
|
+import time
|
|
|
+from dataclasses import replace
|
|
|
from enum import Enum
|
|
|
-from typing import Any, Callable
|
|
|
+from typing import Any, Callable, TypeVar, cast
|
|
|
from uuid import UUID
|
|
|
|
|
|
import async_timeout
|
|
@@ -53,6 +55,13 @@ class ColorMode(Enum):
|
|
|
EFFECT = 3
|
|
|
|
|
|
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+PASSIVE_POLL_INTERVAL = 60 * 60 * 24
|
|
|
+
|
|
|
+
|
|
|
class CharacteristicMissingError(Exception):
|
|
|
"""Raised when a characteristic is missing."""
|
|
|
|
|
@@ -76,6 +85,32 @@ READ_CHAR_UUID = _sb_uuid(comms_type="rx")
|
|
|
WRITE_CHAR_UUID = _sb_uuid(comms_type="tx")
|
|
|
|
|
|
|
|
|
+WrapFuncType = TypeVar("WrapFuncType", bound=Callable[..., Any])
|
|
|
+
|
|
|
+
|
|
|
+def update_after_operation(func: WrapFuncType) -> WrapFuncType:
|
|
|
+ """Define a wrapper to update after an operation."""
|
|
|
+
|
|
|
+ async def _async_update_after_operation_wrap(
|
|
|
+ self: SwitchbotBaseDevice, *args: Any, **kwargs: Any
|
|
|
+ ) -> None:
|
|
|
+ ret = await func(self, *args, **kwargs)
|
|
|
+ await self.update()
|
|
|
+ self._fire_callbacks()
|
|
|
+ return ret
|
|
|
+
|
|
|
+ return cast(WrapFuncType, _async_update_after_operation_wrap)
|
|
|
+
|
|
|
+
|
|
|
+def _merge_data(old_data: dict[str, Any], new_data: dict[str, Any]) -> dict[str, Any]:
|
|
|
+ """Merge data but only add None keys if they are missing."""
|
|
|
+ merged = old_data.copy()
|
|
|
+ for key, value in new_data.items():
|
|
|
+ if value is not None or key not in old_data:
|
|
|
+ merged[key] = value
|
|
|
+ return merged
|
|
|
+
|
|
|
+
|
|
|
class SwitchbotBaseDevice:
|
|
|
"""Base Representation of a Switchbot Device."""
|
|
|
|
|
@@ -109,6 +144,7 @@ class SwitchbotBaseDevice:
|
|
|
self.loop = asyncio.get_event_loop()
|
|
|
self._callbacks: list[Callable[[], None]] = []
|
|
|
self._notify_future: asyncio.Future[bytearray] | None = None
|
|
|
+ self._last_full_update: float = -PASSIVE_POLL_INTERVAL
|
|
|
|
|
|
def advertisement_changed(self, advertisement: SwitchBotAdvertisement) -> bool:
|
|
|
"""Check if the advertisement has changed."""
|
|
@@ -190,6 +226,18 @@ class SwitchbotBaseDevice:
|
|
|
"""Return device name."""
|
|
|
return f"{self._device.name} ({self._device.address})"
|
|
|
|
|
|
+ @property
|
|
|
+ def data(self) -> dict[str, Any]:
|
|
|
+ """Return device data."""
|
|
|
+ if self._sb_adv_data:
|
|
|
+ return self._sb_adv_data.data
|
|
|
+ return {}
|
|
|
+
|
|
|
+ @property
|
|
|
+ def parsed_data(self) -> dict[str, Any]:
|
|
|
+ """Return parsed device data."""
|
|
|
+ return self.data.get("data") or {}
|
|
|
+
|
|
|
@property
|
|
|
def rssi(self) -> int:
|
|
|
"""Return RSSI of device."""
|
|
@@ -392,6 +440,7 @@ class SwitchbotBaseDevice:
|
|
|
if self._override_adv_data is None:
|
|
|
self._override_adv_data = {}
|
|
|
self._override_adv_data.update(state)
|
|
|
+ self._update_parsed_data(state)
|
|
|
|
|
|
def _get_adv_value(self, key: str) -> Any:
|
|
|
"""Return value from advertisement data."""
|
|
@@ -466,8 +515,20 @@ class SwitchbotBaseDevice:
|
|
|
|
|
|
return _unsub
|
|
|
|
|
|
- async def update(self) -> None:
|
|
|
- """Update state of device."""
|
|
|
+ async def update(self, interface: int | None = None) -> None:
|
|
|
+ """Update position, battery percent and light level of device."""
|
|
|
+ if info := await self.get_basic_info():
|
|
|
+ self._last_full_update = time.monotonic()
|
|
|
+ self._update_parsed_data(info)
|
|
|
+
|
|
|
+ async def get_basic_info(self) -> dict[str, Any] | None:
|
|
|
+ """Get device basic settings."""
|
|
|
+ if not (_data := await self._get_basic_info()):
|
|
|
+ return None
|
|
|
+ return {
|
|
|
+ "battery": _data[1],
|
|
|
+ "firmware": _data[2] / 10.0,
|
|
|
+ }
|
|
|
|
|
|
def _check_command_result(
|
|
|
self, result: bytes | None, index: int, values: set[int]
|
|
@@ -480,14 +541,35 @@ class SwitchbotBaseDevice:
|
|
|
)
|
|
|
return result[index] in values
|
|
|
|
|
|
+ def _update_parsed_data(self, new_data: dict[str, Any]) -> None:
|
|
|
+ """Update data."""
|
|
|
+ if not self._sb_adv_data:
|
|
|
+ _LOGGER.exception("No advertisement data to update")
|
|
|
+ return
|
|
|
+ self._set_parsed_data(
|
|
|
+ self._sb_adv_data,
|
|
|
+ _merge_data(self._sb_adv_data.data.get("data") or {}, new_data),
|
|
|
+ )
|
|
|
+
|
|
|
+ def _set_parsed_data(
|
|
|
+ self, advertisement: SwitchBotAdvertisement, data: dict[str, Any]
|
|
|
+ ) -> None:
|
|
|
+ """Set data."""
|
|
|
+ self._sb_adv_data = replace(
|
|
|
+ advertisement, data=self._sb_adv_data.data | {"data": data}
|
|
|
+ )
|
|
|
+
|
|
|
def _set_advertisement_data(self, advertisement: SwitchBotAdvertisement) -> None:
|
|
|
"""Set advertisement data."""
|
|
|
- if (
|
|
|
- advertisement.data.get("data")
|
|
|
- or not self._sb_adv_data
|
|
|
- or not self._sb_adv_data.data.get("data")
|
|
|
- ):
|
|
|
+ new_data = advertisement.data.get("data") or {}
|
|
|
+ if advertisement.active:
|
|
|
+
|
|
|
+
|
|
|
+ self._last_full_update = time.monotonic()
|
|
|
+ if not self._sb_adv_data:
|
|
|
self._sb_adv_data = advertisement
|
|
|
+ elif new_data:
|
|
|
+ self._update_parsed_data(new_data)
|
|
|
self._override_adv_data = None
|
|
|
|
|
|
def switch_mode(self) -> bool | None:
|
|
@@ -495,6 +577,18 @@ class SwitchbotBaseDevice:
|
|
|
|
|
|
return self._get_adv_value("switchMode")
|
|
|
|
|
|
+ def poll_needed(self, seconds_since_last_poll: float | None) -> bool:
|
|
|
+ """Return if device needs polling."""
|
|
|
+ if (
|
|
|
+ seconds_since_last_poll is not None
|
|
|
+ and seconds_since_last_poll < PASSIVE_POLL_INTERVAL
|
|
|
+ ):
|
|
|
+ return False
|
|
|
+ time_since_last_full_update = time.monotonic() - self._last_full_update
|
|
|
+ if time_since_last_full_update < PASSIVE_POLL_INTERVAL:
|
|
|
+ return False
|
|
|
+ return True
|
|
|
+
|
|
|
|
|
|
class SwitchbotDevice(SwitchbotBaseDevice):
|
|
|
"""Base Representation of a Switchbot Device.
|