__init__.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. import datetime as dt
  2. import os
  3. import re
  4. import typing
  5. import mutagen
  6. from symuid import _tag_interface
  7. from symuid._datetime import datetime_utc_now, unix_epoch_time_to_datetime_utc
  8. class PlayCount:
  9. def __init__(self, player, library_id, register_dt, count):
  10. self.player = player
  11. self.library_id = library_id
  12. assert isinstance(register_dt, dt.datetime), register_dt
  13. assert register_dt.tzinfo is not None, register_dt
  14. self.register_dt = register_dt
  15. assert isinstance(count, int) and count >= 0, count
  16. self.count = count
  17. def __eq__(self, other):
  18. # pylint: disable=unidiomatic-typecheck
  19. return type(self) == type(other) and vars(self) == vars(other)
  20. def __hash__(self):
  21. attrs_sorted = sorted(vars(self).items(), key=lambda p: p[0])
  22. return hash(tuple(v for k, v in attrs_sorted))
  23. def __repr__(self):
  24. return 'PlayCount({})'.format(', '.join(
  25. '{}={!r}'.format(k, v) for k, v in vars(self).items()
  26. ))
  27. class Track:
  28. PATH_DEFAULT_IGNORE_REGEX = r'\.(itdb|itc2|itl|jpg|midi?|plist|xml|zip)$'
  29. def __init__(self, path):
  30. self._iface = self._select_tag_interface(path)
  31. @staticmethod
  32. def _select_tag_interface(track_path) -> _tag_interface.TagInterface:
  33. mutagen_file = mutagen.File(track_path)
  34. if mutagen_file is None:
  35. raise NotImplementedError(track_path)
  36. if isinstance(mutagen_file.tags, mutagen.id3.ID3):
  37. return _tag_interface.ID3(mutagen_file)
  38. if isinstance(mutagen_file.tags, mutagen.mp4.MP4Tags):
  39. return _tag_interface.MP4(mutagen_file)
  40. if isinstance(mutagen_file, (mutagen.oggopus.OggOpus,
  41. mutagen.oggvorbis.OggVorbis)):
  42. return _tag_interface.Ogg(mutagen_file)
  43. raise NotImplementedError((track_path, type(mutagen_file)))
  44. def __eq__(self, other: 'Track') -> bool:
  45. return self.path == other.path
  46. def __hash__(self) -> int:
  47. return hash(self.path)
  48. @property
  49. def path(self):
  50. return self._iface.track_path
  51. @property
  52. def comment(self):
  53. return self._iface.get_comment()
  54. @comment.setter
  55. def comment(self, comment):
  56. self._iface.set_comment(comment)
  57. self._iface.save()
  58. def get_uuid(self):
  59. return self._iface.get_track_uuid()
  60. def assign_uuid(self, uuid):
  61. if self.get_uuid():
  62. raise Exception("{!r} already has an uuid".format(self.path))
  63. self._iface.set_track_uuid(uuid)
  64. self._iface.save()
  65. def _get_play_counts(self, player=None, library_id=None):
  66. label = 'symuid:pcnt'
  67. assert library_id is None or player is not None
  68. if player:
  69. label += ':' + player
  70. if library_id:
  71. label += ':' + library_id
  72. elif library_id:
  73. raise Exception((player, library_id))
  74. for k, count in self._iface.get_free_ints(label):
  75. player, library_id, register_ts_dec = k.split(':')[2:]
  76. yield PlayCount(
  77. player=player,
  78. library_id=library_id,
  79. register_dt=unix_epoch_time_to_datetime_utc(
  80. int(register_ts_dec)),
  81. count=count,
  82. )
  83. def get_play_counts(self) -> typing.Iterator[PlayCount]:
  84. return self._get_play_counts()
  85. def _get_latest_play_counts(self, player=None, library_id=None):
  86. latest = {}
  87. for count in self._get_play_counts(player, library_id):
  88. if not count.player in latest:
  89. latest[count.player] = {}
  90. if not count.library_id in latest[count.player] \
  91. or latest[count.player][count.library_id].register_dt < count.register_dt:
  92. latest[count.player][count.library_id] = count
  93. return [c for p in latest.values() for c in p.values()]
  94. def get_latest_play_count(self, player, library_id):
  95. assert player is not None, player
  96. assert library_id is not None, library_id
  97. counts = self._get_latest_play_counts(player, library_id)
  98. if len(counts) == 0:
  99. return None
  100. assert len(counts) == 1, counts
  101. return counts[0]
  102. def get_play_count_sum(self, player=None, library_id=None):
  103. return sum(c.count for c in self._get_latest_play_counts(player, library_id))
  104. def register_play_count(self, play_count, tag_set_cb=None):
  105. assert isinstance(play_count, PlayCount), play_count
  106. tag_label = 'symuid:pcnt:{}:{}:{:.0f}'.format(
  107. play_count.player, play_count.library_id,
  108. play_count.register_dt.timestamp(),
  109. )
  110. current_count = self._iface.get_free_int(tag_label)
  111. if current_count is None:
  112. new_tag = self._iface.set_free_int(tag_label, play_count.count)
  113. self._iface.save()
  114. if tag_set_cb:
  115. tag_set_cb(self, new_tag)
  116. elif current_count != play_count.count:
  117. raise Exception((current_count, play_count.count))
  118. def increase_play_count(self, player, library_id, tag_set_cb=None):
  119. current_pc = self.get_latest_play_count(player, library_id)
  120. self.register_play_count(
  121. PlayCount(
  122. player=player,
  123. library_id=library_id,
  124. register_dt=datetime_utc_now(),
  125. count=current_pc.count + 1 if current_pc else 1,
  126. ),
  127. tag_set_cb=tag_set_cb,
  128. )
  129. @classmethod
  130. def walk(cls, root_path, path_ignore_regex, ignored_cb=None, unsupported_cb=None):
  131. for dirpath, _, filenames in os.walk(root_path):
  132. for filename in filenames:
  133. track_path = os.path.join(dirpath, filename)
  134. if path_ignore_regex.search(track_path):
  135. if ignored_cb is not None:
  136. ignored_cb(track_path)
  137. else:
  138. try:
  139. yield cls(track_path)
  140. except NotImplementedError as exc:
  141. if unsupported_cb is not None:
  142. unsupported_cb(track_path, exc)