"""Library to handle connection with Switchbot."""
- from __future__ import annotations
- import binascii
- import logging
- import threading
- import time
- from typing import Any
- import bluepy
# Retry/scan defaults. DEFAULT_RETRY_TIMEOUT is the value handed to
# time.sleep() between attempts; DEFAULT_SCAN_TIMEOUT is handed to the
# bluepy Scanner.scan() call.
DEFAULT_RETRY_COUNT = 3
DEFAULT_RETRY_TIMEOUT = 1
DEFAULT_SCAN_TIMEOUT = 5

# Advertised value used to recognise Switchbot devices during a scan, and the
# characteristic UUIDs used for writing commands and reading responses.
UUID = "cba20d00-224d-11e6-9fb8-0002a5d5c51b"
HANDLE = "cba20002-224d-11e6-9fb8-0002a5d5c51b"
NOTIFICATION_HANDLE = "cba20003-224d-11e6-9fb8-0002a5d5c51b"

# Command prefixes used instead of the plain key when a password is set.
KEY_PASSWORD_PREFIX = "5711"
KEY_PASSWORD_NOTIFY_PREFIX = "5712"

# Bot (WoHand) commands, hex-encoded.
PRESS_KEY = "570100"
ON_KEY = "570101"
OFF_KEY = "570102"

# Curtain (WoCurtain) commands, hex-encoded.
OPEN_KEY = "570f450105ff00"  # 570F4501010100
CLOSE_KEY = "570f450105ff64"  # 570F4501010164
POSITION_KEY = "570F450105ff"  # +actual_position ex: 570F450105ff32 for 50%
STOP_KEY = "570F450100ff"

# Command used to request the device's basic settings.
DEVICE_BASIC_SETTINGS_KEY = "5702"

# Suffixes appended after the encoded password for password-protected devices.
ON_KEY_SUFFIX = "01"
OFF_KEY_SUFFIX = "02"
PRESS_KEY_SUFFIX = "00"

_LOGGER = logging.getLogger(__name__)

# Module-wide lock held while a command connection is open.
CONNECT_LOCK = threading.Lock()
- def _process_wohand(data: bytes) -> dict[str, bool | int]:
- """Process woHand/Bot services data."""
- _bot_data: dict[str, bool | int] = {}
- # 128 switch or 0 press.
- _bot_data["switchMode"] = bool(data[1] & 0b10000000)
- # 64 off or 0 for on, if not inversed in app.
- if _bot_data["switchMode"]:
- _bot_data["isOn"] = not bool(data[1] & 0b01000000)
- else:
- _bot_data["isOn"] = False
- _bot_data["battery"] = data[2] & 0b01111111
- return _bot_data
- def _process_wocurtain(data: bytes, reverse: bool = True) -> dict[str, bool | int]:
- """Process woCurtain/Curtain services data."""
- _curtain_data: dict[str, bool | int] = {}
- _curtain_data["calibration"] = bool(data[1] & 0b01000000)
- _curtain_data["battery"] = data[2] & 0b01111111
- _curtain_data["inMotion"] = bool(data[3] & 0b10000000)
- _position = max(min(data[3] & 0b01111111, 100), 0)
- _curtain_data["position"] = (100 - _position) if reverse else _position
- # light sensor level (1-10)
- _curtain_data["lightLevel"] = (data[4] >> 4) & 0b00001111
- _curtain_data["deviceChain"] = data[4] & 0b00000111
- return _curtain_data
- def _process_wosensorth(data: bytes) -> dict[str, Any]:
- """Process woSensorTH/Temp sensor services data."""
- _wosensorth_data: dict[str, Any] = {}
- _temp_sign = 1 if data[4] & 0b10000000 else -1
- _temp_c = _temp_sign * ((data[4] & 0b01111111) + (data[3] / 10))
- _temp_f = (_temp_c * 9 / 5) + 32
- _temp_f = (_temp_f * 10) / 10
- _wosensorth_data["temp"] = {}
- _wosensorth_data["temp"]["c"] = _temp_c
- _wosensorth_data["temp"]["f"] = _temp_f
- _wosensorth_data["fahrenheit"] = bool(data[5] & 0b10000000)
- _wosensorth_data["humidity"] = data[5] & 0b01111111
- _wosensorth_data["battery"] = data[2] & 0b01111111
- return _wosensorth_data
class GetSwitchbotDevices:
    """Scan for all Switchbot devices and return by type."""

    def __init__(self, interface: int | None = None) -> None:
        """Get switchbot devices class constructor.

        Args:
            interface: Value handed to the bluepy ``Scanner`` constructor.
        """
        self._interface = interface
        # Discovered devices keyed by MAC address with the colons removed.
        self._all_services_data: dict[str, Any] = {}

    def discover(
        self, retry: int = DEFAULT_RETRY_COUNT, scan_timeout: int = DEFAULT_SCAN_TIMEOUT
    ) -> dict | None:
        """Find switchbot devices and their advertisement data.

        Args:
            retry: Number of additional scan attempts after a failed scan.
            scan_timeout: Value handed to ``Scanner.scan``.

        Returns:
            The accumulated device dict, or None when every scan attempt
            failed.
        """
        devices = None

        try:
            devices = bluepy.btle.Scanner(self._interface).scan(scan_timeout)
        except bluepy.btle.BTLEManagementError:
            _LOGGER.error("Error scanning for switchbot devices", exc_info=True)

        if devices is None:
            if retry < 1:
                # No exception is active here, so no exc_info is attached.
                _LOGGER.error("Scanning for Switchbot devices failed. Stop trying")
                return None

            _LOGGER.warning(
                "Error scanning for Switchbot devices. Retrying (remaining: %d)",
                retry,
            )
            time.sleep(DEFAULT_RETRY_TIMEOUT)
            return self.discover(retry - 1, scan_timeout)

        # Model character -> (model name, service-data decoder).
        parsers = {
            "H": ("WoHand", _process_wohand),
            "c": ("WoCurtain", _process_wocurtain),
            "T": ("WoSensorTH", _process_wosensorth),
        }

        for dev in devices:
            if dev.getValueText(7) != UUID:
                continue

            dev_id = dev.addr.replace(":", "")
            entry: dict[str, Any] = {"mac_address": dev.addr}
            self._all_services_data[dev_id] = entry

            for adtype, desc, value in dev.getScanData():
                if adtype != 22:
                    entry[desc] = value
                    continue

                # The first four hex characters are skipped; the remainder is
                # the model byte followed by the model-specific payload.
                _data = bytes.fromhex(value[4:])
                try:
                    _model = chr(_data[0] & 0b01111111)
                    if _model not in parsers:
                        continue
                    model_name, parser = parsers[_model]
                    parsed = parser(_data)
                except IndexError:
                    # A truncated advertisement from one device must not
                    # abort discovery of all the others.
                    _LOGGER.debug(
                        "Ignoring truncated service data from %s", dev.addr
                    )
                    continue

                parsed["rssi"] = dev.rssi
                entry["data"] = parsed
                entry["isEncrypted"] = bool(_data[0] & 0b10000000)
                entry["model"] = _model
                entry["modelName"] = model_name

        return self._all_services_data

    def _devices_of_model(self, model: str) -> dict:
        """Return discovered devices whose model character equals *model*.

        Runs a discovery first when nothing has been discovered yet.
        """
        if not self._all_services_data:
            self.discover()

        return {
            dev_id: info
            for dev_id, info in self._all_services_data.items()
            if info.get("model") == model
        }

    def get_curtains(self) -> dict:
        """Return all WoCurtain/Curtains devices with services data."""
        return self._devices_of_model("c")

    def get_bots(self) -> dict:
        """Return all WoHand/Bot devices with services data."""
        return self._devices_of_model("H")

    def get_tempsensors(self) -> dict:
        """Return all WoSensorTH/Temp sensor devices with services data."""
        return self._devices_of_model("T")

    def get_device_data(self, mac: str) -> dict:
        """Return data for specific device.

        The MAC address is compared case-insensitively. Returns an empty
        dict when the device has not been discovered.
        """
        if not self._all_services_data:
            self.discover()

        wanted = mac.lower()
        for device in self._all_services_data.values():
            if device.get("mac_address", "").lower() == wanted:
                return device

        return {}
class SwitchbotDevice:
    """Base Representation of a Switchbot Device."""

    def __init__(
        self,
        mac: str,
        password: str | None = None,
        interface: int | None = None,
        **kwargs: Any,
    ) -> None:
        """Switchbot base class constructor.

        Args:
            mac: MAC address of the device.
            password: Optional device password; None or "" means no password.
            interface: Value handed to bluepy as the adapter interface.
            **kwargs: ``scan_timeout`` and ``retry_count`` are consumed here;
                any other keyword is ignored by this class.
        """
        self._interface = interface
        self._mac = mac
        self._device = bluepy.btle.Peripheral(
            deviceAddr=None, addrType=bluepy.btle.ADDR_TYPE_RANDOM, iface=interface
        )
        self._switchbot_device_data: dict[str, Any] = {}
        self._scan_timeout: int = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT)
        self._retry_count: int = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
        if password is None or password == "":
            self._password_encoded = None
        else:
            # Zero-pad to 8 hex digits: a CRC with leading zero nibbles would
            # otherwise give a short or odd-length string that a2b_hex
            # rejects or that encodes the wrong bytes.
            self._password_encoded = "%08x" % (
                binascii.crc32(password.encode("ascii")) & 0xFFFFFFFF
            )

    def _connect(self) -> None:
        """Connect to the device; re-raises BTLEException on failure."""
        try:
            _LOGGER.debug("Connecting to Switchbot")
            self._device.connect(
                self._mac, bluepy.btle.ADDR_TYPE_RANDOM, self._interface
            )
            _LOGGER.debug("Connected to Switchbot")
        except bluepy.btle.BTLEException:
            _LOGGER.debug("Failed connecting to Switchbot", exc_info=True)
            raise

    def _disconnect(self) -> None:
        """Disconnect from the device, logging (not raising) BLE errors."""
        _LOGGER.debug("Disconnecting")
        try:
            self._device.disconnect()
        except bluepy.btle.BTLEException:
            _LOGGER.warning("Error disconnecting from Switchbot", exc_info=True)

    def _commandkey(self, key: str) -> str:
        """Return the hex command to send for *key*.

        Without a password the key is returned unchanged. With a password the
        command is built from a password prefix, the encoded password and,
        except for the basic-settings request, an on/off/press suffix.
        """
        if self._password_encoded is None:
            return key
        key_suffix = PRESS_KEY_SUFFIX
        if key == ON_KEY:
            key_suffix = ON_KEY_SUFFIX
        elif key == OFF_KEY:
            key_suffix = OFF_KEY_SUFFIX
        elif key == DEVICE_BASIC_SETTINGS_KEY:
            return KEY_PASSWORD_NOTIFY_PREFIX + self._password_encoded
        return KEY_PASSWORD_PREFIX + self._password_encoded + key_suffix

    def _writekey(self, key: str) -> Any:
        """Write the hex-encoded *key* to the command characteristic.

        Returns whatever the characteristic write returns.
        """
        _LOGGER.debug("Prepare to send")
        hand = self._device.getCharacteristics(uuid=HANDLE)[0]
        _LOGGER.debug("Sending command, %s", key)
        write_result = hand.write(binascii.a2b_hex(key), withResponse=False)
        if not write_result:
            _LOGGER.error(
                "Sent command but didn't get a response from Switchbot confirming command was sent."
                " Please check the Switchbot"
            )
        else:
            _LOGGER.info("Successfully sent command to Switchbot (MAC: %s)", self._mac)
        return write_result

    def _subscribe(self, key: str) -> Any:
        """Write *key* to the handle following the notification characteristic."""
        _LOGGER.debug("Subscribe to notifications")
        handle = self._device.getCharacteristics(uuid=NOTIFICATION_HANDLE)[0]
        notify_handle = handle.getHandle() + 1
        response = self._device.writeCharacteristic(
            notify_handle, binascii.a2b_hex(key), withResponse=False
        )
        return response

    def _readkey(self) -> bytes | None:
        """Read the first notification characteristic, or None if there is none."""
        _LOGGER.debug("Prepare to read")
        receive_handle = self._device.getCharacteristics(uuid=NOTIFICATION_HANDLE)
        if receive_handle:
            # Only the first matching characteristic is ever read.
            for char in receive_handle:
                read_result: bytes = char.read()
                return read_result
        return None

    def _sendcommand(self, key: str, retry: int) -> bool:
        """Send *key* to the device, retrying up to *retry* more times.

        Returns True once a write succeeds, False when all attempts failed.
        """
        send_success = False
        command = self._commandkey(key)
        _LOGGER.debug("Sending command to switchbot %s", command)
        with CONNECT_LOCK:
            try:
                self._connect()
                send_success = self._writekey(command)
            except bluepy.btle.BTLEException:
                _LOGGER.warning("Error talking to Switchbot", exc_info=True)
            finally:
                self._disconnect()
        if send_success:
            return True
        if retry < 1:
            # No exception is active here, so no exc_info is attached.
            _LOGGER.error("Switchbot communication failed. Stopping trying")
            return False
        _LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)", retry)
        time.sleep(DEFAULT_RETRY_TIMEOUT)
        return self._sendcommand(key, retry - 1)

    def get_mac(self) -> str:
        """Return mac address of device."""
        return self._mac

    def get_battery_percent(self) -> Any:
        """Return device battery level in percent.

        Returns None when no service data has been cached yet.
        """
        data = self._switchbot_device_data.get("data")
        if not data:
            return None
        return data["battery"]

    def _parse_service_data(self, data: bytes) -> dict[str, Any] | None:
        """Decode one service-data payload; None for unsupported models."""
        model = chr(data[0] & 0b01111111)
        if model == "H":
            model_name, parsed = "WoHand", _process_wohand(data)
        elif model == "c":
            # Honour the curtain's own reverse setting so the advertised
            # position agrees with set_position()/get_basic_info(). Classes
            # without the attribute keep the decoder's default of True.
            model_name = "WoCurtain"
            parsed = _process_wocurtain(data, getattr(self, "_reverse", True))
        elif model == "T":
            model_name, parsed = "WoSensorTH", _process_wosensorth(data)
        else:
            return None
        return {
            "data": parsed,
            "isEncrypted": bool(data[0] & 0b10000000),
            "model": model,
            "modelName": model_name,
        }

    def get_device_data(
        self, retry: int = DEFAULT_RETRY_COUNT, interface: int | None = None
    ) -> dict | None:
        """Find switchbot devices and their advertisement data.

        Args:
            retry: Number of additional scan attempts after a failed scan.
            interface: Adapter to scan with; None falls back to the interface
                given to the constructor.

        Returns:
            The cached data dict for this device (updated when the device was
            seen in the scan), or None when every scan attempt failed.
        """
        # Compare against None: adapter index 0 is a valid, falsy value.
        _interface: int | None = (
            interface if interface is not None else self._interface
        )

        devices = None

        try:
            devices = bluepy.btle.Scanner(_interface).scan(self._scan_timeout)
        except bluepy.btle.BTLEManagementError:
            _LOGGER.error("Error scanning for switchbot devices", exc_info=True)

        if devices is None:
            if retry < 1:
                # No exception is active here, so no exc_info is attached.
                _LOGGER.error("Scanning for Switchbot devices failed. Stop trying")
                return None

            _LOGGER.warning(
                "Error scanning for Switchbot devices. Retrying (remaining: %d)",
                retry,
            )
            time.sleep(DEFAULT_RETRY_TIMEOUT)
            return self.get_device_data(retry=retry - 1, interface=_interface)

        wanted = self._mac.lower()
        for dev in devices:
            if dev.addr.lower() != wanted:
                continue

            self._switchbot_device_data["mac_address"] = dev.addr
            for adtype, desc, value in dev.getScanData():
                if adtype != 22:
                    self._switchbot_device_data[desc] = value
                    continue

                try:
                    decoded = self._parse_service_data(bytes.fromhex(value[4:]))
                except IndexError:
                    # Truncated payload: keep whatever was cached before.
                    _LOGGER.debug(
                        "Ignoring truncated service data from %s", dev.addr
                    )
                    continue
                if decoded is None:
                    continue

                decoded["data"]["rssi"] = dev.rssi
                self._switchbot_device_data.update(decoded)

        return self._switchbot_device_data

    def _get_basic_info(self, retry: int = DEFAULT_RETRY_COUNT) -> bytes:
        """Get device basic settings.

        Returns the raw bytes read from the device, or empty bytes when all
        attempts failed.
        """
        send_success = False
        # Pre-bind so a failure after a successful write cannot leave the
        # name undefined when it is checked below.
        value: bytes | None = None
        command = self._commandkey(DEVICE_BASIC_SETTINGS_KEY)
        # Hold the same lock as _sendcommand while the connection is open.
        with CONNECT_LOCK:
            try:
                self._connect()
                self._subscribe(command)
                send_success = self._writekey(command)
                value = self._readkey()
            except bluepy.btle.BTLEException:
                _LOGGER.warning("Error talking to Switchbot", exc_info=True)
            finally:
                self._disconnect()
        if send_success and value:
            _LOGGER.info("Successfully retrieved data from device %s", self._mac)
            return value
        if retry < 1:
            # No exception is active here, so no exc_info is attached.
            _LOGGER.error("Switchbot communication failed. Stopping trying")
            return bytes(0)
        _LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)", retry)
        time.sleep(DEFAULT_RETRY_TIMEOUT)
        return self._get_basic_info(retry - 1)
class Switchbot(SwitchbotDevice):
    """Representation of a Switchbot."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Switchbot Bot/WoHand constructor.

        Accepts the base-class arguments plus ``inverse_mode`` (default
        False), which inverts the state reported by ``is_on``.
        """
        super().__init__(*args, **kwargs)
        self._inverse: bool = kwargs.pop("inverse_mode", False)
        self._settings: dict[str, Any] = {}

    def update(self, interface: int | None = None) -> None:
        """Update mode, battery percent and state of device."""
        self.get_device_data(retry=self._retry_count, interface=interface)

    def turn_on(self) -> bool:
        """Turn device on."""
        return self._sendcommand(ON_KEY, self._retry_count)

    def turn_off(self) -> bool:
        """Turn device off."""
        return self._sendcommand(OFF_KEY, self._retry_count)

    def press(self) -> bool:
        """Press command to device."""
        return self._sendcommand(PRESS_KEY, self._retry_count)

    def get_basic_info(self) -> dict[str, Any]:
        """Get device basic settings.

        Returns:
            The settings dict, or an empty dict when the device could not be
            read or replied with too few bytes.
        """
        settings_data = self._get_basic_info(self._retry_count)

        # Bytes up to index 10 are read below; a failed read yields empty
        # bytes, which would otherwise raise IndexError.
        if len(settings_data) < 11:
            _LOGGER.warning(
                "Incomplete basic settings from Switchbot (MAC: %s)", self._mac
            )
            return {}

        self._settings["battery"] = settings_data[1]
        self._settings["firmware"] = settings_data[2] / 10.0
        self._settings["timers"] = settings_data[8]
        self._settings["dualStateMode"] = bool(settings_data[9] & 16)
        self._settings["inverseDirection"] = bool(settings_data[9] & 1)
        self._settings["holdSeconds"] = settings_data[10]
        return self._settings

    def switch_mode(self) -> Any:
        """Return true or false from cache."""
        # To get the actual mode call update() first.
        data = self._switchbot_device_data.get("data")
        if not data:
            return None
        return data["switchMode"]

    def is_on(self) -> Any:
        """Return switch state from cache."""
        # To get the actual state call update() first.
        data = self._switchbot_device_data.get("data")
        if not data:
            return None
        if self._inverse:
            return not data["isOn"]
        return data["isOn"]
class SwitchbotCurtain(SwitchbotDevice):
    """Representation of a Switchbot Curtain."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Switchbot Curtain/WoCurtain constructor."""
        # The position of the curtain is returned with 0 = open and 100 = closed.
        # This is independent of the calibration of the curtain bot (Open left to
        # right/Open right to left/Open from the middle).
        # The parameter 'reverse_mode' reverses these values:
        # if 'reverse_mode' = True, position = 0 equals close
        # and position = 100 equals open. The parameter defaults to True so that
        # the definition of position is the same as in Home Assistant.
        super().__init__(*args, **kwargs)
        self._reverse: bool = kwargs.pop("reverse_mode", True)
        self._settings: dict[str, Any] = {}

    def open(self) -> bool:
        """Send open command."""
        return self._sendcommand(OPEN_KEY, self._retry_count)

    def close(self) -> bool:
        """Send close command."""
        return self._sendcommand(CLOSE_KEY, self._retry_count)

    def stop(self) -> bool:
        """Send stop command to device."""
        return self._sendcommand(STOP_KEY, self._retry_count)

    def set_position(self, position: int) -> bool:
        """Send position command (0-100) to device.

        Values outside 0-100 are clamped so the payload is always a valid
        two-digit hex byte.
        """
        position = max(0, min(100, int(position)))
        position = (100 - position) if self._reverse else position
        hex_position = "%0.2X" % position
        return self._sendcommand(POSITION_KEY + hex_position, self._retry_count)

    def update(self, interface: int | None = None) -> None:
        """Update position, battery percent and light level of device."""
        self.get_device_data(retry=self._retry_count, interface=interface)

    def get_position(self) -> Any:
        """Return cached position (0-100) of Curtain."""
        # To get actual position call update() first.
        data = self._switchbot_device_data.get("data")
        if not data:
            return None
        return data["position"]

    def get_basic_info(self) -> dict[str, Any]:
        """Get device basic settings.

        Returns:
            The settings dict, or an empty dict when the device could not be
            read or replied with too few bytes.
        """
        settings_data = self._get_basic_info(self._retry_count)

        # Bytes up to index 7 are read below; a failed read yields empty
        # bytes, which would otherwise raise IndexError.
        if len(settings_data) < 8:
            _LOGGER.warning(
                "Incomplete basic settings from Switchbot (MAC: %s)", self._mac
            )
            return {}

        flags = settings_data[4]
        state = settings_data[5]

        self._settings["battery"] = settings_data[1]
        self._settings["firmware"] = settings_data[2] / 10.0
        self._settings["chainLength"] = settings_data[3]
        self._settings["openDirection"] = (
            "right_to_left" if flags & 0b10000000 else "left_to_right"
        )
        self._settings["touchToOpen"] = bool(flags & 0b01000000)
        self._settings["light"] = bool(flags & 0b00100000)
        self._settings["fault"] = bool(flags & 0b00001000)
        self._settings["solarPanel"] = bool(state & 0b00001000)
        self._settings["calibrated"] = bool(state & 0b00000100)
        self._settings["inMotion"] = bool(state & 0b01000011)
        _position = max(min(settings_data[6], 100), 0)
        self._settings["position"] = (100 - _position) if self._reverse else _position
        self._settings["timers"] = settings_data[7]
        return self._settings

    def get_light_level(self) -> Any:
        """Return cached light level."""
        # To get actual light level call update() first.
        data = self._switchbot_device_data.get("data")
        if not data:
            return None
        return data["lightLevel"]

    def is_reversed(self) -> bool:
        """Return the 'reverse_mode' setting given to the constructor."""
        return self._reverse

    def is_calibrated(self) -> Any:
        """Return True if the curtain is calibrated, from cache."""
        # To get the actual calibration state call update() first.
        data = self._switchbot_device_data.get("data")
        if not data:
            return None
        return data["calibration"]
|