123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- import asyncio
- import logging
- import time
- from typing import Any
- from bleak.backends.device import BLEDevice
- from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
- from ..const import SwitchbotModel
- from ..models import SwitchBotAdvertisement
- from .device import SwitchbotDevice
- _LOGGER = logging.getLogger(__name__)
- COMMAND_HEADER = "57"
- COMMAND_GET_CK_IV = f"{COMMAND_HEADER}0f2103"
- COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f70010000"
- COMMAND_TURN_ON = f"{COMMAND_HEADER}0f70010100"
- COMMAND_TOGGLE = f"{COMMAND_HEADER}0f70010200"
- COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
- PASSIVE_POLL_INTERVAL = 10 * 60
- class SwitchbotRelaySwitch(SwitchbotDevice):
- """Representation of a Switchbot relay switch 1pm."""
- def __init__(
- self,
- device: BLEDevice,
- key_id: str,
- encryption_key: str,
- interface: int = 0,
- model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
- **kwargs: Any,
- ) -> None:
- if len(key_id) == 0:
- raise ValueError("key_id is missing")
- elif len(key_id) != 2:
- raise ValueError("key_id is invalid")
- if len(encryption_key) == 0:
- raise ValueError("encryption_key is missing")
- elif len(encryption_key) != 32:
- raise ValueError("encryption_key is invalid")
- self._iv = None
- self._cipher = None
- self._key_id = key_id
- self._encryption_key = bytearray.fromhex(encryption_key)
- self._model: SwitchbotModel = model
- self._force_next_update = False
- super().__init__(device, None, interface, **kwargs)
- def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
- """Update device data from advertisement."""
- # Obtain voltage and current through command.
- adv_data = advertisement.data["data"]
- if previous_voltage := self._get_adv_value("voltage"):
- adv_data["voltage"] = previous_voltage
- if previous_current := self._get_adv_value("current"):
- adv_data["current"] = previous_current
- current_state = self._get_adv_value("sequence_number")
- super().update_from_advertisement(advertisement)
- new_state = self._get_adv_value("sequence_number")
- _LOGGER.debug(
- "%s: update advertisement: %s (seq before: %s) (seq after: %s)",
- self.name,
- advertisement,
- current_state,
- new_state,
- )
- if current_state != new_state:
- self._force_next_update = True
- async def update(self, interface: int | None = None) -> None:
- """Update state of device."""
- if info := await self.get_voltage_and_current():
- self._last_full_update = time.monotonic()
- self._update_parsed_data(info)
- self._fire_callbacks()
- async def get_voltage_and_current(self) -> dict[str, Any] | None:
- """Get voltage and current because advtisement don't have these"""
- result = await self._send_command(COMMAND_GET_VOLTAGE_AND_CURRENT)
- ok = self._check_command_result(result, 0, {1})
- if ok:
- return {
- "voltage": ((result[9] << 8) + result[10]) / 10,
- "current": (result[11] << 8) + result[12],
- }
- return None
- def poll_needed(self, seconds_since_last_poll: float | None) -> bool:
- """Return if device needs polling."""
- if self._force_next_update:
- self._force_next_update = False
- return True
- if (
- seconds_since_last_poll is not None
- and seconds_since_last_poll < PASSIVE_POLL_INTERVAL
- ):
- return False
- time_since_last_full_update = time.monotonic() - self._last_full_update
- if time_since_last_full_update < PASSIVE_POLL_INTERVAL:
- return False
- return True
- async def turn_on(self) -> bool:
- """Turn device on."""
- result = await self._send_command(COMMAND_TURN_ON)
- ok = self._check_command_result(result, 0, {1})
- if ok:
- self._override_state({"isOn": True})
- self._fire_callbacks()
- return ok
- async def turn_off(self) -> bool:
- """Turn device off."""
- result = await self._send_command(COMMAND_TURN_OFF)
- ok = self._check_command_result(result, 0, {1})
- if ok:
- self._override_state({"isOn": False})
- self._fire_callbacks()
- return ok
- async def async_toggle(self, **kwargs) -> bool:
- """Toggle device."""
- result = await self._send_command(COMMAND_TOGGLE)
- status = self._check_command_result(result, 0, {1})
- return status
- def is_on(self) -> bool | None:
- """Return switch state from cache."""
- return self._get_adv_value("isOn")
- async def _send_command(
- self, key: str, retry: int | None = None, encrypt: bool = True
- ) -> bytes | None:
- if not encrypt:
- return await super()._send_command(key[:2] + "000000" + key[2:], retry)
- result = await self._ensure_encryption_initialized()
- if not result:
- return None
- encrypted = (
- key[:2] + self._key_id + self._iv[0:2].hex() + self._encrypt(key[2:])
- )
- result = await super()._send_command(encrypted, retry)
- return result[:1] + self._decrypt(result[4:])
- async def _ensure_encryption_initialized(self) -> bool:
- if self._iv is not None:
- return True
- result = await self._send_command(
- COMMAND_GET_CK_IV + self._key_id, encrypt=False
- )
- ok = self._check_command_result(result, 0, {1})
- if ok:
- self._iv = result[4:]
- return ok
- async def _execute_disconnect(self) -> None:
- await super()._execute_disconnect()
- self._iv = None
- self._cipher = None
- def _get_cipher(self) -> Cipher:
- if self._cipher is None:
- self._cipher = Cipher(
- algorithms.AES128(self._encryption_key), modes.CTR(self._iv)
- )
- return self._cipher
- def _encrypt(self, data: str) -> str:
- if len(data) == 0:
- return ""
- encryptor = self._get_cipher().encryptor()
- return (encryptor.update(bytearray.fromhex(data)) + encryptor.finalize()).hex()
- def _decrypt(self, data: bytearray) -> bytes:
- if len(data) == 0:
- return b""
- decryptor = self._get_cipher().decryptor()
- return decryptor.update(data) + decryptor.finalize()
|