123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- import re
- import sys
def _int_from_bytes_sys(data_bytes):
    """Decode ``data_bytes`` as an unsigned integer in the host byte order.

    An empty byte string decodes to 0.
    """
    native_order = sys.byteorder
    return int.from_bytes(data_bytes, native_order)
class Track:
    """One track entry decoded from the raw bytes of a single cache record.

    Only the play count and the file path are decoded; both are exposed as
    read-only properties.
    """

    # A run of at least sixteen 0xff bytes; presumably the `_reserved` field
    # of the entry (see the struct in __init__). The string table is taken
    # to be whatever follows that run.
    RESERVED_PAD_REGEX = rb'\xff{16,}'
    # Separator between consecutive strings in the string table.
    STRING_TERMINATOR = b'\x00'

    def __init__(self, cache_size, cache_bytes):
        """
        Parse one cache entry.

        cache_size is the value of the entry's leading `size` field (which
        counts the size field itself); cache_bytes is the remainder of the
        entry, i.e. everything after those 4 size bytes.

        Raises AssertionError if the two arguments disagree about the entry
        length, and IndexError if no reserved padding run is found.

        struct cache_entry {
        // size of this struct including size itself
        uint32_t size;
        int32_t play_count;
        int64_t mtime;
        int32_t duration;
        int32_t bitrate;
        int32_t bpm; // CACHE_VERSION 0x0d (commit 976c10d0e42c9ecd7389b28dd7c5b560a1702821)
        uint8_t _reserved[CACHE_ENTRY_RESERVED_SIZE];
        // filename, codec, codec_profile and N * (key, val)
        char strings[];
        };
        """
        # Raised explicitly (instead of a bare `assert`) so the check is not
        # stripped when Python runs with -O; the exception type is unchanged.
        if len(cache_bytes) + 4 != cache_size:
            raise AssertionError(
                'cache entry size mismatch: size field says {}, got {}'.format(
                    cache_size, len(cache_bytes) + 4))
        self._play_count = _int_from_bytes_sys(cache_bytes[0:4])
        # Fixed-width fields that are present but not decoded here:
        #   mtime            cache_bytes[4:12]
        #   duration (secs)  cache_bytes[12:16]
        #   bitrate          cache_bytes[16:20]
        #   bpm              cache_bytes[20:24]
        after_pad = re.split(self.RESERVED_PAD_REGEX, cache_bytes)[1]
        strings = after_pad.split(self.STRING_TERMINATOR)
        # The first string of the table is the file name (kept as bytes).
        self._path = strings[0]

    @property
    def path(self):
        """File path of the track, as raw bytes."""
        return self._path

    @property
    def play_count(self):
        """Play count read from the first 4 bytes after the size field."""
        return self._play_count
class Cache:
    """Reader for a cache file that begins with a ``CTC`` header.

    The constructor validates the header (prefix, version byte, flags);
    get_tracks() then iterates over the entries stored after it.
    """
    # pylint: disable=too-few-public-methods
    FILE_PREFIX = b'CTC'
    VERSION_LENGTH = 1
    SUPPORTED_VERSIONS = [b'\x0c', b'\x0d']
    # always big endian, see cache_init()
    FLAGS_BYTEORDER = 'big'
    FLAGS_LENGTH = 4
    FLAG_64_BIT = 0x01
    HEADER_LENGTH = len(FILE_PREFIX) + VERSION_LENGTH + FLAGS_LENGTH

    def __init__(self, path):
        """Remember *path* and validate the header of the file it names.

        Raises AssertionError when the prefix is wrong, the version is not in
        SUPPORTED_VERSIONS, the 64-bit flag is missing, or any other flag is
        set. The checks are raised explicitly rather than written as `assert`
        statements so they still run under `python -O`.
        """
        self._path = path
        with open(self._path, 'rb') as stream:
            # see cache.c cache_init()
            prefix = stream.read(len(self.FILE_PREFIX))
            if prefix != self.FILE_PREFIX:
                raise AssertionError(
                    'unexpected file prefix: {!r}'.format(prefix))
            cache_version = stream.read(self.VERSION_LENGTH)
            if cache_version not in self.SUPPORTED_VERSIONS:
                raise AssertionError(cache_version)
            flags = int.from_bytes(
                stream.read(self.FLAGS_LENGTH),
                byteorder=self.FLAGS_BYTEORDER,
            )
            if not flags & self.FLAG_64_BIT:
                raise AssertionError(
                    '64-bit flag not set, flags={:#x}'.format(flags))
            # support no other flags
            if flags & ~self.FLAG_64_BIT != 0:
                raise AssertionError(flags)

    def get_tracks(self):
        """Yield a Track for each entry stored after the header.

        The file is reopened on every call and stays open while the
        generator is being consumed. Iteration stops at the first entry whose
        size field decodes to 0, which is also what an empty read at end of
        file decodes to.
        """
        with open(self._path, 'rb') as stream:
            stream.seek(self.HEADER_LENGTH)
            while True:
                # size includes size itself
                size = _int_from_bytes_sys(stream.read(4))
                if size == 0:
                    break
                yield Track(size, stream.read(size - 4))
                # Skip the padding that rounds the entry up to a multiple of
                # 8 bytes; see cache.c write_ti ALIGN.
                stream.read((-size) % 8)
|