vacuum.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. """Vacuum parser."""
  2. from __future__ import annotations
  3. import struct
  4. def process_vacuum(data: bytes | None, mfr_data: bytes | None) -> dict[str, bool | int | str]:
  5. """Support for s10, k10+ pro combo, k20 process service data."""
  6. if mfr_data is None:
  7. return {}
  8. _seq_num = mfr_data[6]
  9. _soc_version = get_device_fw_version(mfr_data[8:11])
  10. # Steps at the end of the last network configuration
  11. _step = mfr_data[11] & 0b00001111
  12. _mqtt_connected = bool(mfr_data[11] & 0b00010000)
  13. _battery = mfr_data[12]
  14. _work_status = mfr_data[13] & 0b00111111
  15. return {
  16. "sequence_number": _seq_num,
  17. "soc_version": _soc_version,
  18. "step": _step,
  19. "mqtt_connected": _mqtt_connected,
  20. "battery": _battery,
  21. "work_status": _work_status,
  22. }
  23. def get_device_fw_version(version_bytes: bytes) -> str | None:
  24. version1 = version_bytes[0] & 0x0f
  25. version2 = version_bytes[0] >> 4
  26. version3 = struct.unpack('<H', version_bytes[1:])[0]
  27. return f'{version1}.{version2}.{version3:>03d}'
  28. def process_vacuum_k(data: bytes | None, mfr_data: bytes | None) -> dict[str, bool | int | str]:
  29. """Support for k10+, k10+ pro process service data."""
  30. if mfr_data is None:
  31. return {}
  32. _seq_num = mfr_data[6]
  33. _dustbin_bound = bool(mfr_data[7] & 0b10000000)
  34. _dusbin_connected = bool(mfr_data[7] & 0b01000000)
  35. _network_conncted = bool(mfr_data[7] & 0b00100000)
  36. _work_status = (mfr_data[7] & 0b00010000) >> 4
  37. _battery = mfr_data[8] & 0b01111111
  38. return {
  39. "sequence_number": _seq_num,
  40. "dustbin_bound": _dustbin_bound,
  41. "dusbin_connected": _dusbin_connected,
  42. "network_conncted": _network_conncted,
  43. "work_status": _work_status,
  44. "battery": _battery
  45. }