__init__.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. # -*- coding: utf-8 -*-
  2. import datetime as dt
  3. import dateutil.tz
  4. import mutagen
  5. import os
  6. import re
  7. import symuid.tag_interface
  8. def _timestamp_to_utc_dt(ts):
  9. return dt.datetime.utcfromtimestamp(ts) \
  10. .replace(tzinfo=dateutil.tz.tzutc())
  11. assert _timestamp_to_utc_dt(1528795204) \
  12. == dt.datetime(2018, 6, 12, 9, 20, 4, tzinfo=dateutil.tz.tzutc())
  13. class PlayCount:
  14. def __init__(self, player, library_id, register_dt, count):
  15. self.player = player
  16. self.library_id = library_id
  17. assert isinstance(register_dt, dt.datetime), register_dt
  18. assert register_dt.tzinfo is not None, register_dt
  19. self.register_dt = register_dt
  20. assert isinstance(count, int) and count >= 0, count
  21. self.count = count
  22. class Track:
  23. PATH_DEFAULT_IGNORE_REGEX = r'\.(itdb|itc2|itl|jpg|midi?|plist|xml|zip)$'
  24. def __init__(self, path):
  25. mutagen_file = mutagen.File(filename=path)
  26. if mutagen_file is None:
  27. raise NotImplementedError(path)
  28. elif isinstance(mutagen_file.tags, mutagen.id3.ID3):
  29. self._iface = symuid.tag_interface.ID3(mutagen_file)
  30. elif isinstance(mutagen_file.tags, mutagen.mp4.MP4Tags):
  31. self._iface = symuid.tag_interface.MP4(mutagen_file)
  32. else:
  33. raise NotImplementedError((path, type(mutagen_file)))
  34. @property
  35. def path(self):
  36. return self._iface.track_path
  37. @property
  38. def comment(self):
  39. return self._iface.get_comment()
  40. def get_uuid(self):
  41. return self._iface.get_track_uuid()
  42. def assign_uuid(self, uuid):
  43. if self.get_uuid():
  44. raise Exception("{!r} already has an uuid".format(self.path))
  45. self._iface.set_track_uuid(uuid)
  46. self._iface.save()
  47. def _get_play_counts(self, player=None, library_id=None):
  48. label = 'symuid:pcnt'
  49. assert library_id is None or player is not None
  50. if player:
  51. label += ':' + player
  52. if library_id:
  53. label += ':' + library_id
  54. elif library_id:
  55. raise Exception((player, library_id))
  56. for k, c in self._iface.get_free_ints(label):
  57. player, library_id, register_ts_dec = k.split(':')[2:]
  58. yield PlayCount(
  59. player=player,
  60. library_id=library_id,
  61. register_dt=_timestamp_to_utc_dt(int(register_ts_dec)),
  62. count=c,
  63. )
  64. def _get_latest_play_counts(self, player=None, library_id=None):
  65. latest = {}
  66. for count in self._get_play_counts(player, library_id):
  67. if not count.player in latest:
  68. latest[count.player] = {}
  69. if not count.library_id in latest[count.player] \
  70. or latest[count.player][count.library_id].register_dt < count.register_dt:
  71. latest[count.player][count.library_id] = count
  72. return [c for p in latest.values() for c in p.values()]
  73. def get_latest_play_count(self, player, library_id):
  74. assert player is not None, player
  75. assert library is not None, library_id
  76. counts = self._get_latest_play_counts(player, library_id)
  77. if len(counts) == 0:
  78. return None
  79. else:
  80. assert len(counts) == 1, counts
  81. return counts[0]
  82. def get_play_count_sum(self, player=None, library_id=None):
  83. return sum(c.count for c in self._get_latest_play_counts(player, library_id))
  84. def register_play_count(self, player, library_id, register_dt, play_count, tag_set_cb=None):
  85. # TODO accept PlayCount as param
  86. assert isinstance(register_dt, dt.datetime), register_dt
  87. assert isinstance(play_count, int), play_count
  88. tag_label = 'symuid:pcnt:{}:{}:{:.0f}'.format(
  89. player, library_id, register_dt.timestamp(),
  90. )
  91. current_count = self._iface.get_free_int(tag_label)
  92. if current_count is None:
  93. new_tag = self._iface.set_free_int(tag_label, play_count)
  94. self._iface.save()
  95. if tag_set_cb:
  96. tag_set_cb(self, new_tag)
  97. elif current_count != play_count:
  98. raise Exception((current_count, play_count))
  99. @classmethod
  100. def walk(cls, root_path, path_ignore_regex, ignored_cb=None, unsupported_cb=None):
  101. for dirpath, dirnames, filenames in os.walk(root_path):
  102. for filename in filenames:
  103. track_path = os.path.join(dirpath, filename)
  104. if path_ignore_regex.search(track_path):
  105. if ignored_cb is not None:
  106. ignored_cb(track_path)
  107. else:
  108. try:
  109. yield cls(track_path)
  110. except NotImplementedError as e:
  111. if unsupported_cb is not None:
  112. unsupported_cb(track_path, e)