__init__.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import asyncio
  4. import binascii
  5. import logging
  6. from dataclasses import dataclass
  7. from typing import Any
  8. from uuid import UUID
  9. import bleak
  10. from bleak.backends.device import BLEDevice
  11. from bleak.backends.scanner import AdvertisementData
  12. from bleak_retry_connector import BleakClient, establish_connection
  13. DEFAULT_RETRY_COUNT = 3
  14. DEFAULT_RETRY_TIMEOUT = 1
  15. DEFAULT_SCAN_TIMEOUT = 5
  16. # Keys common to all device types
  17. DEVICE_GET_BASIC_SETTINGS_KEY = "5702"
  18. DEVICE_SET_MODE_KEY = "5703"
  19. DEVICE_SET_EXTENDED_KEY = "570f"
  20. # Plug Mini keys
  21. PLUG_ON_KEY = "570f50010180"
  22. PLUG_OFF_KEY = "570f50010100"
  23. # Bot keys
  24. PRESS_KEY = "570100"
  25. ON_KEY = "570101"
  26. OFF_KEY = "570102"
  27. DOWN_KEY = "570103"
  28. UP_KEY = "570104"
  29. # Curtain keys
  30. OPEN_KEY = "570f450105ff00" # 570F4501010100
  31. CLOSE_KEY = "570f450105ff64" # 570F4501010164
  32. POSITION_KEY = "570F450105ff" # +actual_position ex: 570F450105ff32 for 50%
  33. STOP_KEY = "570F450100ff"
  34. CURTAIN_EXT_SUM_KEY = "570f460401"
  35. CURTAIN_EXT_ADV_KEY = "570f460402"
  36. CURTAIN_EXT_CHAIN_INFO_KEY = "570f468101"
  37. # Base key when encryption is set
  38. KEY_PASSWORD_PREFIX = "571"
  39. _LOGGER = logging.getLogger(__name__)
  40. CONNECT_LOCK = asyncio.Lock()
  41. def _sb_uuid(comms_type: str = "service") -> UUID | str:
  42. """Return Switchbot UUID."""
  43. _uuid = {"tx": "002", "rx": "003", "service": "d00"}
  44. if comms_type in _uuid:
  45. return UUID(f"cba20{_uuid[comms_type]}-224d-11e6-9fb8-0002a5d5c51b")
  46. return "Incorrect type, choose between: tx, rx or service"
  47. def _process_wohand(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
  48. """Process woHand/Bot services data."""
  49. _switch_mode = bool(data[1] & 0b10000000)
  50. _bot_data = {
  51. "switchMode": _switch_mode,
  52. "isOn": not bool(data[1] & 0b01000000) if _switch_mode else False,
  53. "battery": data[2] & 0b01111111,
  54. }
  55. return _bot_data
  56. def _process_wocurtain(
  57. data: bytes, mfr_data: bytes | None, reverse: bool = True
  58. ) -> dict[str, bool | int]:
  59. """Process woCurtain/Curtain services data."""
  60. _position = max(min(data[3] & 0b01111111, 100), 0)
  61. _curtain_data = {
  62. "calibration": bool(data[1] & 0b01000000),
  63. "battery": data[2] & 0b01111111,
  64. "inMotion": bool(data[3] & 0b10000000),
  65. "position": (100 - _position) if reverse else _position,
  66. "lightLevel": (data[4] >> 4) & 0b00001111,
  67. "deviceChain": data[4] & 0b00000111,
  68. }
  69. return _curtain_data
  70. def _process_wosensorth(data: bytes, mfr_data: bytes | None) -> dict[str, object]:
  71. """Process woSensorTH/Temp sensor services data."""
  72. _temp_sign = 1 if data[4] & 0b10000000 else -1
  73. _temp_c = _temp_sign * ((data[4] & 0b01111111) + ((data[3] & 0b00001111) / 10))
  74. _temp_f = (_temp_c * 9 / 5) + 32
  75. _temp_f = (_temp_f * 10) / 10
  76. _wosensorth_data = {
  77. "temp": {"c": _temp_c, "f": _temp_f},
  78. "fahrenheit": bool(data[5] & 0b10000000),
  79. "humidity": data[5] & 0b01111111,
  80. "battery": data[2] & 0b01111111,
  81. }
  82. return _wosensorth_data
  83. def _process_wocontact(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
  84. """Process woContact Sensor services data."""
  85. return {
  86. "tested": bool(data[1] & 0b10000000),
  87. "motion_detected": bool(data[1] & 0b01000000),
  88. "battery": data[2] & 0b01111111,
  89. "contact_open": data[3] & 0b00000010 == 0b00000010,
  90. "contact_timeout": data[3] & 0b00000110 == 0b00000110,
  91. "is_light": bool(data[3] & 0b00000001),
  92. "button_count": (data[7] & 0b11110000) >> 4,
  93. }
  94. def _process_wopresence(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
  95. """Process WoPresence Sensor services data."""
  96. return {
  97. "tested": bool(data[1] & 0b10000000),
  98. "motion_detected": bool(data[1] & 0b01000000),
  99. "battery": data[2] & 0b01111111,
  100. "led": (data[5] & 0b00100000) >> 5,
  101. "iot": (data[5] & 0b00010000) >> 4,
  102. "sense_distance": (data[5] & 0b00001100) >> 2,
  103. "light_intensity": data[5] & 0b00000011,
  104. "is_light": bool(data[5] & 0b00000010),
  105. }
  106. def _process_woplugmini(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
  107. """Process plug mini."""
  108. return {
  109. "switchMode": True,
  110. "isOn": mfr_data[7] == 0x80,
  111. "wifi_rssi": -mfr_data[9],
  112. }
  113. def _process_color_bulb(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
  114. """Process WoBulb services data."""
  115. return {
  116. "sequence_number": mfr_data[6],
  117. "isOn": bool(mfr_data[7] & 0b10000000),
  118. "brightness": mfr_data[7] & 0b01111111,
  119. "delay": bool(mfr_data[8] & 0b10000000),
  120. "preset": bool(mfr_data[8] & 0b00001000),
  121. "light_state": mfr_data[8] & 0b00000111,
  122. "speed": mfr_data[9] & 0b01111111,
  123. "loop_index": mfr_data[10] & 0b11111110,
  124. }
  125. @dataclass
  126. class SwitchBotAdvertisement:
  127. """Switchbot advertisement."""
  128. address: str
  129. data: dict[str, Any]
  130. device: BLEDevice
  131. def parse_advertisement_data(
  132. device: BLEDevice, advertisement_data: AdvertisementData
  133. ) -> SwitchBotAdvertisement | None:
  134. """Parse advertisement data."""
  135. _services = list(advertisement_data.service_data.values())
  136. _mgr_datas = list(advertisement_data.manufacturer_data.values())
  137. if not _services:
  138. return
  139. _service_data = _services[0]
  140. _mfr_data = _mgr_datas[0] if _mgr_datas else None
  141. _model = chr(_service_data[0] & 0b01111111)
  142. supported_types: dict[str, dict[str, Any]] = {
  143. "d": {"modelName": "WoContact", "func": _process_wocontact},
  144. "H": {"modelName": "WoHand", "func": _process_wohand},
  145. "s": {"modelName": "WoPresence", "func": _process_wopresence},
  146. "c": {"modelName": "WoCurtain", "func": _process_wocurtain},
  147. "T": {"modelName": "WoSensorTH", "func": _process_wosensorth},
  148. "i": {"modelName": "WoSensorTH", "func": _process_wosensorth},
  149. "g": {"modelName": "WoPlug", "func": _process_woplugmini},
  150. "u": {"modelName": "WoBulb", "func": _process_color_bulb},
  151. }
  152. data = {
  153. "address": device.address, # MacOS uses UUIDs
  154. "rawAdvData": list(advertisement_data.service_data.values())[0],
  155. "data": {
  156. "rssi": device.rssi,
  157. },
  158. }
  159. if _model in supported_types:
  160. data.update(
  161. {
  162. "isEncrypted": bool(_service_data[0] & 0b10000000),
  163. "model": _model,
  164. "modelName": supported_types[_model]["modelName"],
  165. "data": supported_types[_model]["func"](_service_data, _mfr_data),
  166. }
  167. )
  168. data["data"]["rssi"] = device.rssi
  169. return SwitchBotAdvertisement(device.address, data, device)
  170. class GetSwitchbotDevices:
  171. """Scan for all Switchbot devices and return by type."""
  172. def __init__(self, interface: int = 0) -> None:
  173. """Get switchbot devices class constructor."""
  174. self._interface = f"hci{interface}"
  175. self._adv_data: dict[str, SwitchBotAdvertisement] = {}
  176. def detection_callback(
  177. self,
  178. device: BLEDevice,
  179. advertisement_data: AdvertisementData,
  180. ) -> None:
  181. discovery = parse_advertisement_data(device, advertisement_data)
  182. if discovery:
  183. self._adv_data[discovery.address] = discovery
  184. async def discover(
  185. self, retry: int = DEFAULT_RETRY_COUNT, scan_timeout: int = DEFAULT_SCAN_TIMEOUT
  186. ) -> dict:
  187. """Find switchbot devices and their advertisement data."""
  188. devices = None
  189. devices = bleak.BleakScanner(
  190. # TODO: Find new UUIDs to filter on. For example, see
  191. # https://github.com/OpenWonderLabs/SwitchBotAPI-BLE/blob/4ad138bb09f0fbbfa41b152ca327a78c1d0b6ba9/devicetypes/meter.md
  192. adapter=self._interface,
  193. )
  194. devices.register_detection_callback(self.detection_callback)
  195. async with CONNECT_LOCK:
  196. await devices.start()
  197. await asyncio.sleep(scan_timeout)
  198. await devices.stop()
  199. if devices is None:
  200. if retry < 1:
  201. _LOGGER.error(
  202. "Scanning for Switchbot devices failed. Stop trying", exc_info=True
  203. )
  204. return self._adv_data
  205. _LOGGER.warning(
  206. "Error scanning for Switchbot devices. Retrying (remaining: %d)",
  207. retry,
  208. )
  209. await asyncio.sleep(DEFAULT_RETRY_TIMEOUT)
  210. return await self.discover(retry - 1, scan_timeout)
  211. return self._adv_data
  212. async def _get_devices_by_model(
  213. self,
  214. model: str,
  215. ) -> dict:
  216. """Get switchbot devices by type."""
  217. if not self._adv_data:
  218. await self.discover()
  219. return {
  220. address: adv
  221. for address, adv in self._adv_data.items()
  222. if adv.data.get("model") == model
  223. }
  224. async def get_curtains(self) -> dict[str, SwitchBotAdvertisement]:
  225. """Return all WoCurtain/Curtains devices with services data."""
  226. return await self._get_devices_by_model("c")
  227. async def get_bots(self) -> dict[str, SwitchBotAdvertisement]:
  228. """Return all WoHand/Bot devices with services data."""
  229. return await self._get_devices_by_model("H")
  230. async def get_tempsensors(self) -> dict[str, SwitchBotAdvertisement]:
  231. """Return all WoSensorTH/Temp sensor devices with services data."""
  232. base_meters = await self._get_devices_by_model("T")
  233. plus_meters = await self._get_devices_by_model("i")
  234. return {**base_meters, **plus_meters}
  235. async def get_contactsensors(self) -> dict[str, SwitchBotAdvertisement]:
  236. """Return all WoContact/Contact sensor devices with services data."""
  237. return await self._get_devices_by_model("d")
  238. async def get_device_data(
  239. self, address: str
  240. ) -> dict[str, SwitchBotAdvertisement] | None:
  241. """Return data for specific device."""
  242. if not self._adv_data:
  243. await self.discover()
  244. _switchbot_data = {
  245. device: data
  246. for device, data in self._adv_data.items()
  247. # MacOS uses UUIDs instead of MAC addresses
  248. if data.get("address") == address
  249. }
  250. return _switchbot_data
  251. class SwitchbotDevice:
  252. """Base Representation of a Switchbot Device."""
  253. def __init__(
  254. self,
  255. device: BLEDevice,
  256. password: str | None = None,
  257. interface: int = 0,
  258. **kwargs: Any,
  259. ) -> None:
  260. """Switchbot base class constructor."""
  261. self._interface = f"hci{interface}"
  262. self._device = device
  263. self._sb_adv_data: SwitchBotAdvertisement | None = None
  264. self._scan_timeout: int = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT)
  265. self._retry_count: int = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
  266. if password is None or password == "":
  267. self._password_encoded = None
  268. else:
  269. self._password_encoded = "%x" % (
  270. binascii.crc32(password.encode("ascii")) & 0xFFFFFFFF
  271. )
  272. def _commandkey(self, key: str) -> str:
  273. """Add password to key if set."""
  274. if self._password_encoded is None:
  275. return key
  276. key_action = key[3]
  277. key_suffix = key[4:]
  278. return KEY_PASSWORD_PREFIX + key_action + self._password_encoded + key_suffix
  279. async def _sendcommand(self, key: str, retry: int) -> bytes:
  280. """Send command to device and read response."""
  281. command = bytearray.fromhex(self._commandkey(key))
  282. _LOGGER.debug("Sending command to switchbot %s", command)
  283. max_attempts = retry + 1
  284. async with CONNECT_LOCK:
  285. for attempt in range(max_attempts):
  286. try:
  287. return await self._send_command_locked(key, command)
  288. except (bleak.BleakError, asyncio.exceptions.TimeoutError):
  289. if attempt == retry:
  290. _LOGGER.error(
  291. "Switchbot communication failed. Stopping trying",
  292. exc_info=True,
  293. )
  294. return b"\x00"
  295. _LOGGER.debug("Switchbot communication failed with:", exc_info=True)
  296. raise RuntimeError("Unreachable")
  297. @property
  298. def name(self) -> str:
  299. """Return device name."""
  300. return f"{self._device.name} ({self._device.address})"
  301. async def _send_command_locked(self, key: str, command: bytes) -> bytes:
  302. """Send command to device and read response."""
  303. client: BleakClient | None = None
  304. try:
  305. _LOGGER.debug("%s: Connnecting to switchbot", self.name)
  306. client = await establish_connection(
  307. BleakClient, self._device, self.name, max_attempts=1
  308. )
  309. _LOGGER.debug(
  310. "%s: Connnected to switchbot: %s", self.name, client.is_connected
  311. )
  312. future: asyncio.Future[bytearray] = asyncio.Future()
  313. def _notification_handler(sender: int, data: bytearray) -> None:
  314. """Handle notification responses."""
  315. if future.done():
  316. _LOGGER.debug("%s: Notification handler already done", self.name)
  317. return
  318. future.set_result(data)
  319. _LOGGER.debug("%s: Subscribe to notifications", self.name)
  320. await client.start_notify(_sb_uuid(comms_type="rx"), _notification_handler)
  321. _LOGGER.debug("%s: Sending command, %s", self.name, key)
  322. await client.write_gatt_char(_sb_uuid(comms_type="tx"), command, False)
  323. notify_msg = await asyncio.wait_for(future, timeout=5)
  324. _LOGGER.info("%s: Notification received: %s", self.name, notify_msg)
  325. _LOGGER.debug("%s: UnSubscribe to notifications", self.name)
  326. await client.stop_notify(_sb_uuid(comms_type="rx"))
  327. finally:
  328. if client:
  329. await client.disconnect()
  330. if notify_msg == b"\x07":
  331. _LOGGER.error("Password required")
  332. elif notify_msg == b"\t":
  333. _LOGGER.error("Password incorrect")
  334. return notify_msg
  335. def get_address(self) -> str:
  336. """Return address of device."""
  337. return self._device.address
  338. def _get_adv_value(self, key: str) -> Any:
  339. """Return value from advertisement data."""
  340. if not self._sb_adv_data:
  341. return None
  342. return self._sb_adv_data.data["data"][key]
  343. def get_battery_percent(self) -> Any:
  344. """Return device battery level in percent."""
  345. return self._get_adv_value("battery")
  346. def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
  347. """Update device data from advertisement."""
  348. self._sb_adv_data = advertisement
  349. self._device = advertisement.device
  350. async def get_device_data(
  351. self, retry: int = DEFAULT_RETRY_COUNT, interface: int | None = None
  352. ) -> dict | None:
  353. """Find switchbot devices and their advertisement data."""
  354. if interface:
  355. _interface: int = interface
  356. else:
  357. _interface = int(self._interface.replace("hci", ""))
  358. _data = await GetSwitchbotDevices(interface=_interface).discover(
  359. retry=retry, scan_timeout=self._scan_timeout
  360. )
  361. if self._device.address in _data:
  362. self._sb_adv_data = _data[self._device.address]
  363. return self._sb_adv_data
  364. class Switchbot(SwitchbotDevice):
  365. """Representation of a Switchbot."""
  366. def __init__(self, *args: Any, **kwargs: Any) -> None:
  367. """Switchbot Bot/WoHand constructor."""
  368. super().__init__(*args, **kwargs)
  369. self._inverse: bool = kwargs.pop("inverse_mode", False)
  370. self._settings: dict[str, Any] = {}
  371. async def update(self, interface: int | None = None) -> None:
  372. """Update mode, battery percent and state of device."""
  373. await self.get_device_data(retry=self._retry_count, interface=interface)
  374. async def turn_on(self) -> bool:
  375. """Turn device on."""
  376. result = await self._sendcommand(ON_KEY, self._retry_count)
  377. if result[0] == 1:
  378. return True
  379. if result[0] == 5:
  380. _LOGGER.debug("Bot is in press mode and doesn't have on state")
  381. return True
  382. return False
  383. async def turn_off(self) -> bool:
  384. """Turn device off."""
  385. result = await self._sendcommand(OFF_KEY, self._retry_count)
  386. if result[0] == 1:
  387. return True
  388. if result[0] == 5:
  389. _LOGGER.debug("Bot is in press mode and doesn't have off state")
  390. return True
  391. return False
  392. async def hand_up(self) -> bool:
  393. """Raise device arm."""
  394. result = await self._sendcommand(UP_KEY, self._retry_count)
  395. if result[0] == 1:
  396. return True
  397. if result[0] == 5:
  398. _LOGGER.debug("Bot is in press mode")
  399. return True
  400. return False
  401. async def hand_down(self) -> bool:
  402. """Lower device arm."""
  403. result = await self._sendcommand(DOWN_KEY, self._retry_count)
  404. if result[0] == 1:
  405. return True
  406. if result[0] == 5:
  407. _LOGGER.debug("Bot is in press mode")
  408. return True
  409. return False
  410. async def press(self) -> bool:
  411. """Press command to device."""
  412. result = await self._sendcommand(PRESS_KEY, self._retry_count)
  413. if result[0] == 1:
  414. return True
  415. if result[0] == 5:
  416. _LOGGER.debug("Bot is in switch mode")
  417. return True
  418. return False
  419. async def set_switch_mode(
  420. self, switch_mode: bool = False, strength: int = 100, inverse: bool = False
  421. ) -> bool:
  422. """Change bot mode."""
  423. mode_key = format(switch_mode, "b") + format(inverse, "b")
  424. strength_key = f"{strength:0{2}x}" # to hex with padding to double digit
  425. result = await self._sendcommand(
  426. DEVICE_SET_MODE_KEY + strength_key + mode_key, self._retry_count
  427. )
  428. if result[0] == 1:
  429. return True
  430. return False
  431. async def set_long_press(self, duration: int = 0) -> bool:
  432. """Set bot long press duration."""
  433. duration_key = f"{duration:0{2}x}" # to hex with padding to double digit
  434. result = await self._sendcommand(
  435. DEVICE_SET_EXTENDED_KEY + "08" + duration_key, self._retry_count
  436. )
  437. if result[0] == 1:
  438. return True
  439. return False
  440. async def get_basic_info(self) -> dict[str, Any] | None:
  441. """Get device basic settings."""
  442. _data = await self._sendcommand(
  443. key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
  444. )
  445. if _data in (b"\x07", b"\x00"):
  446. _LOGGER.error("Unsuccessfull, please try again")
  447. return None
  448. self._settings = {
  449. "battery": _data[1],
  450. "firmware": _data[2] / 10.0,
  451. "strength": _data[3],
  452. "timers": _data[8],
  453. "switchMode": bool(_data[9] & 16),
  454. "inverseDirection": bool(_data[9] & 1),
  455. "holdSeconds": _data[10],
  456. }
  457. return self._settings
  458. def switch_mode(self) -> Any:
  459. """Return true or false from cache."""
  460. # To get actual position call update() first.
  461. return self._get_adv_value("switchMode")
  462. def is_on(self) -> Any:
  463. """Return switch state from cache."""
  464. # To get actual position call update() first.
  465. value = self._get_adv_value("isOn")
  466. if value is None:
  467. return None
  468. if self._inverse:
  469. return not value
  470. return value
  471. class SwitchbotCurtain(SwitchbotDevice):
  472. """Representation of a Switchbot Curtain."""
  473. def __init__(self, *args: Any, **kwargs: Any) -> None:
  474. """Switchbot Curtain/WoCurtain constructor."""
  475. # The position of the curtain is saved returned with 0 = open and 100 = closed.
  476. # This is independent of the calibration of the curtain bot (Open left to right/
  477. # Open right to left/Open from the middle).
  478. # The parameter 'reverse_mode' reverse these values,
  479. # if 'reverse_mode' = True, position = 0 equals close
  480. # and position = 100 equals open. The parameter is default set to True so that
  481. # the definition of position is the same as in Home Assistant.
  482. super().__init__(*args, **kwargs)
  483. self._reverse: bool = kwargs.pop("reverse_mode", True)
  484. self._settings: dict[str, Any] = {}
  485. self.ext_info_sum: dict[str, Any] = {}
  486. self.ext_info_adv: dict[str, Any] = {}
  487. async def open(self) -> bool:
  488. """Send open command."""
  489. result = await self._sendcommand(OPEN_KEY, self._retry_count)
  490. if result[0] == 1:
  491. return True
  492. return False
  493. async def close(self) -> bool:
  494. """Send close command."""
  495. result = await self._sendcommand(CLOSE_KEY, self._retry_count)
  496. if result[0] == 1:
  497. return True
  498. return False
  499. async def stop(self) -> bool:
  500. """Send stop command to device."""
  501. result = await self._sendcommand(STOP_KEY, self._retry_count)
  502. if result[0] == 1:
  503. return True
  504. return False
  505. async def set_position(self, position: int) -> bool:
  506. """Send position command (0-100) to device."""
  507. position = (100 - position) if self._reverse else position
  508. hex_position = "%0.2X" % position
  509. result = await self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
  510. if result[0] == 1:
  511. return True
  512. return False
  513. async def update(self, interface: int | None = None) -> None:
  514. """Update position, battery percent and light level of device."""
  515. await self.get_device_data(retry=self._retry_count, interface=interface)
  516. def get_position(self) -> Any:
  517. """Return cached position (0-100) of Curtain."""
  518. # To get actual position call update() first.
  519. return self._get_adv_value("position")
  520. async def get_basic_info(self) -> dict[str, Any] | None:
  521. """Get device basic settings."""
  522. _data = await self._sendcommand(
  523. key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
  524. )
  525. if _data in (b"\x07", b"\x00"):
  526. _LOGGER.error("Unsuccessfull, please try again")
  527. return None
  528. _position = max(min(_data[6], 100), 0)
  529. self._settings = {
  530. "battery": _data[1],
  531. "firmware": _data[2] / 10.0,
  532. "chainLength": _data[3],
  533. "openDirection": (
  534. "right_to_left" if _data[4] & 0b10000000 == 128 else "left_to_right"
  535. ),
  536. "touchToOpen": bool(_data[4] & 0b01000000),
  537. "light": bool(_data[4] & 0b00100000),
  538. "fault": bool(_data[4] & 0b00001000),
  539. "solarPanel": bool(_data[5] & 0b00001000),
  540. "calibrated": bool(_data[5] & 0b00000100),
  541. "inMotion": bool(_data[5] & 0b01000011),
  542. "position": (100 - _position) if self._reverse else _position,
  543. "timers": _data[7],
  544. }
  545. return self._settings
  546. async def get_extended_info_summary(self) -> dict[str, Any] | None:
  547. """Get basic info for all devices in chain."""
  548. _data = await self._sendcommand(
  549. key=CURTAIN_EXT_SUM_KEY, retry=self._retry_count
  550. )
  551. if _data in (b"\x07", b"\x00"):
  552. _LOGGER.error("Unsuccessfull, please try again")
  553. return None
  554. self.ext_info_sum["device0"] = {
  555. "openDirectionDefault": not bool(_data[1] & 0b10000000),
  556. "touchToOpen": bool(_data[1] & 0b01000000),
  557. "light": bool(_data[1] & 0b00100000),
  558. "openDirection": (
  559. "left_to_right" if _data[1] & 0b00010000 == 1 else "right_to_left"
  560. ),
  561. }
  562. # if grouped curtain device present.
  563. if _data[2] != 0:
  564. self.ext_info_sum["device1"] = {
  565. "openDirectionDefault": not bool(_data[1] & 0b10000000),
  566. "touchToOpen": bool(_data[1] & 0b01000000),
  567. "light": bool(_data[1] & 0b00100000),
  568. "openDirection": (
  569. "left_to_right" if _data[1] & 0b00010000 else "right_to_left"
  570. ),
  571. }
  572. return self.ext_info_sum
  573. async def get_extended_info_adv(self) -> dict[str, Any] | None:
  574. """Get advance page info for device chain."""
  575. _data = await self._sendcommand(
  576. key=CURTAIN_EXT_ADV_KEY, retry=self._retry_count
  577. )
  578. if _data in (b"\x07", b"\x00"):
  579. _LOGGER.error("Unsuccessfull, please try again")
  580. return None
  581. _state_of_charge = [
  582. "not_charging",
  583. "charging_by_adapter",
  584. "charging_by_solar",
  585. "fully_charged",
  586. "solar_not_charging",
  587. "charging_error",
  588. ]
  589. self.ext_info_adv["device0"] = {
  590. "battery": _data[1],
  591. "firmware": _data[2] / 10.0,
  592. "stateOfCharge": _state_of_charge[_data[3]],
  593. }
  594. # If grouped curtain device present.
  595. if _data[4]:
  596. self.ext_info_adv["device1"] = {
  597. "battery": _data[4],
  598. "firmware": _data[5] / 10.0,
  599. "stateOfCharge": _state_of_charge[_data[6]],
  600. }
  601. return self.ext_info_adv
  602. def get_light_level(self) -> Any:
  603. """Return cached light level."""
  604. # To get actual light level call update() first.
  605. return self._get_adv_value("lightLevel")
  606. def is_reversed(self) -> bool:
  607. """Return True if curtain position is opposite from SB data."""
  608. return self._reverse
  609. def is_calibrated(self) -> Any:
  610. """Return True curtain is calibrated."""
  611. # To get actual light level call update() first.
  612. return self._get_adv_value("calibration")
  613. class SwitchbotPlugMini(SwitchbotDevice):
  614. """Representation of a Switchbot plug mini."""
  615. def __init__(self, *args: Any, **kwargs: Any) -> None:
  616. """Switchbot plug mini constructor."""
  617. super().__init__(*args, **kwargs)
  618. self._settings: dict[str, Any] = {}
  619. async def update(self, interface: int | None = None) -> None:
  620. """Update state of device."""
  621. await self.get_device_data(retry=self._retry_count, interface=interface)
  622. async def turn_on(self) -> bool:
  623. """Turn device on."""
  624. result = await self._sendcommand(PLUG_ON_KEY, self._retry_count)
  625. return result[1] == 0x80
  626. async def turn_off(self) -> bool:
  627. """Turn device off."""
  628. result = await self._sendcommand(PLUG_OFF_KEY, self._retry_count)
  629. return result[1] == 0x00
  630. def is_on(self) -> Any:
  631. """Return switch state from cache."""
  632. # To get actual position call update() first.
  633. value = self._get_adv_value("isOn")
  634. if value is None:
  635. return None
  636. return value