vacuum.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. """Vacuum parser."""
  2. from __future__ import annotations
  3. import logging
  4. import struct
  5. _LOGGER = logging.getLogger(__name__)
  6. def process_vacuum(
  7. data: bytes | None, mfr_data: bytes | None
  8. ) -> dict[str, bool | int | str]:
  9. """Support for s10, k10+ pro combo, k20 process service data."""
  10. if mfr_data is None:
  11. return {}
  12. _seq_num = mfr_data[6]
  13. _soc_version = get_device_fw_version(mfr_data[8:11])
  14. # Steps at the end of the last network configuration
  15. _step = mfr_data[11] & 0b00001111
  16. _mqtt_connected = bool(mfr_data[11] & 0b00010000)
  17. _battery = mfr_data[12]
  18. _work_status = mfr_data[13] & 0b00111111
  19. result = {
  20. "sequence_number": _seq_num,
  21. "soc_version": _soc_version,
  22. "step": _step,
  23. "mqtt_connected": _mqtt_connected,
  24. "battery": _battery,
  25. "work_status": _work_status,
  26. }
  27. _LOGGER.debug("Processed Vacuum data: %s, result: %s", data, result)
  28. return result
  29. def get_device_fw_version(version_bytes: bytes) -> str | None:
  30. version1 = version_bytes[0] & 0x0F
  31. version2 = version_bytes[0] >> 4
  32. version3 = struct.unpack("<H", version_bytes[1:])[0]
  33. return f"{version1}.{version2}.{version3:>03d}"
  34. def process_vacuum_k(
  35. data: bytes | None, mfr_data: bytes | None
  36. ) -> dict[str, bool | int | str]:
  37. """Support for k10+, k10+ pro process service data."""
  38. if mfr_data is None:
  39. return {}
  40. _seq_num = mfr_data[6]
  41. _dustbin_bound = bool(mfr_data[7] & 0b10000000)
  42. _dusbin_connected = bool(mfr_data[7] & 0b01000000)
  43. _network_connected = bool(mfr_data[7] & 0b00100000)
  44. _work_status = (mfr_data[7] & 0b00010000) >> 4
  45. _battery = mfr_data[8] & 0b01111111
  46. result = {
  47. "sequence_number": _seq_num,
  48. "dustbin_bound": _dustbin_bound,
  49. "dusbin_connected": _dusbin_connected,
  50. "network_connected": _network_connected,
  51. "work_status": _work_status,
  52. "battery": _battery,
  53. }
  54. _LOGGER.debug("Processed Vacuum K data: %s, result: %s", data, result)
  55. return result