sync.py 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. import argparse
  2. import collections
  3. import re
  4. import sys
  5. import typing
  6. import symuid
  7. from symuid._uuid import generate_uuid4_bytes
  8. def _log_path(track_path, msg, stream=None):
  9. if not stream: # pytest capsys
  10. stream = sys.stdout
  11. stream.write("{!r}: {}\n".format(track_path, msg))
  12. def _log_path_error(track_path, msg):
  13. _log_path(track_path, msg, stream=sys.stderr)
  14. class _SyncPosition:
  15. def __init__(self):
  16. self._tracks = set() # type: typing.Set[symuid.Track]
  17. self._play_counts = set() # type: typing.Set[symuid.PlayCount]
  18. def add_track(self, track: symuid.Track) -> None:
  19. self._tracks.add(track)
  20. self._play_counts.update(track.get_play_counts())
  21. def sync(self, play_count_added_cb=None) -> None:
  22. for track in self._tracks:
  23. track_play_counts = set(track.get_play_counts())
  24. for play_count in self._play_counts:
  25. if play_count not in track_play_counts:
  26. track.register_play_count(
  27. play_count, tag_set_cb=play_count_added_cb)
  28. def __repr__(self) -> str:
  29. return repr(vars(self))
  30. def sync(path, path_ignore_regex, show_ignored=False,
  31. play_count_added_cb=None):
  32. sync_positions = collections.defaultdict(_SyncPosition)
  33. for track in symuid.Track.walk(
  34. root_path=path,
  35. path_ignore_regex=path_ignore_regex,
  36. ignored_cb=lambda p: show_ignored and _log_path(p, 'ignored'),
  37. unsupported_cb=lambda p, e:
  38. _log_path_error(p, 'unsupported type, skipped'),
  39. ):
  40. if track.get_uuid() is None:
  41. track.assign_uuid(generate_uuid4_bytes())
  42. _log_path(track.path, 'assigned uuid {!r}'.format(
  43. track.get_uuid()))
  44. sync_positions[track.get_uuid()].add_track(track)
  45. for sync_position in sync_positions.values():
  46. sync_position.sync(play_count_added_cb=play_count_added_cb)
  47. def _init_argparser():
  48. argparser = argparse.ArgumentParser(description=None)
  49. argparser.add_argument('path')
  50. argparser.add_argument(
  51. '--path-ignore-regex',
  52. default=symuid.Track.PATH_DEFAULT_IGNORE_REGEX,
  53. nargs=1,
  54. metavar='pattern',
  55. dest='path_ignore_regex',
  56. type=re.compile,
  57. help='(default: %(default)s)',
  58. )
  59. argparser.add_argument(
  60. '--show-ignored',
  61. action='store_true',
  62. )
  63. return argparser
  64. def _main():
  65. argparser = _init_argparser()
  66. args = argparser.parse_args()
  67. sync(**vars(args),
  68. play_count_added_cb=lambda track, tag:
  69. _log_path(track.path, 'added play count tag {!r}'.format(tag)))