123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- # -*- coding: utf-8 -*-
- import datetime as dt
- import mutagen
- import os
- import re
- import symuid.tag_interface
- def _timestamp_to_utc_dt(ts):
- return dt.datetime.utcfromtimestamp(ts) \
- .replace(tzinfo=dt.timezone.utc)
- def _utc_dt_now():
- return dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc)
class PlayCount:
    """Play count reported for one (player, library) pair at a point in time.

    Equality, hashing and ``repr`` are all derived from the instance
    attributes, so two instances with identical attribute values compare
    equal and hash alike.
    """

    def __init__(self, player, library_id, register_dt, count):
        """Store the play count record.

        Args:
            player: Identifier of the player that reported the count.
            library_id: Identifier of the library within that player.
            register_dt: Timezone-aware ``datetime.datetime`` of registration.
            count: Non-negative integer play count.

        Raises:
            AssertionError: If ``register_dt`` is not a timezone-aware
                datetime or ``count`` is not a non-negative int.
        """
        self.player = player
        self.library_id = library_id
        assert isinstance(register_dt, dt.datetime), register_dt
        # Naive datetimes are rejected: an aware value is required.
        assert register_dt.tzinfo is not None, register_dt
        self.register_dt = register_dt
        assert isinstance(count, int) and count >= 0, count
        self.count = count

    def __eq__(self, other):
        """Return True when *other* has the same type and attribute values."""
        return type(self) is type(other) and vars(self) == vars(other)

    def __hash__(self):
        """Hash the attribute values, ordered by attribute name."""
        attrs_sorted = sorted(vars(self).items(), key=lambda pair: pair[0])
        return hash(tuple(value for _, value in attrs_sorted))

    def __repr__(self):
        """Return ``PlayCount(name=value, ...)`` in attribute-definition order."""
        # The generator must be the sole call argument WITHOUT a trailing
        # comma; "f(x for x in y,)" is a SyntaxError on Python 3.7+.
        attrs = ', '.join(
            '{}={!r}'.format(name, value) for name, value in vars(self).items()
        )
        return 'PlayCount({})'.format(attrs)
class Track:
    """Audio file whose tags hold a track UUID, a comment and play counts.

    All tag access goes through ``self._iface``, a ``symuid.tag_interface``
    wrapper chosen in ``__init__`` from the tag type mutagen reports.
    Play counts are stored as integer tags labelled
    ``symuid:pcnt:<player>:<library_id>:<unix timestamp>``.
    """

    # Pattern string for paths to skip. It is not applied automatically:
    # pass it (or a compiled pattern) as ``path_ignore_regex`` to ``walk``.
    PATH_DEFAULT_IGNORE_REGEX = r'\.(itdb|itc2|itl|jpg|midi?|plist|xml|zip)$'

    def __init__(self, path):
        """Open *path* with mutagen and select the matching tag interface.

        Raises:
            NotImplementedError: If mutagen returns no file object for
                *path*, or the file's tags are neither ID3 nor MP4 tags.
        """
        mutagen_file = mutagen.File(filename=path)
        # TODO support mp3 files without ID3 headers
        if mutagen_file is None:
            raise NotImplementedError(path)
        elif isinstance(mutagen_file.tags, mutagen.id3.ID3):
            self._iface = symuid.tag_interface.ID3(mutagen_file)
        elif isinstance(mutagen_file.tags, mutagen.mp4.MP4Tags):
            self._iface = symuid.tag_interface.MP4(mutagen_file)
        else:
            raise NotImplementedError((path, type(mutagen_file)))

    @property
    def path(self):
        """Path of the underlying file as reported by the tag interface."""
        return self._iface.track_path

    @property
    def comment(self):
        """Comment stored in the file's tags."""
        return self._iface.get_comment()

    @comment.setter
    def comment(self, comment):
        """Set the comment tag and save the file immediately."""
        self._iface.set_comment(comment)
        self._iface.save()

    def get_uuid(self):
        """Return the track UUID read from the tags."""
        return self._iface.get_track_uuid()

    def assign_uuid(self, uuid):
        """Write *uuid* to the tags and save the file.

        Raises:
            Exception: If the track already has a (truthy) UUID.
        """
        if self.get_uuid():
            raise Exception("{!r} already has an uuid".format(self.path))
        self._iface.set_track_uuid(uuid)
        self._iface.save()

    def _get_play_counts(self, player=None, library_id=None):
        """Yield a ``PlayCount`` for every play-count tag under the label.

        Args:
            player: Optional player identifier to narrow the tag label.
            library_id: Optional library identifier; requires *player*.

        Raises:
            AssertionError: If *library_id* is given while *player* is None.
            Exception: If *library_id* is given with a falsy *player*.

        This is a generator, so the checks above only run once iteration
        starts.
        """
        label = 'symuid:pcnt'
        assert library_id is None or player is not None
        if player:
            label += ':' + player
            if library_id:
                label += ':' + library_id
        elif library_id:
            raise Exception((player, library_id))
        # presumably get_free_ints yields (tag key, int value) pairs for the
        # keys under `label` -- confirm against symuid.tag_interface.
        for key, value in self._iface.get_free_ints(label):
            # key == 'symuid:pcnt:<player>:<library_id>:<timestamp>'; distinct
            # local names keep the filter arguments from being overwritten.
            tag_player, tag_library_id, register_ts_dec = key.split(':')[2:]
            yield PlayCount(
                player=tag_player,
                library_id=tag_library_id,
                register_dt=_timestamp_to_utc_dt(int(register_ts_dec)),
                count=value,
            )

    def _get_latest_play_counts(self, player=None, library_id=None):
        """Return the most recently registered count per (player, library).

        Returns:
            A list with one ``PlayCount`` per distinct player/library pair,
            namely the one with the greatest ``register_dt``.
        """
        latest = {}
        for count in self._get_play_counts(player, library_id):
            per_library = latest.setdefault(count.player, {})
            known = per_library.get(count.library_id)
            if known is None or known.register_dt < count.register_dt:
                per_library[count.library_id] = count
        return [
            count
            for per_library in latest.values()
            for count in per_library.values()
        ]

    def get_latest_play_count(self, player, library_id):
        """Return the latest ``PlayCount`` for one library, or None.

        Raises:
            AssertionError: If *player* or *library_id* is None, or if more
                than one latest count is found for the pair.
        """
        assert player is not None, player
        assert library_id is not None, library_id
        counts = self._get_latest_play_counts(player, library_id)
        if not counts:
            return None
        assert len(counts) == 1, counts
        return counts[0]

    def get_play_count_sum(self, player=None, library_id=None):
        """Sum the latest counts of all matching player/library pairs."""
        return sum(
            c.count for c in self._get_latest_play_counts(player, library_id)
        )

    def register_play_count(self, pc, tag_set_cb=None):
        """Store *pc* as a play-count tag unless the same tag already exists.

        The tag label contains the registration time rounded to whole
        seconds. If no tag with that label exists, it is written, the file is
        saved and ``tag_set_cb(self, new_tag)`` is called when provided. If a
        tag with the same label and the same count exists, nothing happens.

        Raises:
            AssertionError: If *pc* is not a ``PlayCount``.
            Exception: If a tag with the same label holds a different count.
        """
        assert isinstance(pc, PlayCount), pc
        tag_label = 'symuid:pcnt:{}:{}:{:.0f}'.format(
            pc.player, pc.library_id, pc.register_dt.timestamp(),
        )
        current_count = self._iface.get_free_int(tag_label)
        if current_count is None:
            new_tag = self._iface.set_free_int(tag_label, pc.count)
            self._iface.save()
            if tag_set_cb:
                tag_set_cb(self, new_tag)
        elif current_count != pc.count:
            raise Exception((current_count, pc.count))

    def increase_play_count(self, player, library_id, tag_set_cb=None):
        """Register a new count one above the latest one (or 1 if none).

        The new record is stamped with the current UTC time.
        NOTE(review): two calls that round to the same second produce the
        same tag label with different counts, which looks like it makes
        ``register_play_count`` raise -- confirm whether callers rely on it.
        """
        current_pc = self.get_latest_play_count(player, library_id)
        new_count = (current_pc.count + 1) if current_pc else 1
        self.register_play_count(
            PlayCount(
                player=player,
                library_id=library_id,
                register_dt=_utc_dt_now(),
                count=new_count,
            ),
            tag_set_cb=tag_set_cb,
        )

    @classmethod
    def walk(cls, root_path, path_ignore_regex, ignored_cb=None, unsupported_cb=None):
        """Yield an instance for every supported file below *root_path*.

        Args:
            root_path: Directory to traverse recursively with ``os.walk``.
            path_ignore_regex: Compiled pattern, or pattern string such as
                ``PATH_DEFAULT_IGNORE_REGEX``; paths it matches (via
                ``search``) are skipped.
            ignored_cb: Optional callable invoked with each skipped path.
            unsupported_cb: Optional callable invoked with ``(path, error)``
                when constructing the track raises ``NotImplementedError``.
        """
        # Accept plain pattern strings so the class's own default constant
        # can be passed directly; compiled patterns are used unchanged.
        if isinstance(path_ignore_regex, str):
            path_ignore_regex = re.compile(path_ignore_regex)
        for dirpath, _, filenames in os.walk(root_path):
            for filename in filenames:
                track_path = os.path.join(dirpath, filename)
                if path_ignore_regex.search(track_path):
                    if ignored_cb is not None:
                        ignored_cb(track_path)
                    continue
                # Only the constructor is guarded, so the handler reacts to
                # unsupported files and to nothing else.
                try:
                    track = cls(track_path)
                except NotImplementedError as exc:
                    if unsupported_cb is not None:
                        unsupported_cb(track_path, exc)
                    continue
                yield track
|