vacuum.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. """Vacuum parser."""
  2. from __future__ import annotations
  3. import struct
  4. def process_vacuum(
  5. data: bytes | None, mfr_data: bytes | None
  6. ) -> dict[str, bool | int | str]:
  7. """Support for s10, k10+ pro combo, k20 process service data."""
  8. if mfr_data is None:
  9. return {}
  10. _seq_num = mfr_data[6]
  11. _soc_version = get_device_fw_version(mfr_data[8:11])
  12. # Steps at the end of the last network configuration
  13. _step = mfr_data[11] & 0b00001111
  14. _mqtt_connected = bool(mfr_data[11] & 0b00010000)
  15. _battery = mfr_data[12]
  16. _work_status = mfr_data[13] & 0b00111111
  17. return {
  18. "sequence_number": _seq_num,
  19. "soc_version": _soc_version,
  20. "step": _step,
  21. "mqtt_connected": _mqtt_connected,
  22. "battery": _battery,
  23. "work_status": _work_status,
  24. }
  25. def get_device_fw_version(version_bytes: bytes) -> str | None:
  26. version1 = version_bytes[0] & 0x0F
  27. version2 = version_bytes[0] >> 4
  28. version3 = struct.unpack("<H", version_bytes[1:])[0]
  29. return f"{version1}.{version2}.{version3:>03d}"
  30. def process_vacuum_k(
  31. data: bytes | None, mfr_data: bytes | None
  32. ) -> dict[str, bool | int | str]:
  33. """Support for k10+, k10+ pro process service data."""
  34. if mfr_data is None:
  35. return {}
  36. _seq_num = mfr_data[6]
  37. _dustbin_bound = bool(mfr_data[7] & 0b10000000)
  38. _dusbin_connected = bool(mfr_data[7] & 0b01000000)
  39. _network_connected = bool(mfr_data[7] & 0b00100000)
  40. _work_status = (mfr_data[7] & 0b00010000) >> 4
  41. _battery = mfr_data[8] & 0b01111111
  42. return {
  43. "sequence_number": _seq_num,
  44. "dustbin_bound": _dustbin_bound,
  45. "dusbin_connected": _dusbin_connected,
  46. "network_connected": _network_connected,
  47. "work_status": _work_status,
  48. "battery": _battery,
  49. }