123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- import argparse
- import collections
- import os
- import re
- import sys
- import typing
- import symuid
- from symuid._uuid import generate_uuid4_bytes
- def _log_path(track_path, msg, stream=None):
- if not stream: # pytest capsys
- stream = sys.stdout
- stream.write("{!r}: {}\n".format(track_path, msg))
- def _log_path_error(track_path, msg):
- _log_path(track_path, msg, stream=sys.stderr)
- class _SyncPosition:
- def __init__(self):
- self._tracks = set() # type: typing.Set[symuid.Track]
- self._play_counts = set() # type: typing.Set[symuid.PlayCount]
- def add_track(self, track: symuid.Track) -> None:
- self._tracks.add(track)
- self._play_counts.update(track.get_play_counts())
- def sync(self, play_count_added_cb=None) -> None:
- for track in self._tracks:
- track_play_counts = set(track.get_play_counts())
- for play_count in self._play_counts:
- if play_count not in track_play_counts:
- track.register_play_count(
- play_count, tag_set_cb=play_count_added_cb)
- def __repr__(self) -> str:
- return repr(vars(self))
- def sync(tracks: typing.Iterator[symuid.Track], play_count_added_cb=None):
- sync_positions = collections.defaultdict(_SyncPosition)
- for track in tracks:
- if track.get_uuid() is None:
- track.assign_uuid(generate_uuid4_bytes())
- _log_path(track.path, 'assigned uuid {!r}'.format(
- track.get_uuid()))
- sync_positions[track.get_uuid()].add_track(track)
- for sync_position in sync_positions.values():
- sync_position.sync(play_count_added_cb=play_count_added_cb)
- def _walk_tracks(paths: typing.List[str], path_ignore_regex=None,
- ignored_cb=None, unsupported_cb=None) \
- -> typing.Iterator[symuid.Track]:
- for path in paths:
- if os.path.isdir(path):
- for track in symuid.Track.walk(
- root_path=path,
- path_ignore_regex=path_ignore_regex,
- ignored_cb=ignored_cb,
- unsupported_cb=unsupported_cb):
- yield track
- else:
- yield symuid.Track(path)
- def _main():
- argparser = argparse.ArgumentParser(description=None)
- argparser.add_argument(
- 'paths',
- metavar='path',
- nargs='+',
- help='track or folder containing tracks',
- )
- argparser.add_argument(
- '--path-ignore-regex',
- default=symuid.Track.PATH_DEFAULT_IGNORE_REGEX,
- nargs=1,
- metavar='pattern',
- dest='path_ignore_regex',
- type=re.compile,
- help='(default: %(default)s)',
- )
- argparser.add_argument(
- '--show-ignored',
- action='store_true',
- )
- args = argparser.parse_args()
- tracks = _walk_tracks(
- paths=args.paths,
- path_ignore_regex=args.path_ignore_regex,
- ignored_cb=lambda p: args.show_ignored and _log_path(p, 'ignored'),
- unsupported_cb=lambda p, e:
- _log_path_error(p, 'unsupported type, skipped'),
- )
- sync(tracks=tracks,
- play_count_added_cb=lambda track, tag:
- _log_path(track.path, 'added play count tag {!r}'.format(tag)))
|