__init__.py 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import binascii
  4. import logging
  5. from threading import Lock
  6. import time
  7. from typing import Any
  8. import bluepy
# Default retry budget used by scans and commands when the caller gives none.
DEFAULT_RETRY_COUNT = 3
# Seconds slept (time.sleep) between two retry attempts.
DEFAULT_RETRY_TIMEOUT = 1
# Scan duration handed to the bluepy scanner; also the pause used by
# SwitchbotDevice._connect when the bluepy helper reports it is busy.
DEFAULT_SCAN_TIMEOUT = 5

# All *_KEY values below are hex-encoded command strings; they are converted
# with bytes.fromhex() right before being written to the device.

# Keys common to all device types
DEVICE_GET_BASIC_SETTINGS_KEY = "5702"
DEVICE_SET_MODE_KEY = "5703"
DEVICE_SET_EXTENDED_KEY = "570f"

# Bot keys
PRESS_KEY = "570100"
ON_KEY = "570101"
OFF_KEY = "570102"
DOWN_KEY = "570103"
UP_KEY = "570104"

# Curtain keys
OPEN_KEY = "570f450105ff00"  # 570F4501010100
CLOSE_KEY = "570f450105ff64"  # 570F4501010164
POSITION_KEY = "570F450105ff"  # +actual_position ex: 570F450105ff32 for 50%
STOP_KEY = "570F450100ff"
CURTAIN_EXT_SUM_KEY = "570f460401"
CURTAIN_EXT_ADV_KEY = "570f460402"
CURTAIN_EXT_CHAIN_INFO_KEY = "570f468101"

# Base key when encryption is set.
# SwitchbotDevice._commandkey builds the protected command as
# KEY_PASSWORD_PREFIX + key[3] + <encoded password> + key[4:].
KEY_PASSWORD_PREFIX = "571"

_LOGGER = logging.getLogger(__name__)

# Held while scanning (GetSwitchbotDevices.discover) and while connecting and
# sending a command (SwitchbotDevice._sendcommand), so those never overlap.
CONNECT_LOCK = Lock()
  34. def _sb_uuid(comms_type: str = "service") -> bluepy.btle.UUID:
  35. """Return Switchbot UUID."""
  36. _uuid = {"tx": "002", "rx": "003", "service": "d00"}
  37. if comms_type in _uuid:
  38. return bluepy.btle.UUID(f"cba20{_uuid[comms_type]}-224d-11e6-9fb8-0002a5d5c51b")
  39. return "Incorrect type, choose between: tx, rx or service"
  40. def _process_wohand(data: bytes) -> dict[str, bool | int]:
  41. """Process woHand/Bot services data."""
  42. _switch_mode = bool(data[1] & 0b10000000)
  43. _bot_data = {
  44. "switchMode": _switch_mode,
  45. "isOn": not bool(data[1] & 0b01000000) if _switch_mode else False,
  46. "battery": data[2] & 0b01111111,
  47. }
  48. return _bot_data
  49. def _process_wocurtain(data: bytes, reverse: bool = True) -> dict[str, bool | int]:
  50. """Process woCurtain/Curtain services data."""
  51. _position = max(min(data[3] & 0b01111111, 100), 0)
  52. _curtain_data = {
  53. "calibration": bool(data[1] & 0b01000000),
  54. "battery": data[2] & 0b01111111,
  55. "inMotion": bool(data[3] & 0b10000000),
  56. "position": (100 - _position) if reverse else _position,
  57. "lightLevel": (data[4] >> 4) & 0b00001111,
  58. "deviceChain": data[4] & 0b00000111,
  59. }
  60. return _curtain_data
  61. def _process_wosensorth(data: bytes) -> dict[str, object]:
  62. """Process woSensorTH/Temp sensor services data."""
  63. _temp_sign = 1 if data[4] & 0b10000000 else -1
  64. _temp_c = _temp_sign * ((data[4] & 0b01111111) + (data[3] / 10))
  65. _temp_f = (_temp_c * 9 / 5) + 32
  66. _temp_f = (_temp_f * 10) / 10
  67. _wosensorth_data = {
  68. "temp": {"c": _temp_c, "f": _temp_f},
  69. "fahrenheit": bool(data[5] & 0b10000000),
  70. "humidity": data[5] & 0b01111111,
  71. "battery": data[2] & 0b01111111,
  72. }
  73. return _wosensorth_data
  74. def _process_btle_adv_data(dev: bluepy.btle.ScanEntry) -> dict[str, Any]:
  75. """Process bt le adv data."""
  76. _adv_data = {"mac_address": dev.addr}
  77. _data = dev.getValue(22)[2:]
  78. supported_types: dict[str, dict[str, Any]] = {
  79. "H": {"modelName": "WoHand", "func": _process_wohand},
  80. "c": {"modelName": "WoCurtain", "func": _process_wocurtain},
  81. "T": {"modelName": "WoSensorTH", "func": _process_wosensorth},
  82. }
  83. _model = chr(_data[0] & 0b01111111)
  84. _adv_data["isEncrypted"] = bool(_data[0] & 0b10000000)
  85. _adv_data["model"] = _model
  86. if _model in supported_types:
  87. _adv_data["data"] = supported_types[_model]["func"](_data)
  88. _adv_data["data"]["rssi"] = dev.rssi
  89. _adv_data["modelName"] = supported_types[_model]["modelName"]
  90. else:
  91. _adv_data["rawAdvData"] = dev.getValueText(22)
  92. return _adv_data
  93. class GetSwitchbotDevices:
  94. """Scan for all Switchbot devices and return by type."""
  95. def __init__(self, interface: int = 0) -> None:
  96. """Get switchbot devices class constructor."""
  97. self._interface = interface
  98. self._adv_data: dict[str, Any] = {}
  99. def discover(
  100. self,
  101. retry: int = DEFAULT_RETRY_COUNT,
  102. scan_timeout: int = DEFAULT_SCAN_TIMEOUT,
  103. passive: bool = False,
  104. mac: str | None = None,
  105. ) -> dict[str, Any]:
  106. """Find switchbot devices and their advertisement data."""
  107. devices = None
  108. with CONNECT_LOCK:
  109. try:
  110. devices = bluepy.btle.Scanner(self._interface).scan(
  111. scan_timeout, passive
  112. )
  113. except bluepy.btle.BTLEManagementError:
  114. _LOGGER.error("Error scanning for switchbot devices", exc_info=True)
  115. if devices is None:
  116. if retry < 1:
  117. _LOGGER.error(
  118. "Scanning for Switchbot devices failed. Stop trying", exc_info=True
  119. )
  120. return self._adv_data
  121. _LOGGER.warning(
  122. "Error scanning for Switchbot devices. Retrying (remaining: %d)",
  123. retry,
  124. )
  125. time.sleep(DEFAULT_RETRY_TIMEOUT)
  126. return self.discover(
  127. retry=retry - 1,
  128. scan_timeout=scan_timeout,
  129. passive=passive,
  130. mac=mac,
  131. )
  132. for dev in devices:
  133. if dev.getValueText(7) == str(_sb_uuid()):
  134. dev_id = dev.addr.replace(":", "")
  135. if mac:
  136. if dev.addr.lower() == mac.lower():
  137. self._adv_data[dev_id] = _process_btle_adv_data(dev)
  138. else:
  139. self._adv_data[dev_id] = _process_btle_adv_data(dev)
  140. return self._adv_data
  141. def get_curtains(self) -> dict:
  142. """Return all WoCurtain/Curtains devices with services data."""
  143. if not self._adv_data:
  144. self.discover()
  145. _curtain_devices = {
  146. device: data
  147. for device, data in self._adv_data.items()
  148. if data.get("model") == "c"
  149. }
  150. return _curtain_devices
  151. def get_bots(self) -> dict:
  152. """Return all WoHand/Bot devices with services data."""
  153. if not self._adv_data:
  154. self.discover()
  155. _bot_devices = {
  156. device: data
  157. for device, data in self._adv_data.items()
  158. if data.get("model") == "H"
  159. }
  160. return _bot_devices
  161. def get_tempsensors(self) -> dict:
  162. """Return all WoSensorTH/Temp sensor devices with services data."""
  163. if not self._adv_data:
  164. self.discover()
  165. _bot_temp = {
  166. device: data
  167. for device, data in self._adv_data.items()
  168. if data.get("model") == "T"
  169. }
  170. return _bot_temp
  171. def get_device_data(self, mac: str) -> dict:
  172. """Return data for specific device."""
  173. if not self._adv_data:
  174. self.discover()
  175. _switchbot_data = {
  176. device: data
  177. for device, data in self._adv_data.items()
  178. if data.get("mac_address") == mac
  179. }
  180. return _switchbot_data
  181. class SwitchbotDevice(bluepy.btle.Peripheral):
  182. """Base Representation of a Switchbot Device."""
  183. def __init__(
  184. self,
  185. mac: str,
  186. password: str | None = None,
  187. interface: int = 0,
  188. **kwargs: Any,
  189. ) -> None:
  190. """Switchbot base class constructor."""
  191. bluepy.btle.Peripheral.__init__(
  192. self,
  193. deviceAddr=None,
  194. addrType=bluepy.btle.ADDR_TYPE_RANDOM,
  195. iface=interface,
  196. )
  197. self._interface = interface
  198. self._mac = mac.replace("-", ":").lower()
  199. self._sb_adv_data: dict[str, Any] = {}
  200. self._scan_timeout: int = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT)
  201. self._retry_count: int = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
  202. if password is None or password == "":
  203. self._password_encoded = None
  204. else:
  205. self._password_encoded = "%x" % (
  206. binascii.crc32(password.encode("ascii")) & 0xFFFFFFFF
  207. )
  208. # pylint: disable=arguments-differ
  209. def _connect(self, retry: int, timeout: int | None = None) -> None:
  210. _LOGGER.debug("Connecting to Switchbot")
  211. if retry < 1: # failsafe
  212. self._stopHelper()
  213. raise bluepy.btle.BTLEDisconnectError(
  214. "Failed to connect to peripheral %s" % self._mac
  215. )
  216. if self._helper is None:
  217. self._startHelper(self._interface)
  218. self._writeCmd("conn %s %s\n" % (self._mac, bluepy.btle.ADDR_TYPE_RANDOM))
  219. rsp = self._getResp(["stat", "err"], timeout)
  220. while rsp and rsp["state"][0] in [
  221. "tryconn",
  222. "scan",
  223. ]: # Wait for any operations to finish.
  224. rsp = self._getResp(["stat", "err"], timeout)
  225. # If operation in progress, disc is returned.
  226. # Bluepy helper can't handle state. Execute stop, wait and retry.
  227. if rsp and rsp["state"][0] == "disc":
  228. _LOGGER.warning("Bluepy busy, waiting before retry")
  229. self._stopHelper()
  230. time.sleep(self._scan_timeout)
  231. return self._connect(retry - 1, timeout)
  232. if rsp and rsp["rsp"][0] == "err":
  233. errcode = rsp["code"][0]
  234. _LOGGER.debug(
  235. "Error trying to connect to peripheral %s, error: %s",
  236. self._mac,
  237. errcode,
  238. )
  239. if errcode == "connfail": # not terminal, can retry connection.
  240. return self._connect(retry - 1, timeout)
  241. if errcode == "nomgmt":
  242. raise bluepy.btle.BTLEManagementError(
  243. "Management not available (permissions problem?)", rsp
  244. )
  245. if errcode == "atterr":
  246. raise bluepy.btle.BTLEGattError("Bluetooth command failed", rsp)
  247. raise bluepy.btle.BTLEException(
  248. "Error from bluepy-helper (%s)" % errcode, rsp
  249. )
  250. if rsp is None or rsp["state"][0] != "conn":
  251. self._stopHelper()
  252. if rsp is None:
  253. _LOGGER.warning(
  254. "Timed out while trying to connect to peripheral %s", self._mac
  255. )
  256. _LOGGER.warning("Bluehelper returned unable to connect state: %s", rsp)
  257. raise bluepy.btle.BTLEDisconnectError(
  258. "Failed to connect to peripheral %s, rsp: %s" % (self._mac, rsp)
  259. )
  260. def _commandkey(self, key: str) -> str:
  261. if self._password_encoded is None:
  262. return key
  263. key_action = key[3]
  264. key_suffix = key[4:]
  265. return KEY_PASSWORD_PREFIX + key_action + self._password_encoded + key_suffix
  266. def _writekey(self, key: str) -> bool:
  267. _LOGGER.debug("Prepare to send")
  268. if self._helper is None or self.getState() == "disc":
  269. return False
  270. try:
  271. hand = self.getCharacteristics(uuid=_sb_uuid("tx"))[0]
  272. _LOGGER.debug("Sending command, %s", key)
  273. hand.write(bytes.fromhex(key), withResponse=False)
  274. except bluepy.btle.BTLEException:
  275. _LOGGER.warning("Error sending command to Switchbot", exc_info=True)
  276. raise
  277. else:
  278. _LOGGER.info("Successfully sent command to Switchbot (MAC: %s)", self._mac)
  279. return True
  280. def _subscribe(self) -> bool:
  281. _LOGGER.debug("Subscribe to notifications")
  282. if self._helper is None or self.getState() == "disc":
  283. return False
  284. enable_notify_flag = b"\x01\x00" # standard gatt flag to enable notification
  285. try:
  286. handle = self.getCharacteristics(uuid=_sb_uuid("rx"))[0]
  287. notify_handle = handle.getHandle() + 1
  288. self.writeCharacteristic(
  289. notify_handle, enable_notify_flag, withResponse=False
  290. )
  291. except bluepy.btle.BTLEException:
  292. _LOGGER.warning(
  293. "Error while enabling notifications on Switchbot", exc_info=True
  294. )
  295. raise
  296. return True
  297. def _readkey(self) -> bytes:
  298. _LOGGER.debug("Prepare to read notification from switchbot")
  299. if self._helper is None:
  300. return b"\x00"
  301. try:
  302. receive_handle = self.getCharacteristics(uuid=_sb_uuid("rx"))
  303. except bluepy.btle.BTLEException:
  304. _LOGGER.warning(
  305. "Error while reading notifications from Switchbot", exc_info=True
  306. )
  307. else:
  308. for char in receive_handle:
  309. read_result: bytes = char.read()
  310. return read_result
  311. # Could disconnect before reading response. Assume it worked as this is executed after issueing command.
  312. if self._helper and self.getState() == "disc":
  313. return b"\x01"
  314. return b"\x00"
  315. def _sendcommand(self, key: str, retry: int, timeout: int | None = 40) -> bytes:
  316. command = self._commandkey(key)
  317. send_success = False
  318. notify_msg = None
  319. _LOGGER.debug("Sending command to switchbot %s", command)
  320. if len(self._mac.split(":")) != 6:
  321. raise ValueError("Expected MAC address, got %s" % repr(self._mac))
  322. with CONNECT_LOCK:
  323. try:
  324. self._connect(retry, timeout)
  325. send_success = self._subscribe()
  326. except bluepy.btle.BTLEException:
  327. _LOGGER.warning("Error connecting to Switchbot", exc_info=True)
  328. else:
  329. try:
  330. send_success = self._writekey(command)
  331. except bluepy.btle.BTLEException:
  332. _LOGGER.warning(
  333. "Error sending commands to Switchbot", exc_info=True
  334. )
  335. else:
  336. notify_msg = self._readkey()
  337. finally:
  338. self.disconnect()
  339. if notify_msg and send_success:
  340. if notify_msg == b"\x07":
  341. _LOGGER.error("Password required")
  342. elif notify_msg == b"\t":
  343. _LOGGER.error("Password incorrect")
  344. return notify_msg
  345. if retry < 1:
  346. _LOGGER.error(
  347. "Switchbot communication failed. Stopping trying", exc_info=True
  348. )
  349. return b"\x00"
  350. _LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)", retry)
  351. time.sleep(DEFAULT_RETRY_TIMEOUT)
  352. return self._sendcommand(key, retry - 1)
  353. def get_mac(self) -> str:
  354. """Return mac address of device."""
  355. return self._mac
  356. def get_battery_percent(self) -> Any:
  357. """Return device battery level in percent."""
  358. if not self._sb_adv_data.get("data"):
  359. return None
  360. return self._sb_adv_data["data"].get("battery")
  361. def get_device_data(
  362. self,
  363. retry: int = DEFAULT_RETRY_COUNT,
  364. interface: int | None = None,
  365. passive: bool = False,
  366. ) -> dict[str, Any]:
  367. """Find switchbot devices and their advertisement data."""
  368. _interface: int = interface if interface else self._interface
  369. dev_id = self._mac.replace(":", "")
  370. _data = GetSwitchbotDevices(interface=_interface).discover(
  371. retry=retry,
  372. scan_timeout=self._scan_timeout,
  373. passive=passive,
  374. mac=self._mac,
  375. )
  376. if _data.get(dev_id):
  377. self._sb_adv_data = _data[dev_id]
  378. return self._sb_adv_data
  379. class Switchbot(SwitchbotDevice):
  380. """Representation of a Switchbot."""
  381. def __init__(self, *args: Any, **kwargs: Any) -> None:
  382. """Switchbot Bot/WoHand constructor."""
  383. super().__init__(*args, **kwargs)
  384. self._inverse: bool = kwargs.pop("inverse_mode", False)
  385. self._settings: dict[str, Any] = {}
  386. def update(self, interface: int | None = None, passive: bool = False) -> None:
  387. """Update mode, battery percent and state of device."""
  388. self.get_device_data(
  389. retry=self._retry_count, interface=interface, passive=passive
  390. )
  391. def turn_on(self) -> bool:
  392. """Turn device on."""
  393. result = self._sendcommand(ON_KEY, self._retry_count)
  394. if result[0] == 1:
  395. return True
  396. if result[0] == 5:
  397. _LOGGER.debug("Bot is in press mode and doesn't have on state")
  398. return True
  399. return False
  400. def turn_off(self) -> bool:
  401. """Turn device off."""
  402. result = self._sendcommand(OFF_KEY, self._retry_count)
  403. if result[0] == 1:
  404. return True
  405. if result[0] == 5:
  406. _LOGGER.debug("Bot is in press mode and doesn't have off state")
  407. return True
  408. return False
  409. def hand_up(self) -> bool:
  410. """Raise device arm."""
  411. result = self._sendcommand(UP_KEY, self._retry_count)
  412. if result[0] == 1:
  413. return True
  414. if result[0] == 5:
  415. _LOGGER.debug("Bot is in press mode")
  416. return True
  417. return False
  418. def hand_down(self) -> bool:
  419. """Lower device arm."""
  420. result = self._sendcommand(DOWN_KEY, self._retry_count)
  421. if result[0] == 1:
  422. return True
  423. if result[0] == 5:
  424. _LOGGER.debug("Bot is in press mode")
  425. return True
  426. return False
  427. def press(self) -> bool:
  428. """Press command to device."""
  429. result = self._sendcommand(PRESS_KEY, self._retry_count)
  430. if result[0] == 1:
  431. return True
  432. if result[0] == 5:
  433. _LOGGER.debug("Bot is in switch mode")
  434. return True
  435. return False
  436. def set_switch_mode(
  437. self, switch_mode: bool = False, strength: int = 100, inverse: bool = False
  438. ) -> bool:
  439. """Change bot mode."""
  440. mode_key = format(switch_mode, "b") + format(inverse, "b")
  441. strength_key = f"{strength:0{2}x}" # to hex with padding to double digit
  442. result = self._sendcommand(
  443. DEVICE_SET_MODE_KEY + strength_key + mode_key, self._retry_count
  444. )
  445. if result[0] == 1:
  446. return True
  447. return False
  448. def set_long_press(self, duration: int = 0) -> bool:
  449. """Set bot long press duration."""
  450. duration_key = f"{duration:0{2}x}" # to hex with padding to double digit
  451. result = self._sendcommand(
  452. DEVICE_SET_EXTENDED_KEY + "08" + duration_key, self._retry_count
  453. )
  454. if result[0] == 1:
  455. return True
  456. return False
  457. def get_basic_info(self) -> dict[str, Any] | None:
  458. """Get device basic settings."""
  459. _data = self._sendcommand(
  460. key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
  461. )
  462. if _data in (b"\x07", b"\x00"):
  463. _LOGGER.error("Unsuccessfull, please try again")
  464. return None
  465. self._settings = {
  466. "battery": _data[1],
  467. "firmware": _data[2] / 10.0,
  468. "strength": _data[3],
  469. "timers": _data[8],
  470. "switchMode": bool(_data[9] & 16),
  471. "inverseDirection": bool(_data[9] & 1),
  472. "holdSeconds": _data[10],
  473. }
  474. return self._settings
  475. def switch_mode(self) -> Any:
  476. """Return true or false from cache."""
  477. # To get actual position call update() first.
  478. if not self._sb_adv_data.get("data"):
  479. return None
  480. return self._sb_adv_data.get("switchMode")
  481. def is_on(self) -> Any:
  482. """Return switch state from cache."""
  483. # To get actual position call update() first.
  484. if not self._sb_adv_data.get("data"):
  485. return None
  486. if self._inverse:
  487. return not self._sb_adv_data["data"].get("isOn")
  488. return self._sb_adv_data["data"].get("isOn")
  489. class SwitchbotCurtain(SwitchbotDevice):
  490. """Representation of a Switchbot Curtain."""
  491. def __init__(self, *args: Any, **kwargs: Any) -> None:
  492. """Switchbot Curtain/WoCurtain constructor."""
  493. # The position of the curtain is saved returned with 0 = open and 100 = closed.
  494. # This is independent of the calibration of the curtain bot (Open left to right/
  495. # Open right to left/Open from the middle).
  496. # The parameter 'reverse_mode' reverse these values,
  497. # if 'reverse_mode' = True, position = 0 equals close
  498. # and position = 100 equals open. The parameter is default set to True so that
  499. # the definition of position is the same as in Home Assistant.
  500. super().__init__(*args, **kwargs)
  501. self._reverse: bool = kwargs.pop("reverse_mode", True)
  502. self._settings: dict[str, Any] = {}
  503. self.ext_info_sum: dict[str, Any] = {}
  504. self.ext_info_adv: dict[str, Any] = {}
  505. def open(self) -> bool:
  506. """Send open command."""
  507. result = self._sendcommand(OPEN_KEY, self._retry_count)
  508. if result[0] == 1:
  509. return True
  510. return False
  511. def close(self) -> bool:
  512. """Send close command."""
  513. result = self._sendcommand(CLOSE_KEY, self._retry_count)
  514. if result[0] == 1:
  515. return True
  516. return False
  517. def stop(self) -> bool:
  518. """Send stop command to device."""
  519. result = self._sendcommand(STOP_KEY, self._retry_count)
  520. if result[0] == 1:
  521. return True
  522. return False
  523. def set_position(self, position: int) -> bool:
  524. """Send position command (0-100) to device."""
  525. position = (100 - position) if self._reverse else position
  526. hex_position = "%0.2X" % position
  527. result = self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
  528. if result[0] == 1:
  529. return True
  530. return False
  531. def update(self, interface: int | None = None, passive: bool = False) -> None:
  532. """Update position, battery percent and light level of device."""
  533. self.get_device_data(
  534. retry=self._retry_count, interface=interface, passive=passive
  535. )
  536. def get_position(self) -> Any:
  537. """Return cached position (0-100) of Curtain."""
  538. # To get actual position call update() first.
  539. if not self._sb_adv_data.get("data"):
  540. return None
  541. return self._sb_adv_data["data"].get("position")
  542. def get_basic_info(self) -> dict[str, Any] | None:
  543. """Get device basic settings."""
  544. _data = self._sendcommand(
  545. key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
  546. )
  547. if _data in (b"\x07", b"\x00"):
  548. _LOGGER.error("Unsuccessfull, please try again")
  549. return None
  550. _position = max(min(_data[6], 100), 0)
  551. self._settings = {
  552. "battery": _data[1],
  553. "firmware": _data[2] / 10.0,
  554. "chainLength": _data[3],
  555. "openDirection": (
  556. "right_to_left" if _data[4] & 0b10000000 == 128 else "left_to_right"
  557. ),
  558. "touchToOpen": bool(_data[4] & 0b01000000),
  559. "light": bool(_data[4] & 0b00100000),
  560. "fault": bool(_data[4] & 0b00001000),
  561. "solarPanel": bool(_data[5] & 0b00001000),
  562. "calibrated": bool(_data[5] & 0b00000100),
  563. "inMotion": bool(_data[5] & 0b01000011),
  564. "position": (100 - _position) if self._reverse else _position,
  565. "timers": _data[7],
  566. }
  567. return self._settings
  568. def get_extended_info_summary(self) -> dict[str, Any] | None:
  569. """Get basic info for all devices in chain."""
  570. _data = self._sendcommand(key=CURTAIN_EXT_SUM_KEY, retry=self._retry_count)
  571. if _data in (b"\x07", b"\x00"):
  572. _LOGGER.error("Unsuccessfull, please try again")
  573. return None
  574. self.ext_info_sum["device0"] = {
  575. "openDirectionDefault": not bool(_data[1] & 0b10000000),
  576. "touchToOpen": bool(_data[1] & 0b01000000),
  577. "light": bool(_data[1] & 0b00100000),
  578. "openDirection": (
  579. "left_to_right" if _data[1] & 0b00010000 == 1 else "right_to_left"
  580. ),
  581. }
  582. # if grouped curtain device present.
  583. if _data[2] != 0:
  584. self.ext_info_sum["device1"] = {
  585. "openDirectionDefault": not bool(_data[1] & 0b10000000),
  586. "touchToOpen": bool(_data[1] & 0b01000000),
  587. "light": bool(_data[1] & 0b00100000),
  588. "openDirection": (
  589. "left_to_right" if _data[1] & 0b00010000 else "right_to_left"
  590. ),
  591. }
  592. return self.ext_info_sum
  593. def get_extended_info_adv(self) -> dict[str, Any] | None:
  594. """Get advance page info for device chain."""
  595. _data = self._sendcommand(key=CURTAIN_EXT_ADV_KEY, retry=self._retry_count)
  596. if _data in (b"\x07", b"\x00"):
  597. _LOGGER.error("Unsuccessfull, please try again")
  598. return None
  599. _state_of_charge = [
  600. "not_charging",
  601. "charging_by_adapter",
  602. "charging_by_solar",
  603. "fully_charged",
  604. "solar_not_charging",
  605. "charging_error",
  606. ]
  607. self.ext_info_adv["device0"] = {
  608. "battery": _data[1],
  609. "firmware": _data[2] / 10.0,
  610. "stateOfCharge": _state_of_charge[_data[3]],
  611. }
  612. # If grouped curtain device present.
  613. if _data[4]:
  614. self.ext_info_adv["device1"] = {
  615. "battery": _data[4],
  616. "firmware": _data[5] / 10.0,
  617. "stateOfCharge": _state_of_charge[_data[6]],
  618. }
  619. return self.ext_info_adv
  620. def get_light_level(self) -> Any:
  621. """Return cached light level."""
  622. # To get actual light level call update() first.
  623. if not self._sb_adv_data.get("data"):
  624. return None
  625. return self._sb_adv_data["data"].get("lightLevel")
  626. def is_reversed(self) -> bool:
  627. """Return True if curtain position is opposite from SB data."""
  628. return self._reverse
  629. def is_calibrated(self) -> Any:
  630. """Return True curtain is calibrated."""
  631. # To get actual light level call update() first.
  632. if not self._sb_adv_data.get("data"):
  633. return None
  634. return self._sb_adv_data["data"].get("calibration")