__init__.py 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import asyncio
  4. import binascii
  5. import logging
  6. from dataclasses import dataclass
  7. from typing import Any
  8. from uuid import UUID
  9. import async_timeout
  10. import bleak
  11. from bleak.backends.device import BLEDevice
  12. from bleak.backends.scanner import AdvertisementData
  13. from bleak_retry_connector import BleakClient, establish_connection
# Defaults for command retries and BLE scanning.
DEFAULT_RETRY_COUNT = 3
DEFAULT_RETRY_TIMEOUT = 1  # passed to asyncio.sleep() before a scan retry
DEFAULT_SCAN_TIMEOUT = 5  # passed to asyncio.sleep() while a scan is running

# All command keys below are hex strings; they are converted with
# bytearray.fromhex() before being written to the device.

# Keys common to all device types
DEVICE_GET_BASIC_SETTINGS_KEY = "5702"
DEVICE_SET_MODE_KEY = "5703"
DEVICE_SET_EXTENDED_KEY = "570f"

# Plug Mini keys
PLUG_ON_KEY = "570f50010180"
PLUG_OFF_KEY = "570f50010100"

# Bot keys
PRESS_KEY = "570100"
ON_KEY = "570101"
OFF_KEY = "570102"
DOWN_KEY = "570103"
UP_KEY = "570104"

# Curtain keys
OPEN_KEY = "570f450105ff00"  # 570F4501010100
CLOSE_KEY = "570f450105ff64"  # 570F4501010164
POSITION_KEY = "570F450105ff"  # +actual_position ex: 570F450105ff32 for 50%
STOP_KEY = "570F450100ff"
CURTAIN_EXT_SUM_KEY = "570f460401"
CURTAIN_EXT_ADV_KEY = "570f460402"
CURTAIN_EXT_CHAIN_INFO_KEY = "570f468101"

# Base key when encryption is set.
# A password-protected key is rebuilt as:
#   KEY_PASSWORD_PREFIX + key[3] + encoded password + key[4:]
KEY_PASSWORD_PREFIX = "571"

_LOGGER = logging.getLogger(__name__)

# Module-wide lock held while a scan is running and while a command is sent.
CONNECT_LOCK = asyncio.Lock()
  42. def _sb_uuid(comms_type: str = "service") -> UUID | str:
  43. """Return Switchbot UUID."""
  44. _uuid = {"tx": "002", "rx": "003", "service": "d00"}
  45. if comms_type in _uuid:
  46. return UUID(f"cba20{_uuid[comms_type]}-224d-11e6-9fb8-0002a5d5c51b")
  47. return "Incorrect type, choose between: tx, rx or service"
  48. def _process_wohand(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
  49. """Process woHand/Bot services data."""
  50. _switch_mode = bool(data[1] & 0b10000000)
  51. _bot_data = {
  52. "switchMode": _switch_mode,
  53. "isOn": not bool(data[1] & 0b01000000) if _switch_mode else False,
  54. "battery": data[2] & 0b01111111,
  55. }
  56. return _bot_data
  57. def _process_wocurtain(
  58. data: bytes, mfr_data: bytes | None, reverse: bool = True
  59. ) -> dict[str, bool | int]:
  60. """Process woCurtain/Curtain services data."""
  61. _position = max(min(data[3] & 0b01111111, 100), 0)
  62. _curtain_data = {
  63. "calibration": bool(data[1] & 0b01000000),
  64. "battery": data[2] & 0b01111111,
  65. "inMotion": bool(data[3] & 0b10000000),
  66. "position": (100 - _position) if reverse else _position,
  67. "lightLevel": (data[4] >> 4) & 0b00001111,
  68. "deviceChain": data[4] & 0b00000111,
  69. }
  70. return _curtain_data
  71. def _process_wosensorth(data: bytes, mfr_data: bytes | None) -> dict[str, object]:
  72. """Process woSensorTH/Temp sensor services data."""
  73. _temp_sign = 1 if data[4] & 0b10000000 else -1
  74. _temp_c = _temp_sign * ((data[4] & 0b01111111) + ((data[3] & 0b00001111) / 10))
  75. _temp_f = (_temp_c * 9 / 5) + 32
  76. _temp_f = (_temp_f * 10) / 10
  77. _wosensorth_data = {
  78. "temp": {"c": _temp_c, "f": _temp_f},
  79. "fahrenheit": bool(data[5] & 0b10000000),
  80. "humidity": data[5] & 0b01111111,
  81. "battery": data[2] & 0b01111111,
  82. }
  83. return _wosensorth_data
  84. def _process_wocontact(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
  85. """Process woContact Sensor services data."""
  86. return {
  87. "tested": bool(data[1] & 0b10000000),
  88. "motion_detected": bool(data[1] & 0b01000000),
  89. "battery": data[2] & 0b01111111,
  90. "contact_open": data[3] & 0b00000010 == 0b00000010,
  91. "contact_timeout": data[3] & 0b00000110 == 0b00000110,
  92. "is_light": bool(data[3] & 0b00000001),
  93. "button_count": (data[7] & 0b11110000) >> 4,
  94. }
  95. def _process_wopresence(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
  96. """Process WoPresence Sensor services data."""
  97. return {
  98. "tested": bool(data[1] & 0b10000000),
  99. "motion_detected": bool(data[1] & 0b01000000),
  100. "battery": data[2] & 0b01111111,
  101. "led": (data[5] & 0b00100000) >> 5,
  102. "iot": (data[5] & 0b00010000) >> 4,
  103. "sense_distance": (data[5] & 0b00001100) >> 2,
  104. "light_intensity": data[5] & 0b00000011,
  105. "is_light": bool(data[5] & 0b00000010),
  106. }
  107. def _process_woplugmini(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
  108. """Process plug mini."""
  109. return {
  110. "switchMode": True,
  111. "isOn": mfr_data[7] == 0x80,
  112. "wifi_rssi": -mfr_data[9],
  113. }
  114. def _process_color_bulb(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
  115. """Process WoBulb services data."""
  116. return {
  117. "sequence_number": mfr_data[6],
  118. "isOn": bool(mfr_data[7] & 0b10000000),
  119. "brightness": mfr_data[7] & 0b01111111,
  120. "delay": bool(mfr_data[8] & 0b10000000),
  121. "preset": bool(mfr_data[8] & 0b00001000),
  122. "light_state": mfr_data[8] & 0b00000111,
  123. "speed": mfr_data[9] & 0b01111111,
  124. "loop_index": mfr_data[10] & 0b11111110,
  125. }
  126. @dataclass
  127. class SwitchBotAdvertisement:
  128. """Switchbot advertisement."""
  129. address: str
  130. data: dict[str, Any]
  131. device: BLEDevice
  132. def parse_advertisement_data(
  133. device: BLEDevice, advertisement_data: AdvertisementData
  134. ) -> SwitchBotAdvertisement | None:
  135. """Parse advertisement data."""
  136. _services = list(advertisement_data.service_data.values())
  137. _mgr_datas = list(advertisement_data.manufacturer_data.values())
  138. if not _services:
  139. return
  140. _service_data = _services[0]
  141. _mfr_data = _mgr_datas[0] if _mgr_datas else None
  142. _model = chr(_service_data[0] & 0b01111111)
  143. supported_types: dict[str, dict[str, Any]] = {
  144. "d": {"modelName": "WoContact", "func": _process_wocontact},
  145. "H": {"modelName": "WoHand", "func": _process_wohand},
  146. "s": {"modelName": "WoPresence", "func": _process_wopresence},
  147. "c": {"modelName": "WoCurtain", "func": _process_wocurtain},
  148. "T": {"modelName": "WoSensorTH", "func": _process_wosensorth},
  149. "i": {"modelName": "WoSensorTH", "func": _process_wosensorth},
  150. "g": {"modelName": "WoPlug", "func": _process_woplugmini},
  151. "u": {"modelName": "WoBulb", "func": _process_color_bulb},
  152. }
  153. data = {
  154. "address": device.address, # MacOS uses UUIDs
  155. "rawAdvData": list(advertisement_data.service_data.values())[0],
  156. "data": {
  157. "rssi": device.rssi,
  158. },
  159. }
  160. if _model in supported_types:
  161. data.update(
  162. {
  163. "isEncrypted": bool(_service_data[0] & 0b10000000),
  164. "model": _model,
  165. "modelName": supported_types[_model]["modelName"],
  166. "data": supported_types[_model]["func"](_service_data, _mfr_data),
  167. }
  168. )
  169. data["data"]["rssi"] = device.rssi
  170. return SwitchBotAdvertisement(device.address, data, device)
  171. class GetSwitchbotDevices:
  172. """Scan for all Switchbot devices and return by type."""
  173. def __init__(self, interface: int = 0) -> None:
  174. """Get switchbot devices class constructor."""
  175. self._interface = f"hci{interface}"
  176. self._adv_data: dict[str, SwitchBotAdvertisement] = {}
  177. def detection_callback(
  178. self,
  179. device: BLEDevice,
  180. advertisement_data: AdvertisementData,
  181. ) -> None:
  182. discovery = parse_advertisement_data(device, advertisement_data)
  183. if discovery:
  184. self._adv_data[discovery.address] = discovery
  185. async def discover(
  186. self, retry: int = DEFAULT_RETRY_COUNT, scan_timeout: int = DEFAULT_SCAN_TIMEOUT
  187. ) -> dict:
  188. """Find switchbot devices and their advertisement data."""
  189. devices = None
  190. devices = bleak.BleakScanner(
  191. # TODO: Find new UUIDs to filter on. For example, see
  192. # https://github.com/OpenWonderLabs/SwitchBotAPI-BLE/blob/4ad138bb09f0fbbfa41b152ca327a78c1d0b6ba9/devicetypes/meter.md
  193. adapter=self._interface,
  194. )
  195. devices.register_detection_callback(self.detection_callback)
  196. async with CONNECT_LOCK:
  197. await devices.start()
  198. await asyncio.sleep(scan_timeout)
  199. await devices.stop()
  200. if devices is None:
  201. if retry < 1:
  202. _LOGGER.error(
  203. "Scanning for Switchbot devices failed. Stop trying", exc_info=True
  204. )
  205. return self._adv_data
  206. _LOGGER.warning(
  207. "Error scanning for Switchbot devices. Retrying (remaining: %d)",
  208. retry,
  209. )
  210. await asyncio.sleep(DEFAULT_RETRY_TIMEOUT)
  211. return await self.discover(retry - 1, scan_timeout)
  212. return self._adv_data
  213. async def _get_devices_by_model(
  214. self,
  215. model: str,
  216. ) -> dict:
  217. """Get switchbot devices by type."""
  218. if not self._adv_data:
  219. await self.discover()
  220. return {
  221. address: adv
  222. for address, adv in self._adv_data.items()
  223. if adv.data.get("model") == model
  224. }
  225. async def get_curtains(self) -> dict[str, SwitchBotAdvertisement]:
  226. """Return all WoCurtain/Curtains devices with services data."""
  227. return await self._get_devices_by_model("c")
  228. async def get_bots(self) -> dict[str, SwitchBotAdvertisement]:
  229. """Return all WoHand/Bot devices with services data."""
  230. return await self._get_devices_by_model("H")
  231. async def get_tempsensors(self) -> dict[str, SwitchBotAdvertisement]:
  232. """Return all WoSensorTH/Temp sensor devices with services data."""
  233. base_meters = await self._get_devices_by_model("T")
  234. plus_meters = await self._get_devices_by_model("i")
  235. return {**base_meters, **plus_meters}
  236. async def get_contactsensors(self) -> dict[str, SwitchBotAdvertisement]:
  237. """Return all WoContact/Contact sensor devices with services data."""
  238. return await self._get_devices_by_model("d")
  239. async def get_device_data(
  240. self, address: str
  241. ) -> dict[str, SwitchBotAdvertisement] | None:
  242. """Return data for specific device."""
  243. if not self._adv_data:
  244. await self.discover()
  245. _switchbot_data = {
  246. device: data
  247. for device, data in self._adv_data.items()
  248. # MacOS uses UUIDs instead of MAC addresses
  249. if data.get("address") == address
  250. }
  251. return _switchbot_data
  252. class SwitchbotDevice:
  253. """Base Representation of a Switchbot Device."""
  254. def __init__(
  255. self,
  256. device: BLEDevice,
  257. password: str | None = None,
  258. interface: int = 0,
  259. **kwargs: Any,
  260. ) -> None:
  261. """Switchbot base class constructor."""
  262. self._interface = f"hci{interface}"
  263. self._device = device
  264. self._sb_adv_data: SwitchBotAdvertisement | None = None
  265. self._scan_timeout: int = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT)
  266. self._retry_count: int = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
  267. if password is None or password == "":
  268. self._password_encoded = None
  269. else:
  270. self._password_encoded = "%x" % (
  271. binascii.crc32(password.encode("ascii")) & 0xFFFFFFFF
  272. )
  273. def _commandkey(self, key: str) -> str:
  274. """Add password to key if set."""
  275. if self._password_encoded is None:
  276. return key
  277. key_action = key[3]
  278. key_suffix = key[4:]
  279. return KEY_PASSWORD_PREFIX + key_action + self._password_encoded + key_suffix
  280. async def _sendcommand(self, key: str, retry: int) -> bytes:
  281. """Send command to device and read response."""
  282. command = bytearray.fromhex(self._commandkey(key))
  283. _LOGGER.debug("Sending command to switchbot %s", command)
  284. max_attempts = retry + 1
  285. async with CONNECT_LOCK:
  286. for attempt in range(max_attempts):
  287. try:
  288. return await self._send_command_locked(key, command)
  289. except (bleak.BleakError, asyncio.exceptions.TimeoutError):
  290. if attempt == retry:
  291. _LOGGER.error(
  292. "Switchbot communication failed. Stopping trying",
  293. exc_info=True,
  294. )
  295. return b"\x00"
  296. _LOGGER.debug("Switchbot communication failed with:", exc_info=True)
  297. raise RuntimeError("Unreachable")
  298. @property
  299. def name(self) -> str:
  300. """Return device name."""
  301. return f"{self._device.name} ({self._device.address})"
  302. async def _send_command_locked(self, key: str, command: bytes) -> bytes:
  303. """Send command to device and read response."""
  304. client: BleakClient | None = None
  305. try:
  306. _LOGGER.debug("%s: Connnecting to switchbot", self.name)
  307. client = await establish_connection(
  308. BleakClient, self._device, self.name, max_attempts=1
  309. )
  310. _LOGGER.debug(
  311. "%s: Connnected to switchbot: %s", self.name, client.is_connected
  312. )
  313. future: asyncio.Future[bytearray] = asyncio.Future()
  314. def _notification_handler(sender: int, data: bytearray) -> None:
  315. """Handle notification responses."""
  316. if future.done():
  317. _LOGGER.debug("%s: Notification handler already done", self.name)
  318. return
  319. future.set_result(data)
  320. _LOGGER.debug("%s: Subscribe to notifications", self.name)
  321. await client.start_notify(_sb_uuid(comms_type="rx"), _notification_handler)
  322. _LOGGER.debug("%s: Sending command, %s", self.name, key)
  323. await client.write_gatt_char(_sb_uuid(comms_type="tx"), command, False)
  324. async with async_timeout.timeout(5):
  325. notify_msg = await future
  326. _LOGGER.info("%s: Notification received: %s", self.name, notify_msg)
  327. _LOGGER.debug("%s: UnSubscribe to notifications", self.name)
  328. await client.stop_notify(_sb_uuid(comms_type="rx"))
  329. finally:
  330. if client:
  331. await client.disconnect()
  332. if notify_msg == b"\x07":
  333. _LOGGER.error("Password required")
  334. elif notify_msg == b"\t":
  335. _LOGGER.error("Password incorrect")
  336. return notify_msg
  337. def get_address(self) -> str:
  338. """Return address of device."""
  339. return self._device.address
  340. def _get_adv_value(self, key: str) -> Any:
  341. """Return value from advertisement data."""
  342. if not self._sb_adv_data:
  343. return None
  344. return self._sb_adv_data.data["data"][key]
  345. def get_battery_percent(self) -> Any:
  346. """Return device battery level in percent."""
  347. return self._get_adv_value("battery")
  348. def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
  349. """Update device data from advertisement."""
  350. self._sb_adv_data = advertisement
  351. self._device = advertisement.device
  352. async def get_device_data(
  353. self, retry: int = DEFAULT_RETRY_COUNT, interface: int | None = None
  354. ) -> dict | None:
  355. """Find switchbot devices and their advertisement data."""
  356. if interface:
  357. _interface: int = interface
  358. else:
  359. _interface = int(self._interface.replace("hci", ""))
  360. _data = await GetSwitchbotDevices(interface=_interface).discover(
  361. retry=retry, scan_timeout=self._scan_timeout
  362. )
  363. if self._device.address in _data:
  364. self._sb_adv_data = _data[self._device.address]
  365. return self._sb_adv_data
  366. class Switchbot(SwitchbotDevice):
  367. """Representation of a Switchbot."""
  368. def __init__(self, *args: Any, **kwargs: Any) -> None:
  369. """Switchbot Bot/WoHand constructor."""
  370. super().__init__(*args, **kwargs)
  371. self._inverse: bool = kwargs.pop("inverse_mode", False)
  372. self._settings: dict[str, Any] = {}
  373. async def update(self, interface: int | None = None) -> None:
  374. """Update mode, battery percent and state of device."""
  375. await self.get_device_data(retry=self._retry_count, interface=interface)
  376. async def turn_on(self) -> bool:
  377. """Turn device on."""
  378. result = await self._sendcommand(ON_KEY, self._retry_count)
  379. if result[0] == 1:
  380. return True
  381. if result[0] == 5:
  382. _LOGGER.debug("Bot is in press mode and doesn't have on state")
  383. return True
  384. return False
  385. async def turn_off(self) -> bool:
  386. """Turn device off."""
  387. result = await self._sendcommand(OFF_KEY, self._retry_count)
  388. if result[0] == 1:
  389. return True
  390. if result[0] == 5:
  391. _LOGGER.debug("Bot is in press mode and doesn't have off state")
  392. return True
  393. return False
  394. async def hand_up(self) -> bool:
  395. """Raise device arm."""
  396. result = await self._sendcommand(UP_KEY, self._retry_count)
  397. if result[0] == 1:
  398. return True
  399. if result[0] == 5:
  400. _LOGGER.debug("Bot is in press mode")
  401. return True
  402. return False
  403. async def hand_down(self) -> bool:
  404. """Lower device arm."""
  405. result = await self._sendcommand(DOWN_KEY, self._retry_count)
  406. if result[0] == 1:
  407. return True
  408. if result[0] == 5:
  409. _LOGGER.debug("Bot is in press mode")
  410. return True
  411. return False
  412. async def press(self) -> bool:
  413. """Press command to device."""
  414. result = await self._sendcommand(PRESS_KEY, self._retry_count)
  415. if result[0] == 1:
  416. return True
  417. if result[0] == 5:
  418. _LOGGER.debug("Bot is in switch mode")
  419. return True
  420. return False
  421. async def set_switch_mode(
  422. self, switch_mode: bool = False, strength: int = 100, inverse: bool = False
  423. ) -> bool:
  424. """Change bot mode."""
  425. mode_key = format(switch_mode, "b") + format(inverse, "b")
  426. strength_key = f"{strength:0{2}x}" # to hex with padding to double digit
  427. result = await self._sendcommand(
  428. DEVICE_SET_MODE_KEY + strength_key + mode_key, self._retry_count
  429. )
  430. if result[0] == 1:
  431. return True
  432. return False
  433. async def set_long_press(self, duration: int = 0) -> bool:
  434. """Set bot long press duration."""
  435. duration_key = f"{duration:0{2}x}" # to hex with padding to double digit
  436. result = await self._sendcommand(
  437. DEVICE_SET_EXTENDED_KEY + "08" + duration_key, self._retry_count
  438. )
  439. if result[0] == 1:
  440. return True
  441. return False
  442. async def get_basic_info(self) -> dict[str, Any] | None:
  443. """Get device basic settings."""
  444. _data = await self._sendcommand(
  445. key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
  446. )
  447. if _data in (b"\x07", b"\x00"):
  448. _LOGGER.error("Unsuccessfull, please try again")
  449. return None
  450. self._settings = {
  451. "battery": _data[1],
  452. "firmware": _data[2] / 10.0,
  453. "strength": _data[3],
  454. "timers": _data[8],
  455. "switchMode": bool(_data[9] & 16),
  456. "inverseDirection": bool(_data[9] & 1),
  457. "holdSeconds": _data[10],
  458. }
  459. return self._settings
  460. def switch_mode(self) -> Any:
  461. """Return true or false from cache."""
  462. # To get actual position call update() first.
  463. return self._get_adv_value("switchMode")
  464. def is_on(self) -> Any:
  465. """Return switch state from cache."""
  466. # To get actual position call update() first.
  467. value = self._get_adv_value("isOn")
  468. if value is None:
  469. return None
  470. if self._inverse:
  471. return not value
  472. return value
  473. class SwitchbotCurtain(SwitchbotDevice):
  474. """Representation of a Switchbot Curtain."""
  475. def __init__(self, *args: Any, **kwargs: Any) -> None:
  476. """Switchbot Curtain/WoCurtain constructor."""
  477. # The position of the curtain is saved returned with 0 = open and 100 = closed.
  478. # This is independent of the calibration of the curtain bot (Open left to right/
  479. # Open right to left/Open from the middle).
  480. # The parameter 'reverse_mode' reverse these values,
  481. # if 'reverse_mode' = True, position = 0 equals close
  482. # and position = 100 equals open. The parameter is default set to True so that
  483. # the definition of position is the same as in Home Assistant.
  484. super().__init__(*args, **kwargs)
  485. self._reverse: bool = kwargs.pop("reverse_mode", True)
  486. self._settings: dict[str, Any] = {}
  487. self.ext_info_sum: dict[str, Any] = {}
  488. self.ext_info_adv: dict[str, Any] = {}
  489. async def open(self) -> bool:
  490. """Send open command."""
  491. result = await self._sendcommand(OPEN_KEY, self._retry_count)
  492. if result[0] == 1:
  493. return True
  494. return False
  495. async def close(self) -> bool:
  496. """Send close command."""
  497. result = await self._sendcommand(CLOSE_KEY, self._retry_count)
  498. if result[0] == 1:
  499. return True
  500. return False
  501. async def stop(self) -> bool:
  502. """Send stop command to device."""
  503. result = await self._sendcommand(STOP_KEY, self._retry_count)
  504. if result[0] == 1:
  505. return True
  506. return False
  507. async def set_position(self, position: int) -> bool:
  508. """Send position command (0-100) to device."""
  509. position = (100 - position) if self._reverse else position
  510. hex_position = "%0.2X" % position
  511. result = await self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
  512. if result[0] == 1:
  513. return True
  514. return False
  515. async def update(self, interface: int | None = None) -> None:
  516. """Update position, battery percent and light level of device."""
  517. await self.get_device_data(retry=self._retry_count, interface=interface)
  518. def get_position(self) -> Any:
  519. """Return cached position (0-100) of Curtain."""
  520. # To get actual position call update() first.
  521. return self._get_adv_value("position")
  522. async def get_basic_info(self) -> dict[str, Any] | None:
  523. """Get device basic settings."""
  524. _data = await self._sendcommand(
  525. key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
  526. )
  527. if _data in (b"\x07", b"\x00"):
  528. _LOGGER.error("Unsuccessfull, please try again")
  529. return None
  530. _position = max(min(_data[6], 100), 0)
  531. self._settings = {
  532. "battery": _data[1],
  533. "firmware": _data[2] / 10.0,
  534. "chainLength": _data[3],
  535. "openDirection": (
  536. "right_to_left" if _data[4] & 0b10000000 == 128 else "left_to_right"
  537. ),
  538. "touchToOpen": bool(_data[4] & 0b01000000),
  539. "light": bool(_data[4] & 0b00100000),
  540. "fault": bool(_data[4] & 0b00001000),
  541. "solarPanel": bool(_data[5] & 0b00001000),
  542. "calibrated": bool(_data[5] & 0b00000100),
  543. "inMotion": bool(_data[5] & 0b01000011),
  544. "position": (100 - _position) if self._reverse else _position,
  545. "timers": _data[7],
  546. }
  547. return self._settings
  548. async def get_extended_info_summary(self) -> dict[str, Any] | None:
  549. """Get basic info for all devices in chain."""
  550. _data = await self._sendcommand(
  551. key=CURTAIN_EXT_SUM_KEY, retry=self._retry_count
  552. )
  553. if _data in (b"\x07", b"\x00"):
  554. _LOGGER.error("Unsuccessfull, please try again")
  555. return None
  556. self.ext_info_sum["device0"] = {
  557. "openDirectionDefault": not bool(_data[1] & 0b10000000),
  558. "touchToOpen": bool(_data[1] & 0b01000000),
  559. "light": bool(_data[1] & 0b00100000),
  560. "openDirection": (
  561. "left_to_right" if _data[1] & 0b00010000 == 1 else "right_to_left"
  562. ),
  563. }
  564. # if grouped curtain device present.
  565. if _data[2] != 0:
  566. self.ext_info_sum["device1"] = {
  567. "openDirectionDefault": not bool(_data[1] & 0b10000000),
  568. "touchToOpen": bool(_data[1] & 0b01000000),
  569. "light": bool(_data[1] & 0b00100000),
  570. "openDirection": (
  571. "left_to_right" if _data[1] & 0b00010000 else "right_to_left"
  572. ),
  573. }
  574. return self.ext_info_sum
  575. async def get_extended_info_adv(self) -> dict[str, Any] | None:
  576. """Get advance page info for device chain."""
  577. _data = await self._sendcommand(
  578. key=CURTAIN_EXT_ADV_KEY, retry=self._retry_count
  579. )
  580. if _data in (b"\x07", b"\x00"):
  581. _LOGGER.error("Unsuccessfull, please try again")
  582. return None
  583. _state_of_charge = [
  584. "not_charging",
  585. "charging_by_adapter",
  586. "charging_by_solar",
  587. "fully_charged",
  588. "solar_not_charging",
  589. "charging_error",
  590. ]
  591. self.ext_info_adv["device0"] = {
  592. "battery": _data[1],
  593. "firmware": _data[2] / 10.0,
  594. "stateOfCharge": _state_of_charge[_data[3]],
  595. }
  596. # If grouped curtain device present.
  597. if _data[4]:
  598. self.ext_info_adv["device1"] = {
  599. "battery": _data[4],
  600. "firmware": _data[5] / 10.0,
  601. "stateOfCharge": _state_of_charge[_data[6]],
  602. }
  603. return self.ext_info_adv
  604. def get_light_level(self) -> Any:
  605. """Return cached light level."""
  606. # To get actual light level call update() first.
  607. return self._get_adv_value("lightLevel")
  608. def is_reversed(self) -> bool:
  609. """Return True if curtain position is opposite from SB data."""
  610. return self._reverse
  611. def is_calibrated(self) -> Any:
  612. """Return True curtain is calibrated."""
  613. # To get actual light level call update() first.
  614. return self._get_adv_value("calibration")
  615. class SwitchbotPlugMini(SwitchbotDevice):
  616. """Representation of a Switchbot plug mini."""
  617. def __init__(self, *args: Any, **kwargs: Any) -> None:
  618. """Switchbot plug mini constructor."""
  619. super().__init__(*args, **kwargs)
  620. self._settings: dict[str, Any] = {}
  621. async def update(self, interface: int | None = None) -> None:
  622. """Update state of device."""
  623. await self.get_device_data(retry=self._retry_count, interface=interface)
  624. async def turn_on(self) -> bool:
  625. """Turn device on."""
  626. result = await self._sendcommand(PLUG_ON_KEY, self._retry_count)
  627. return result[1] == 0x80
  628. async def turn_off(self) -> bool:
  629. """Turn device off."""
  630. result = await self._sendcommand(PLUG_OFF_KEY, self._retry_count)
  631. return result[1] == 0x00
  632. def is_on(self) -> Any:
  633. """Return switch state from cache."""
  634. # To get actual position call update() first.
  635. value = self._get_adv_value("isOn")
  636. if value is None:
  637. return None
  638. return value