123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- import datetime as dt
- import os
- import re
- import typing
- import mutagen
- from symuid import _tag_interface
- from symuid._datetime import datetime_utc_now, unix_epoch_time_to_datetime_utc
class PlayCount:
    """Number of plays of a track as registered by one player library.

    Instances carry four attributes: ``player``, ``library_id``,
    ``register_dt`` (a timezone-aware ``datetime``) and ``count`` (a
    non-negative ``int``).  Equality, hashing and ``repr`` are all derived
    from the instance attributes.
    """

    def __init__(self, player, library_id, register_dt, count):
        """Validate the arguments and store them as attributes.

        Raises:
            AssertionError: if ``register_dt`` is not a timezone-aware
                ``datetime`` or ``count`` is not a non-negative ``int``.
        """
        # Checks run in the same order as the attributes are stored below.
        assert isinstance(register_dt, dt.datetime), register_dt
        assert register_dt.tzinfo is not None, register_dt
        assert isinstance(count, int) and count >= 0, count
        # Assignment order matters: __repr__ lists attributes in this order.
        self.player = player
        self.library_id = library_id
        self.register_dt = register_dt
        self.count = count

    def __eq__(self, other):
        """Return True only for an object of the exact same type with equal attributes."""
        # Exact type match on purpose; subclasses never compare equal.
        if type(self) is not type(other):
            return False
        return vars(self) == vars(other)

    def __hash__(self):
        """Hash the attribute values, ordered by attribute name."""
        attrs = vars(self)
        return hash(tuple(attrs[name] for name in sorted(attrs)))

    def __repr__(self):
        """Return ``PlayCount(name=value, ...)`` listing every attribute."""
        pairs = [
            '{}={!r}'.format(name, value)
            for name, value in vars(self).items()
        ]
        return 'PlayCount({})'.format(', '.join(pairs))
class Track:
    """Audio file whose tags are read and written through a tag interface.

    The tag interface (``self._iface``) is chosen from the file type that
    mutagen detects.  Play counts are stored as free-form integer tags
    labelled ``symuid:pcnt:<player>:<library_id>:<unix timestamp>``.
    """

    # Pattern string matching paths that end in one of the listed extensions.
    # Not referenced inside this class; presumably meant to be handed to
    # walk() as ``path_ignore_regex`` - verify against callers.
    PATH_DEFAULT_IGNORE_REGEX = r'\.(itdb|itc2|itl|jpg|midi?|plist|xml|zip)$'

    def __init__(self, path):
        """Open the file at ``path`` and select a tag interface for it.

        Raises:
            NotImplementedError: if no tag interface supports the file.
        """
        self._iface = self._select_tag_interface(path)

    @staticmethod
    def _select_tag_interface(track_path) -> '_tag_interface.TagInterface':
        """Return the tag interface matching the file at ``track_path``.

        Raises:
            NotImplementedError: if mutagen does not recognise the file
                (``mutagen.File`` returned ``None``) or its tag/container
                type is none of ID3, MP4, Ogg Opus or Ogg Vorbis.
        """
        mutagen_file = mutagen.File(track_path)
        if mutagen_file is None:
            raise NotImplementedError(track_path)
        # NOTE(review): the attribute lookups below assume the mutagen format
        # submodules are loaded once mutagen.File() has run - confirm.
        if isinstance(mutagen_file.tags, mutagen.id3.ID3):
            return _tag_interface.ID3(mutagen_file)
        if isinstance(mutagen_file.tags, mutagen.mp4.MP4Tags):
            return _tag_interface.MP4(mutagen_file)
        if isinstance(mutagen_file, (mutagen.oggopus.OggOpus,
                                     mutagen.oggvorbis.OggVorbis)):
            return _tag_interface.Ogg(mutagen_file)
        raise NotImplementedError((track_path, type(mutagen_file)))

    def __eq__(self, other: object) -> bool:
        """Two tracks are equal when they refer to the same path."""
        # Comparing with a non-Track (None, str, ...) must not raise;
        # NotImplemented lets Python fall back to its default comparison.
        if not isinstance(other, Track):
            return NotImplemented
        return self.path == other.path

    def __hash__(self) -> int:
        """Hash by path, consistent with __eq__."""
        return hash(self.path)

    @property
    def path(self):
        """Path of the underlying file as reported by the tag interface."""
        return self._iface.track_path

    @property
    def comment(self):
        """Comment read from the tag interface."""
        return self._iface.get_comment()

    @comment.setter
    def comment(self, comment):
        """Set the comment through the tag interface and save immediately."""
        self._iface.set_comment(comment)
        self._iface.save()

    def get_uuid(self):
        """Return the track uuid reported by the tag interface."""
        return self._iface.get_track_uuid()

    def assign_uuid(self, uuid):
        """Store ``uuid`` as the track uuid and save.

        Raises:
            Exception: if get_uuid() already returns a truthy value.
        """
        if self.get_uuid():
            raise Exception("{!r} already has an uuid".format(self.path))
        self._iface.set_track_uuid(uuid)
        self._iface.save()

    def _get_play_counts(self, player=None, library_id=None):
        """Yield a PlayCount for every stored play-count tag.

        Args:
            player: if truthy, only yield counts registered by this player.
            library_id: if truthy, only yield counts of this library;
                requires ``player``.

        Raises:
            AssertionError / Exception: if ``library_id`` is given without
                ``player``.  As this is a generator, the error surfaces on
                first iteration.
        """
        assert library_id is None or player is not None
        label = 'symuid:pcnt'
        if player:
            label += ':' + player
            if library_id:
                label += ':' + library_id
        elif library_id:
            raise Exception((player, library_id))
        for key, count in self._iface.get_free_ints(label):
            # key layout: symuid:pcnt:<player>:<library_id>:<timestamp>
            tag_player, tag_library_id, register_ts_dec = key.split(':')[2:]
            # NOTE(review): get_free_ints is not visible here.  Should it match
            # by plain string prefix, "lib" would also select "lib2"; the
            # exact comparisons below rule that out and are a no-op otherwise.
            if player and tag_player != player:
                continue
            if library_id and tag_library_id != library_id:
                continue
            yield PlayCount(
                player=tag_player,
                library_id=tag_library_id,
                register_dt=unix_epoch_time_to_datetime_utc(
                    int(register_ts_dec)),
                count=count,
            )

    def get_play_counts(self) -> 'typing.Iterator[PlayCount]':
        """Return an iterator over all stored play counts."""
        return self._get_play_counts()

    def _get_latest_play_counts(self, player=None, library_id=None):
        """Return the most recently registered PlayCount per player and library.

        The arguments filter as in _get_play_counts().  When two counts of the
        same player and library share a ``register_dt``, the one seen first
        is kept.
        """
        latest = {}
        for count in self._get_play_counts(player, library_id):
            per_library = latest.setdefault(count.player, {})
            known = per_library.get(count.library_id)
            if known is None or known.register_dt < count.register_dt:
                per_library[count.library_id] = count
        return [c for per_library in latest.values()
                for c in per_library.values()]

    def get_latest_play_count(self, player, library_id):
        """Return the latest PlayCount of one library, or None if there is none."""
        assert player is not None, player
        assert library_id is not None, library_id
        counts = self._get_latest_play_counts(player, library_id)
        if not counts:
            return None
        assert len(counts) == 1, counts
        return counts[0]

    def get_play_count_sum(self, player=None, library_id=None):
        """Return the sum of the latest counts of all matching libraries."""
        return sum(
            c.count
            for c in self._get_latest_play_counts(player, library_id)
        )

    def register_play_count(self, play_count, tag_set_cb=None):
        """Store ``play_count`` as a tag unless an identical tag exists.

        Args:
            play_count: the PlayCount to store.
            tag_set_cb: optional callable invoked as
                ``tag_set_cb(track, new_tag)`` after a new tag was saved,
                where ``new_tag`` is whatever ``set_free_int`` returned.

        Raises:
            Exception: if a tag with the same label already holds a
                different count.
        """
        assert isinstance(play_count, PlayCount), play_count
        # The timestamp is rounded to whole seconds for the tag label.
        tag_label = 'symuid:pcnt:{}:{}:{:.0f}'.format(
            play_count.player, play_count.library_id,
            play_count.register_dt.timestamp(),
        )
        current_count = self._iface.get_free_int(tag_label)
        if current_count is None:
            new_tag = self._iface.set_free_int(tag_label, play_count.count)
            self._iface.save()
            if tag_set_cb:
                tag_set_cb(self, new_tag)
        elif current_count != play_count.count:
            raise Exception((current_count, play_count.count))

    def increase_play_count(self, player, library_id, tag_set_cb=None):
        """Register a count one above the latest one (or 1), dated now."""
        current_pc = self.get_latest_play_count(player, library_id)
        new_count = 1 if current_pc is None else current_pc.count + 1
        self.register_play_count(
            PlayCount(
                player=player,
                library_id=library_id,
                register_dt=datetime_utc_now(),
                count=new_count,
            ),
            tag_set_cb=tag_set_cb,
        )

    @classmethod
    def walk(cls, root_path, path_ignore_regex, ignored_cb=None, unsupported_cb=None):
        """Yield a track for every supported file below ``root_path``.

        Args:
            root_path: directory to traverse with ``os.walk``.
            path_ignore_regex: compiled pattern, or pattern string such as
                ``PATH_DEFAULT_IGNORE_REGEX``; paths it matches (via
                ``search``) are skipped.
            ignored_cb: optional callable invoked with each skipped path.
            unsupported_cb: optional callable invoked as
                ``unsupported_cb(path, exc)`` for each file whose
                construction raised NotImplementedError.
        """
        if isinstance(path_ignore_regex, str):
            # Accept the plain pattern string as well as a compiled pattern.
            path_ignore_regex = re.compile(path_ignore_regex)
        for dirpath, _, filenames in os.walk(root_path):
            for filename in filenames:
                track_path = os.path.join(dirpath, filename)
                if path_ignore_regex.search(track_path):
                    if ignored_cb is not None:
                        ignored_cb(track_path)
                    continue
                # Keep the try body to the constructor call only.
                try:
                    track = cls(track_path)
                except NotImplementedError as exc:
                    if unsupported_cb is not None:
                        unsupported_cb(track_path, exc)
                    continue
                yield track
|