sync.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. import argparse
  2. import collections
  3. import re
  4. import sys
  5. import typing
  6. import symuid
  7. from symuid._uuid import generate_uuid4_bytes
  8. def _log_path(track_path, msg, stream=sys.stdout):
  9. stream.write("{!r}: {}\n".format(track_path, msg))
  10. def _log_path_error(track_path, msg):
  11. _log_path(track_path, msg, stream=sys.stderr)
  12. class _SyncPosition:
  13. def __init__(self):
  14. self._tracks = set() # type: typing.Set[symuid.Track]
  15. self._play_counts = set() # type: typing.Set[symuid.PlayCount]
  16. def add_track(self, track: symuid.Track) -> None:
  17. self._tracks.add(track)
  18. self._play_counts.update(track.get_play_counts())
  19. @property
  20. def play_counts(self) -> typing.Set[symuid.PlayCount]:
  21. return self._play_counts
  22. @property
  23. def tracks(self) -> typing.Set[symuid.Track]:
  24. return self._tracks
  25. def __repr__(self) -> str:
  26. return repr(vars(self))
  27. def sync(path, path_ignore_regex, show_ignored=False):
  28. sync_positions = collections.defaultdict(_SyncPosition)
  29. for track in symuid.Track.walk(
  30. root_path=path,
  31. path_ignore_regex=path_ignore_regex,
  32. ignored_cb=lambda p: show_ignored and _log_path(p, 'ignored'),
  33. unsupported_cb=lambda p, e:
  34. _log_path_error(p, 'unsupported type, skipped'),
  35. ):
  36. if track.get_uuid() is None:
  37. track.assign_uuid(generate_uuid4_bytes())
  38. _log_path(track.path, 'assigned uuid {!r}'.format(
  39. track.get_uuid()))
  40. sync_positions[track.get_uuid()].add_track(track)
  41. for sync_position in sync_positions.values():
  42. for track in sync_position.tracks:
  43. track_play_counts = set(track.get_play_counts())
  44. for play_count in sync_position.play_counts:
  45. if play_count not in track_play_counts:
  46. track.register_play_count(play_count)
  47. def _init_argparser():
  48. argparser = argparse.ArgumentParser(description=None)
  49. argparser.add_argument('path')
  50. argparser.add_argument(
  51. '--path-ignore-regex',
  52. default=symuid.Track.PATH_DEFAULT_IGNORE_REGEX,
  53. nargs=1,
  54. metavar='pattern',
  55. dest='path_ignore_regex',
  56. type=re.compile,
  57. help='(default: %(default)s)',
  58. )
  59. argparser.add_argument(
  60. '--show-ignored',
  61. action='store_true',
  62. )
  63. return argparser
  64. def _main():
  65. argparser = _init_argparser()
  66. args = argparser.parse_args()
  67. sync(**vars(args))