sync.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. import argparse
  2. import collections
  3. import os
  4. import re
  5. import sys
  6. import typing
  7. import symuid
  8. from symuid._uuid import generate_uuid4_bytes
  9. def _log_path(track_path, msg, stream=None):
  10. if not stream: # pytest capsys
  11. stream = sys.stdout
  12. stream.write("{!r}: {}\n".format(track_path, msg))
  def _log_path_error(track_path, msg):
      """Log ``msg`` for ``track_path`` through ``_log_path`` on ``sys.stderr``."""
      _log_path(track_path, msg, sys.stderr)
  class _SyncPosition:
      """Set of tracks plus the pooled play counts reported by those tracks.

      ``sync`` registers on every collected track each pooled play count
      that the track does not already report.
      """

      def __init__(self):
          self._tracks = set()  # type: typing.Set[symuid.Track]
          self._play_counts = set()  # type: typing.Set[symuid.PlayCount]

      def add_track(self, track: symuid.Track) -> None:
          """Store ``track`` and add its play counts to the pooled set."""
          self._tracks.add(track)
          for play_count in track.get_play_counts():
              self._play_counts.add(play_count)

      def sync(self, play_count_added_cb=None) -> None:
          """Register the pooled play counts missing on each stored track.

          ``play_count_added_cb`` is forwarded to ``register_play_count``
          as its ``tag_set_cb`` argument.
          """
          for track in self._tracks:
              # Snapshot taken once per track, before anything is registered.
              already_present = set(track.get_play_counts())
              for play_count in self._play_counts:
                  if play_count in already_present:
                      continue
                  track.register_play_count(
                      play_count, tag_set_cb=play_count_added_cb
                  )

      def __repr__(self) -> str:
          return repr(self.__dict__)
  def sync(tracks: typing.Iterator[symuid.Track], play_count_added_cb=None):
      """Group ``tracks`` by uuid and sync the play counts inside each group.

      A track whose ``get_uuid()`` returns ``None`` is first given a newly
      generated uuid, and that assignment is logged via ``_log_path``.
      ``play_count_added_cb`` is handed to every group's ``sync`` call.
      """
      positions_by_uuid = {}
      for track in tracks:
          if track.get_uuid() is None:
              track.assign_uuid(generate_uuid4_bytes())
              _log_path(
                  track.path, "assigned uuid {!r}".format(track.get_uuid())
              )
          uuid = track.get_uuid()
          position = positions_by_uuid.get(uuid)
          if position is None:
              # First track seen for this uuid: open a new group.
              position = positions_by_uuid[uuid] = _SyncPosition()
          position.add_track(track)
      for position in positions_by_uuid.values():
          position.sync(play_count_added_cb=play_count_added_cb)
  def _walk_tracks(
      paths: typing.List[str],
      path_ignore_regex=None,
      ignored_cb=None,
      unsupported_cb=None,
  ) -> typing.Iterator[symuid.Track]:
      """Yield tracks for each entry of ``paths``, in order.

      A directory is traversed with ``symuid.Track.walk``, which receives
      ``path_ignore_regex`` and both callbacks. Any other path is wrapped in
      ``symuid.Track`` directly, without consulting those three arguments.
      """
      for path in paths:
          if not os.path.isdir(path):
              yield symuid.Track(path)
              continue
          yield from symuid.Track.walk(
              root_path=path,
              path_ignore_regex=path_ignore_regex,
              ignored_cb=ignored_cb,
              unsupported_cb=unsupported_cb,
          )
  def _main():
      """Command-line entry point: parse arguments, then sync the given tracks.

      Positional ``paths`` may name single tracks or folders. Ignored paths
      are logged only when ``--show-ignored`` is set. Unsupported files are
      always reported on stderr, and every added play count tag is logged.
      """
      argparser = argparse.ArgumentParser(description=None)
      argparser.add_argument(
          "paths", metavar="path", nargs="+", help="track or folder containing tracks",
      )
      # Deliberately no nargs=1: argparse would then store a one-element list
      # when the option is given, while the default is a single value. That
      # would hand _walk_tracks two different shapes.
      argparser.add_argument(
          "--path-ignore-regex",
          default=symuid.Track.PATH_DEFAULT_IGNORE_REGEX,
          metavar="pattern",
          dest="path_ignore_regex",
          type=re.compile,
          help="(default: %(default)s)",
      )
      argparser.add_argument(
          "--show-ignored", action="store_true",
      )
      args = argparser.parse_args()
      tracks = _walk_tracks(
          paths=args.paths,
          path_ignore_regex=args.path_ignore_regex,
          # Short-circuit: nothing is logged unless --show-ignored was given.
          ignored_cb=lambda p: args.show_ignored and _log_path(p, "ignored"),
          unsupported_cb=lambda p, e: _log_path_error(p, "unsupported type, skipped"),
      )
      sync(
          tracks=tracks,
          play_count_added_cb=lambda track, tag: _log_path(
              track.path, "added play count tag {!r}".format(tag)
          ),
      )