helpers.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. from __future__ import annotations
  2. import asyncio
  3. import struct
  4. from collections.abc import Coroutine
  5. from typing import Any, TypeVar
# Result type of the coroutine handed to create_background_task(); it is
# carried through to the asyncio.Task that function returns.
_R = TypeVar("_R")
# Tasks started through create_background_task(). Each task is added when it
# is created and dropped again by its own done-callback.
_BACKGROUND_TASKS: set[asyncio.Task[Any]] = set()
# Pre-compiled struct unpack methods for better performance
# ">H" unpacks one big-endian unsigned 16-bit integer; unpack_from takes
# (buffer, offset) so no slice has to be created by the caller.
_UNPACK_UINT16_BE = struct.Struct(">H").unpack_from  # Big-endian unsigned 16-bit
# ">I" is a 4-byte big-endian unsigned integer and unpack() needs exactly
# 4 bytes, so a 3-byte value must be left-padded with one zero byte first.
_UNPACK_UINT24_BE = struct.Struct(">I").unpack  # For 3-byte values (read as 4 bytes)
  11. def create_background_task(target: Coroutine[Any, Any, _R]) -> asyncio.Task[_R]:
  12. """Create a background task."""
  13. task = asyncio.create_task(target)
  14. _BACKGROUND_TASKS.add(task)
  15. task.add_done_callback(_BACKGROUND_TASKS.remove)
  16. return task
  17. def parse_power_data(
  18. data: bytes, offset: int, scale: float = 1.0, mask: int | None = None
  19. ) -> float:
  20. """
  21. Parse 2-byte power-related data from bytes.
  22. Args:
  23. data: Raw bytes data
  24. offset: Starting offset for the 2-byte value
  25. scale: Scale factor to divide the raw value by (default: 1.0)
  26. mask: Optional bitmask to apply (default: None)
  27. Returns:
  28. Parsed float value
  29. """
  30. if offset + 2 > len(data):
  31. raise ValueError(
  32. f"Insufficient data: need at least {offset + 2} bytes, got {len(data)}"
  33. )
  34. value = _UNPACK_UINT16_BE(data, offset)[0]
  35. if mask is not None:
  36. value &= mask
  37. return value / scale
  38. def parse_uint24_be(data: bytes, offset: int) -> int:
  39. """
  40. Parse 3-byte big-endian unsigned integer.
  41. Args:
  42. data: Raw bytes data
  43. offset: Starting offset for the 3-byte value
  44. Returns:
  45. Parsed integer value
  46. """
  47. if offset + 3 > len(data):
  48. raise ValueError(
  49. f"Insufficient data: need at least {offset + 3} bytes, got {len(data)}"
  50. )
  51. # Read 3 bytes and pad with 0 at the beginning for 4-byte struct
  52. return _UNPACK_UINT24_BE(b"\x00" + data[offset : offset + 3])[0]
  53. def celsius_to_fahrenheit(celsius: float) -> float:
  54. """Convert temperature from Celsius to Fahrenheit."""
  55. return (celsius * 9 / 5) + 32