__init__.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. import datetime as dt
  2. import os
  3. import re
  4. import subprocess
  5. import typing
  6. import mutagen
  7. from symuid import _tag_interface
  8. from symuid._datetime import datetime_utc_now, unix_epoch_time_to_datetime_utc
  9. class PlayCount:
  10. def __init__(self, player, library_id, register_dt, count):
  11. self.player = player
  12. self.library_id = library_id
  13. assert isinstance(register_dt, dt.datetime), register_dt
  14. assert register_dt.tzinfo is not None, register_dt
  15. self.register_dt = register_dt
  16. assert isinstance(count, int) and count >= 0, count
  17. self.count = count
  18. def __eq__(self, other):
  19. # pylint: disable=unidiomatic-typecheck
  20. return type(self) == type(other) and vars(self) == vars(other)
  21. def __hash__(self):
  22. attrs_sorted = sorted(vars(self).items(), key=lambda p: p[0])
  23. return hash(tuple(v for k, v in attrs_sorted))
  24. def __repr__(self):
  25. return "PlayCount({})".format(
  26. ", ".join("{}={!r}".format(k, v) for k, v in vars(self).items())
  27. )
  28. class Track:
  29. PATH_DEFAULT_IGNORE_REGEX = r"\.(itdb|itc2|itl|jpg|midi?|plist|xml|zip)$"
  30. def __init__(self, path):
  31. self._iface = self._select_tag_interface(path)
  32. @staticmethod
  33. def _select_tag_interface(track_path) -> _tag_interface.TagInterface:
  34. mutagen_file = mutagen.File(track_path)
  35. if mutagen_file is None:
  36. raise NotImplementedError(track_path)
  37. if isinstance(mutagen_file.tags, mutagen.id3.ID3):
  38. return _tag_interface.ID3(mutagen_file)
  39. if isinstance(mutagen_file.tags, mutagen.mp4.MP4Tags):
  40. return _tag_interface.MP4(mutagen_file)
  41. if isinstance(
  42. mutagen_file, (mutagen.oggopus.OggOpus, mutagen.oggvorbis.OggVorbis)
  43. ):
  44. return _tag_interface.Ogg(mutagen_file)
  45. raise NotImplementedError((track_path, type(mutagen_file)))
  46. def __eq__(self, other: "Track") -> bool:
  47. return self.path == other.path
  48. def __hash__(self) -> int:
  49. return hash(self.path)
  50. @property
  51. def path(self):
  52. return self._iface.track_path
  53. @property
  54. def comment(self):
  55. return self._iface.get_comment()
  56. @comment.setter
  57. def comment(self, comment):
  58. self._iface.set_comment(comment)
  59. self._iface.save()
  60. def get_uuid(self):
  61. return self._iface.get_track_uuid()
  62. def assign_uuid(self, uuid):
  63. if self.get_uuid():
  64. raise Exception("{!r} already has an uuid".format(self.path))
  65. self._iface.set_track_uuid(uuid)
  66. self._iface.save()
  67. def _get_play_counts(self, player=None, library_id=None):
  68. label = "symuid:pcnt"
  69. assert library_id is None or player is not None
  70. if player:
  71. label += ":" + player
  72. if library_id:
  73. label += ":" + library_id
  74. elif library_id:
  75. raise Exception((player, library_id))
  76. for k, count in self._iface.get_free_ints(label):
  77. player, library_id, register_ts_dec = k.split(":")[2:]
  78. yield PlayCount(
  79. player=player,
  80. library_id=library_id,
  81. register_dt=unix_epoch_time_to_datetime_utc(int(register_ts_dec)),
  82. count=count,
  83. )
  84. def get_play_counts(self) -> typing.Iterator[PlayCount]:
  85. return self._get_play_counts()
  86. def _get_latest_play_counts(self, player=None, library_id=None):
  87. latest = {}
  88. for count in self._get_play_counts(player, library_id):
  89. if not count.player in latest:
  90. latest[count.player] = {}
  91. if (
  92. not count.library_id in latest[count.player]
  93. or latest[count.player][count.library_id].register_dt
  94. < count.register_dt
  95. ):
  96. latest[count.player][count.library_id] = count
  97. return [c for p in latest.values() for c in p.values()]
  98. def get_latest_play_count(self, player, library_id):
  99. assert player is not None, player
  100. assert library_id is not None, library_id
  101. counts = self._get_latest_play_counts(player, library_id)
  102. if len(counts) == 0:
  103. return None
  104. assert len(counts) == 1, counts
  105. return counts[0]
  106. def get_play_count_sum(self, player=None, library_id=None):
  107. return sum(c.count for c in self._get_latest_play_counts(player, library_id))
  108. def register_play_count(self, play_count, tag_set_cb=None):
  109. assert isinstance(play_count, PlayCount), play_count
  110. tag_label = "symuid:pcnt:{}:{}:{:.0f}".format(
  111. play_count.player,
  112. play_count.library_id,
  113. play_count.register_dt.timestamp(),
  114. )
  115. current_count = self._iface.get_free_int(tag_label)
  116. if current_count is None:
  117. new_tag = self._iface.set_free_int(tag_label, play_count.count)
  118. self._iface.save()
  119. if tag_set_cb:
  120. tag_set_cb(self, new_tag)
  121. elif current_count != play_count.count:
  122. raise Exception((current_count, play_count.count))
  123. def increase_play_count(self, player, library_id, tag_set_cb=None):
  124. current_pc = self.get_latest_play_count(player, library_id)
  125. self.register_play_count(
  126. PlayCount(
  127. player=player,
  128. library_id=library_id,
  129. register_dt=datetime_utc_now(),
  130. count=current_pc.count + 1 if current_pc else 1,
  131. ),
  132. tag_set_cb=tag_set_cb,
  133. )
  134. @classmethod
  135. def walk(cls, root_path, path_ignore_regex, ignored_cb=None, unsupported_cb=None):
  136. for dirpath, _, filenames in os.walk(root_path):
  137. for filename in filenames:
  138. track_path = os.path.join(dirpath, filename)
  139. if path_ignore_regex and path_ignore_regex.search(track_path):
  140. if ignored_cb is not None:
  141. ignored_cb(track_path)
  142. else:
  143. try:
  144. yield cls(track_path)
  145. except NotImplementedError as exc:
  146. if unsupported_cb is not None:
  147. unsupported_cb(track_path, exc)
  148. _CMUS_REMOTE_COMMAND = "cmus-remote"
  149. _CMUS_STATUS_FILE_PATH_REGEX = re.compile(r"^file (.*)$", flags=re.MULTILINE)
  150. _CMUS_STATUS_ENCODING = "utf-8"
  151. @classmethod
  152. def get_active_cmus(cls) -> typing.Optional["Track"]:
  153. try:
  154. status_proc = subprocess.run(
  155. [cls._CMUS_REMOTE_COMMAND, "-Q"],
  156. stdout=subprocess.PIPE,
  157. stderr=subprocess.PIPE,
  158. check=True,
  159. encoding=cls._CMUS_STATUS_ENCODING,
  160. )
  161. except subprocess.CalledProcessError as exc:
  162. if exc.returncode == 1 and "cmus is not running" in exc.stderr:
  163. return None
  164. raise
  165. active_path_match = cls._CMUS_STATUS_FILE_PATH_REGEX.search(status_proc.stdout)
  166. if not active_path_match:
  167. return None
  168. return cls(path=active_path_match.group(1))