__init__.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. # -*- coding: utf-8 -*-
  2. import datetime as dt
  3. import dateutil.tz
  4. import mutagen
  5. import os
  6. import symuid.tag_interface
  7. def _timestamp_to_utc_dt(ts):
  8. return dt.datetime.utcfromtimestamp(ts) \
  9. .replace(tzinfo=dateutil.tz.tzutc())
  10. assert _timestamp_to_utc_dt(1528795204) \
  11. == dt.datetime(2018, 6, 12, 9, 20, 4, tzinfo=dateutil.tz.tzutc())
  12. class PlayCount:
  13. def __init__(self, player, library_id, register_dt, count):
  14. self.player = player
  15. self.library_id = library_id
  16. assert isinstance(register_dt, dt.datetime), register_dt
  17. assert register_dt.tzinfo is not None, register_dt
  18. self.register_dt = register_dt
  19. assert isinstance(count, int) and count >= 0, count
  20. self.count = count
  21. class Track:
  22. def __init__(self, path):
  23. mutagen_file = mutagen.File(filename=path)
  24. if mutagen_file is None:
  25. raise NotImplementedError(path)
  26. elif isinstance(mutagen_file.tags, mutagen.id3.ID3):
  27. self._iface = symuid.tag_interface.ID3(mutagen_file)
  28. elif isinstance(mutagen_file.tags, mutagen.mp4.MP4Tags):
  29. self._iface = symuid.tag_interface.MP4(mutagen_file)
  30. else:
  31. raise NotImplementedError((path, type(mutagen_file)))
  32. @property
  33. def path(self):
  34. return self._iface.track_path
  35. def get_uuid(self):
  36. return self._iface.get_track_uuid()
  37. def assign_uuid(self, uuid):
  38. if self.get_uuid():
  39. raise Exception("{!r} already has an uuid".format(self.path))
  40. self._iface.set_track_uuid(uuid)
  41. self._iface.save()
  42. def _get_play_counts(self, player=None, library_id=None):
  43. label = 'symuid:pcnt'
  44. assert library_id is None or player is not None
  45. if player:
  46. label += ':' + player
  47. if library_id:
  48. label += ':' + library_id
  49. for k, c in self._iface.get_free_ints(label):
  50. player, library_id, register_ts_dec = k.split(':')[2:]
  51. yield PlayCount(
  52. player=player,
  53. library_id=library_id,
  54. register_dt=_timestamp_to_utc_dt(int(register_ts_dec)),
  55. count=c,
  56. )
  57. def _get_latest_play_counts(self, player=None, library_id=None):
  58. latest = {}
  59. for count in self._get_play_counts(player, library_id):
  60. if not count.player in latest:
  61. latest[count.player] = {}
  62. if not count.library_id in latest[count.player] \
  63. or latest[count.player][count.library_id].register_dt < count.register_dt:
  64. latest[count.player][count.library_id] = count
  65. return [c for p in latest.values() for c in p.values()]
  66. def get_latest_play_count(self, player, library_id):
  67. assert player is not None, player
  68. assert library is not None, library_id
  69. counts = self._get_latest_play_counts(player, library_id)
  70. if len(counts) == 0:
  71. return None
  72. else:
  73. assert len(counts) == 1, counts
  74. return counts[0]
  75. def register_play_count(self, player, library_id, register_dt, play_count, tag_set_cb=None):
  76. # TODO accept PlayCount as param
  77. assert isinstance(register_dt, dt.datetime), register_dt
  78. assert isinstance(play_count, int), play_count
  79. tag_label = 'symuid:pcnt:{}:{}:{:.0f}'.format(
  80. player, library_id, register_dt.timestamp(),
  81. )
  82. current_count = self._iface.get_free_int(tag_label)
  83. if current_count is None:
  84. new_tag = self._iface.set_free_int(tag_label, play_count)
  85. self._iface.save()
  86. if tag_set_cb:
  87. tag_set_cb(self, new_tag)
  88. elif current_count != play_count:
  89. raise Exception((current_count, play_count))
  90. @classmethod
  91. def walk(cls, root_path, path_ignore_regex, ignored_cb, unsupported_cb):
  92. for dirpath, dirnames, filenames in os.walk(root_path):
  93. for filename in filenames:
  94. track_path = os.path.join(dirpath, filename)
  95. if path_ignore_regex.search(track_path):
  96. ignored_cb(track_path)
  97. else:
  98. try:
  99. yield cls(track_path)
  100. except NotImplementedError as e:
  101. unsupported_cb(track_path, e)