import datetime as dt import os import re import typing import mutagen from symuid import _tag_interface from symuid._datetime import datetime_utc_now, unix_epoch_time_to_datetime_utc class PlayCount: def __init__(self, player, library_id, register_dt, count): self.player = player self.library_id = library_id assert isinstance(register_dt, dt.datetime), register_dt assert register_dt.tzinfo is not None, register_dt self.register_dt = register_dt assert isinstance(count, int) and count >= 0, count self.count = count def __eq__(self, other): # pylint: disable=unidiomatic-typecheck return type(self) == type(other) and vars(self) == vars(other) def __hash__(self): attrs_sorted = sorted(vars(self).items(), key=lambda p: p[0]) return hash(tuple(v for k, v in attrs_sorted)) def __repr__(self): return 'PlayCount({})'.format(', '.join( '{}={!r}'.format(k, v) for k, v in vars(self).items() )) class Track: PATH_DEFAULT_IGNORE_REGEX = r'\.(itdb|itc2|itl|jpg|midi?|plist|xml|zip)$' def __init__(self, path): self._iface = self._select_tag_interface(path) @staticmethod def _select_tag_interface(track_path) -> _tag_interface.TagInterface: mutagen_file = mutagen.File(track_path) if mutagen_file is None: raise NotImplementedError(track_path) if isinstance(mutagen_file.tags, mutagen.id3.ID3): return _tag_interface.ID3(mutagen_file) if isinstance(mutagen_file.tags, mutagen.mp4.MP4Tags): return _tag_interface.MP4(mutagen_file) if isinstance(mutagen_file, (mutagen.oggopus.OggOpus, mutagen.oggvorbis.OggVorbis)): return _tag_interface.Ogg(mutagen_file) raise NotImplementedError((track_path, type(mutagen_file))) def __eq__(self, other: 'Track') -> bool: return self.path == other.path def __hash__(self) -> int: return hash(self.path) @property def path(self): return self._iface.track_path @property def comment(self): return self._iface.get_comment() @comment.setter def comment(self, comment): self._iface.set_comment(comment) self._iface.save() def get_uuid(self): return self._iface.get_track_uuid() def assign_uuid(self, uuid): if self.get_uuid(): raise Exception("{!r} already has an uuid".format(self.path)) self._iface.set_track_uuid(uuid) self._iface.save() def _get_play_counts(self, player=None, library_id=None): label = 'symuid:pcnt' assert library_id is None or player is not None if player: label += ':' + player if library_id: label += ':' + library_id elif library_id: raise Exception((player, library_id)) for k, count in self._iface.get_free_ints(label): player, library_id, register_ts_dec = k.split(':')[2:] yield PlayCount( player=player, library_id=library_id, register_dt=unix_epoch_time_to_datetime_utc( int(register_ts_dec)), count=count, ) def get_play_counts(self) -> typing.Iterator[PlayCount]: return self._get_play_counts() def _get_latest_play_counts(self, player=None, library_id=None): latest = {} for count in self._get_play_counts(player, library_id): if not count.player in latest: latest[count.player] = {} if not count.library_id in latest[count.player] \ or latest[count.player][count.library_id].register_dt < count.register_dt: latest[count.player][count.library_id] = count return [c for p in latest.values() for c in p.values()] def get_latest_play_count(self, player, library_id): assert player is not None, player assert library_id is not None, library_id counts = self._get_latest_play_counts(player, library_id) if len(counts) == 0: return None assert len(counts) == 1, counts return counts[0] def get_play_count_sum(self, player=None, library_id=None): return sum(c.count for c in self._get_latest_play_counts(player, library_id)) def register_play_count(self, play_count, tag_set_cb=None): assert isinstance(play_count, PlayCount), play_count tag_label = 'symuid:pcnt:{}:{}:{:.0f}'.format( play_count.player, play_count.library_id, play_count.register_dt.timestamp(), ) current_count = self._iface.get_free_int(tag_label) if current_count is None: new_tag = self._iface.set_free_int(tag_label, play_count.count) self._iface.save() if tag_set_cb: tag_set_cb(self, new_tag) elif current_count != play_count.count: raise Exception((current_count, play_count.count)) def increase_play_count(self, player, library_id, tag_set_cb=None): current_pc = self.get_latest_play_count(player, library_id) self.register_play_count( PlayCount( player=player, library_id=library_id, register_dt=datetime_utc_now(), count=current_pc.count + 1 if current_pc else 1, ), tag_set_cb=tag_set_cb, ) @classmethod def walk(cls, root_path, path_ignore_regex, ignored_cb=None, unsupported_cb=None): for dirpath, _, filenames in os.walk(root_path): for filename in filenames: track_path = os.path.join(dirpath, filename) if path_ignore_regex.search(track_path): if ignored_cb is not None: ignored_cb(track_path) else: try: yield cls(track_path) except NotImplementedError as exc: if unsupported_cb is not None: unsupported_cb(track_path, exc)