sync.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. import argparse
  2. import collections
  3. import re
  4. import sys
  5. import typing
  6. import symuid
  7. from symuid._uuid import generate_uuid4_bytes
  8. def _log_path(track_path, msg, stream=sys.stdout):
  9. stream.write("{!r}: {}\n".format(track_path, msg))
  10. def _log_path_error(track_path, msg):
  11. _log_path(track_path, msg, stream=sys.stderr)
  12. class _SyncPosition:
  13. def __init__(self):
  14. self._tracks = set() # type: typing.Set[symuid.Track]
  15. self._play_counts = set() # type: typing.Set[symuid.PlayCount]
  16. def add_track(self, track: symuid.Track) -> None:
  17. self._tracks.add(track)
  18. self._play_counts.update(track.get_play_counts())
  19. def sync(self, play_count_added_cb=None) -> None:
  20. for track in self._tracks:
  21. track_play_counts = set(track.get_play_counts())
  22. for play_count in self._play_counts:
  23. if play_count not in track_play_counts:
  24. track.register_play_count(
  25. play_count, tag_set_cb=play_count_added_cb)
  26. def __repr__(self) -> str:
  27. return repr(vars(self))
  28. def sync(path, path_ignore_regex, show_ignored=False,
  29. play_count_added_cb=None):
  30. sync_positions = collections.defaultdict(_SyncPosition)
  31. for track in symuid.Track.walk(
  32. root_path=path,
  33. path_ignore_regex=path_ignore_regex,
  34. ignored_cb=lambda p: show_ignored and _log_path(p, 'ignored'),
  35. unsupported_cb=lambda p, e:
  36. _log_path_error(p, 'unsupported type, skipped'),
  37. ):
  38. if track.get_uuid() is None:
  39. track.assign_uuid(generate_uuid4_bytes())
  40. _log_path(track.path, 'assigned uuid {!r}'.format(
  41. track.get_uuid()))
  42. sync_positions[track.get_uuid()].add_track(track)
  43. for sync_position in sync_positions.values():
  44. sync_position.sync(play_count_added_cb=play_count_added_cb)
  45. def _init_argparser():
  46. argparser = argparse.ArgumentParser(description=None)
  47. argparser.add_argument('path')
  48. argparser.add_argument(
  49. '--path-ignore-regex',
  50. default=symuid.Track.PATH_DEFAULT_IGNORE_REGEX,
  51. nargs=1,
  52. metavar='pattern',
  53. dest='path_ignore_regex',
  54. type=re.compile,
  55. help='(default: %(default)s)',
  56. )
  57. argparser.add_argument(
  58. '--show-ignored',
  59. action='store_true',
  60. )
  61. return argparser
  62. def _main():
  63. argparser = _init_argparser()
  64. args = argparser.parse_args()
  65. sync(**vars(args))