__init__.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. # -*- coding: utf-8 -*-
  2. import datetime as dt
  3. import mutagen
  4. import os
  5. import re
  6. import symuid.tag_interface
  7. def _timestamp_to_utc_dt(ts):
  8. return dt.datetime.utcfromtimestamp(ts) \
  9. .replace(tzinfo=dt.timezone.utc)
  10. def _utc_dt_now():
  11. return dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc)
  12. class PlayCount:
  13. def __init__(self, player, library_id, register_dt, count):
  14. self.player = player
  15. self.library_id = library_id
  16. assert isinstance(register_dt, dt.datetime), register_dt
  17. assert register_dt.tzinfo is not None, register_dt
  18. self.register_dt = register_dt
  19. assert isinstance(count, int) and count >= 0, count
  20. self.count = count
  21. def __eq__(self, other):
  22. return type(self) == type(other) and vars(self) == vars(other)
  23. def __hash__(self):
  24. attrs_sorted = sorted(vars(self).items(), key=lambda p: p[0])
  25. return hash(tuple(v for k, v in attrs_sorted))
  26. def __repr__(self):
  27. return 'PlayCount({})'.format(', '.join(
  28. '{}={!r}'.format(k, v) for k, v in vars(self).items(),
  29. ))
  30. class Track:
  31. PATH_DEFAULT_IGNORE_REGEX = r'\.(itdb|itc2|itl|jpg|midi?|plist|xml|zip)$'
  32. def __init__(self, path):
  33. mutagen_file = mutagen.File(filename=path)
  34. # TODO support mp3 files without ID3 headers
  35. if mutagen_file is None:
  36. raise NotImplementedError(path)
  37. elif isinstance(mutagen_file.tags, mutagen.id3.ID3):
  38. self._iface = symuid.tag_interface.ID3(mutagen_file)
  39. elif isinstance(mutagen_file.tags, mutagen.mp4.MP4Tags):
  40. self._iface = symuid.tag_interface.MP4(mutagen_file)
  41. else:
  42. raise NotImplementedError((path, type(mutagen_file)))
  43. @property
  44. def path(self):
  45. return self._iface.track_path
  46. @property
  47. def comment(self):
  48. return self._iface.get_comment()
  49. def get_uuid(self):
  50. return self._iface.get_track_uuid()
  51. def assign_uuid(self, uuid):
  52. if self.get_uuid():
  53. raise Exception("{!r} already has an uuid".format(self.path))
  54. self._iface.set_track_uuid(uuid)
  55. self._iface.save()
  56. def _get_play_counts(self, player=None, library_id=None):
  57. label = 'symuid:pcnt'
  58. assert library_id is None or player is not None
  59. if player:
  60. label += ':' + player
  61. if library_id:
  62. label += ':' + library_id
  63. elif library_id:
  64. raise Exception((player, library_id))
  65. for k, c in self._iface.get_free_ints(label):
  66. player, library_id, register_ts_dec = k.split(':')[2:]
  67. yield PlayCount(
  68. player=player,
  69. library_id=library_id,
  70. register_dt=_timestamp_to_utc_dt(int(register_ts_dec)),
  71. count=c,
  72. )
  73. def _get_latest_play_counts(self, player=None, library_id=None):
  74. latest = {}
  75. for count in self._get_play_counts(player, library_id):
  76. if not count.player in latest:
  77. latest[count.player] = {}
  78. if not count.library_id in latest[count.player] \
  79. or latest[count.player][count.library_id].register_dt < count.register_dt:
  80. latest[count.player][count.library_id] = count
  81. return [c for p in latest.values() for c in p.values()]
  82. def get_latest_play_count(self, player, library_id):
  83. assert player is not None, player
  84. assert library is not None, library_id
  85. counts = self._get_latest_play_counts(player, library_id)
  86. if len(counts) == 0:
  87. return None
  88. else:
  89. assert len(counts) == 1, counts
  90. return counts[0]
  91. def get_play_count_sum(self, player=None, library_id=None):
  92. return sum(c.count for c in self._get_latest_play_counts(player, library_id))
  93. def register_play_count(self, pc, tag_set_cb=None):
  94. assert isinstance(pc, PlayCount), pc
  95. tag_label = 'symuid:pcnt:{}:{}:{:.0f}'.format(
  96. pc.player, pc.library_id, pc.register_dt.timestamp(),
  97. )
  98. current_count = self._iface.get_free_int(tag_label)
  99. if current_count is None:
  100. new_tag = self._iface.set_free_int(tag_label, pc.count)
  101. self._iface.save()
  102. if tag_set_cb:
  103. tag_set_cb(self, new_tag)
  104. elif current_count != pc.count:
  105. raise Exception((current_count, pc.count))
  106. @classmethod
  107. def walk(cls, root_path, path_ignore_regex, ignored_cb=None, unsupported_cb=None):
  108. for dirpath, dirnames, filenames in os.walk(root_path):
  109. for filename in filenames:
  110. track_path = os.path.join(dirpath, filename)
  111. if path_ignore_regex.search(track_path):
  112. if ignored_cb is not None:
  113. ignored_cb(track_path)
  114. else:
  115. try:
  116. yield cls(track_path)
  117. except NotImplementedError as e:
  118. if unsupported_cb is not None:
  119. unsupported_cb(track_path, e)