cmus.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. import re
  2. import sys
  3. def _int_from_bytes_sys(data_bytes):
  4. return int.from_bytes(data_bytes, byteorder=sys.byteorder)
  5. class Track:
  6. RESERVED_PAD_REGEX = rb"\xff{16,}"
  7. STRING_TERMINATOR = b"\x00"
  8. def __init__(self, cache_size, cache_bytes):
  9. """
  10. struct cache_entry {
  11. // size of this struct including size itself
  12. uint32_t size;
  13. int32_t play_count;
  14. int64_t mtime;
  15. int32_t duration;
  16. int32_t bitrate;
  17. int32_t bpm; // CACHE_VERSION 0x0d (commit 976c10d0e42c9ecd7389b28dd7c5b560a1702821)
  18. uint8_t _reserved[CACHE_ENTRY_RESERVED_SIZE];
  19. // filename, codec, codec_profile and N * (key, val)
  20. char strings[];
  21. };
  22. """
  23. assert len(cache_bytes) + 4 == cache_size
  24. self._play_count = _int_from_bytes_sys(cache_bytes[0:4])
  25. # self._mtime = _int_from_bytes_sys(cache_bytes[4:12])
  26. # self._duration_seconds = _int_from_bytes_sys(cache_bytes[12:16])
  27. # self._bitrate = _int_from_bytes_sys(cache_bytes[16:20])
  28. # self._bpm = _int_from_bytes_sys(cache_bytes[20:24])
  29. strings = re.split(self.RESERVED_PAD_REGEX, cache_bytes)[1].split(
  30. self.STRING_TERMINATOR
  31. )
  32. self._path = strings[0]
  33. @property
  34. def path(self):
  35. return self._path
  36. @property
  37. def play_count(self):
  38. return self._play_count
  39. class Cache:
  40. # pylint: disable=too-few-public-methods
  41. FILE_PREFIX = b"CTC"
  42. VERSION_LENGTH = 1
  43. SUPPORTED_VERSIONS = [b"\x0c", b"\x0d"]
  44. # always big endian, see cache_init()
  45. FLAGS_BYTEORDER = "big"
  46. FLAGS_LENGTH = 4
  47. FLAG_64_BIT = 0x01
  48. HEADER_LENGTH = len(FILE_PREFIX) + VERSION_LENGTH + FLAGS_LENGTH
  49. def __init__(self, path):
  50. self._path = path
  51. with open(self._path, "rb") as stream:
  52. # see cache.c cache_init()
  53. assert stream.read(len(self.FILE_PREFIX)) == self.FILE_PREFIX
  54. cache_version = stream.read(self.VERSION_LENGTH)
  55. assert cache_version in self.SUPPORTED_VERSIONS, cache_version
  56. flags = int.from_bytes(
  57. stream.read(self.FLAGS_LENGTH),
  58. byteorder=self.FLAGS_BYTEORDER, # persistent
  59. )
  60. assert flags & self.FLAG_64_BIT
  61. # support no other flags
  62. assert flags & ~self.FLAG_64_BIT == 0, flags
  63. def get_tracks(self):
  64. with open(self._path, "rb") as stream:
  65. stream.seek(self.HEADER_LENGTH)
  66. # size includes size itself
  67. while True:
  68. size = _int_from_bytes_sys(stream.read(4))
  69. if size == 0:
  70. break
  71. yield Track(size, stream.read(size - 4))
  72. # see cache.c write_ti ALIGN
  73. stream.read((-size) % 8)