sync.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. import argparse
  2. import collections
  3. import os
  4. import re
  5. import sys
  6. import typing
  7. import symuid
  8. from symuid._uuid import generate_uuid4_bytes
  9. def _log_path(track_path, msg, stream=None):
  10. if not stream: # pytest capsys
  11. stream = sys.stdout
  12. stream.write("{!r}: {}\n".format(track_path, msg))
  13. def _log_path_error(track_path, msg):
  14. _log_path(track_path, msg, stream=sys.stderr)
  15. class _SyncPosition:
  16. def __init__(self):
  17. self._tracks = set() # type: typing.Set[symuid.Track]
  18. self._play_counts = set() # type: typing.Set[symuid.PlayCount]
  19. def add_track(self, track: symuid.Track) -> None:
  20. self._tracks.add(track)
  21. self._play_counts.update(track.get_play_counts())
  22. def sync(self, play_count_added_cb=None) -> None:
  23. for track in self._tracks:
  24. track_play_counts = set(track.get_play_counts())
  25. for play_count in self._play_counts:
  26. if play_count not in track_play_counts:
  27. track.register_play_count(
  28. play_count, tag_set_cb=play_count_added_cb)
  29. def __repr__(self) -> str:
  30. return repr(vars(self))
  31. def sync(tracks: typing.Iterator[symuid.Track], play_count_added_cb=None):
  32. sync_positions = collections.defaultdict(_SyncPosition)
  33. for track in tracks:
  34. if track.get_uuid() is None:
  35. track.assign_uuid(generate_uuid4_bytes())
  36. _log_path(track.path, 'assigned uuid {!r}'.format(
  37. track.get_uuid()))
  38. sync_positions[track.get_uuid()].add_track(track)
  39. for sync_position in sync_positions.values():
  40. sync_position.sync(play_count_added_cb=play_count_added_cb)
  41. def _walk_tracks(paths: typing.List[str], path_ignore_regex=None,
  42. ignored_cb=None, unsupported_cb=None) \
  43. -> typing.Iterator[symuid.Track]:
  44. for path in paths:
  45. if os.path.isdir(path):
  46. for track in symuid.Track.walk(
  47. root_path=path,
  48. path_ignore_regex=path_ignore_regex,
  49. ignored_cb=ignored_cb,
  50. unsupported_cb=unsupported_cb):
  51. yield track
  52. else:
  53. yield symuid.Track(path)
  54. def _main():
  55. argparser = argparse.ArgumentParser(description=None)
  56. argparser.add_argument(
  57. 'paths',
  58. metavar='path',
  59. nargs='+',
  60. help='track or folder containing tracks',
  61. )
  62. argparser.add_argument(
  63. '--path-ignore-regex',
  64. default=symuid.Track.PATH_DEFAULT_IGNORE_REGEX,
  65. nargs=1,
  66. metavar='pattern',
  67. dest='path_ignore_regex',
  68. type=re.compile,
  69. help='(default: %(default)s)',
  70. )
  71. argparser.add_argument(
  72. '--show-ignored',
  73. action='store_true',
  74. )
  75. args = argparser.parse_args()
  76. tracks = _walk_tracks(
  77. paths=args.paths,
  78. path_ignore_regex=args.path_ignore_regex,
  79. ignored_cb=lambda p: args.show_ignored and _log_path(p, 'ignored'),
  80. unsupported_cb=lambda p, e:
  81. _log_path_error(p, 'unsupported type, skipped'),
  82. )
  83. sync(tracks=tracks,
  84. play_count_added_cb=lambda track, tag:
  85. _log_path(track.path, 'added play count tag {!r}'.format(tag)))