123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- import argparse
- import collections
- import os
- import re
- import sys
- import typing
- import symuid
- from symuid._uuid import generate_uuid4_bytes
def _log_path(track_path, msg, stream=None):
    """Write a one-line message about a track path to a text stream.

    The emitted line is the repr of ``track_path``, a colon and a space,
    then ``msg``, terminated by a newline.

    :param track_path: path the message refers to (rendered via repr)
    :param msg: message text appended after the path
    :param stream: writable text stream; sys.stdout is used when omitted
    """
    # sys.stdout is resolved at call time instead of being bound as a default
    # argument, so a replaced sys.stdout (pytest capsys) is picked up.
    target = stream or sys.stdout
    line = "{!r}: {}\n".format(track_path, msg)
    target.write(line)
def _log_path_error(track_path, msg):
    """Report a message about a track path on standard error.

    Same line format as _log_path; only the target stream differs.

    :param track_path: path the message refers to
    :param msg: message text
    """
    error_stream = sys.stderr
    _log_path(track_path, msg, stream=error_stream)
class _SyncPosition:
    """Set of tracks whose play counts are brought in line with each other.

    Tracks are collected via add_track(), which also accumulates the union of
    their play counts.  sync() then registers on every collected track each
    accumulated play count that the track does not report itself.
    """

    def __init__(self):
        # tracks collected so far
        self._tracks = set()  # type: typing.Set[symuid.Track]
        # union of the play counts reported by the collected tracks
        self._play_counts = set()  # type: typing.Set[symuid.PlayCount]

    def add_track(self, track: symuid.Track) -> None:
        """Remember the track and merge its play counts into the union."""
        self._tracks.add(track)
        for play_count in track.get_play_counts():
            self._play_counts.add(play_count)

    def sync(self, play_count_added_cb=None) -> None:
        """Register on each track the collected play counts it is missing.

        :param play_count_added_cb: forwarded to register_play_count as
            ``tag_set_cb`` for every play count that gets registered
        """
        for track in self._tracks:
            already_present = set(track.get_play_counts())
            # evaluated lazily, so membership checks and registrations
            # interleave while walking the collected play counts
            missing = (
                candidate
                for candidate in self._play_counts
                if candidate not in already_present
            )
            for candidate in missing:
                track.register_play_count(candidate, tag_set_cb=play_count_added_cb)

    def __repr__(self) -> str:
        """Return the repr of the instance attribute dictionary."""
        return repr(self.__dict__)
def sync(tracks: typing.Iterator[symuid.Track], play_count_added_cb=None):
    """Synchronise play counts between tracks that share a UUID.

    Works in two passes: first all tracks are consumed and grouped by their
    UUID (a track without UUID gets a newly generated one, which is logged),
    then every group is synchronised.

    :param tracks: tracks to process
    :param play_count_added_cb: forwarded to each group's sync() call
    """
    positions_by_uuid = collections.defaultdict(_SyncPosition)
    for track in tracks:
        if track.get_uuid() is None:
            track.assign_uuid(generate_uuid4_bytes())
            track_path = track.path
            # the uuid is read back from the track after assignment for the log line
            assigned_msg = "assigned uuid {!r}".format(track.get_uuid())
            _log_path(track_path, assigned_msg)
        uuid = track.get_uuid()
        positions_by_uuid[uuid].add_track(track)
    for position in positions_by_uuid.values():
        position.sync(play_count_added_cb=play_count_added_cb)
def _walk_tracks(
    paths: typing.List[str],
    path_ignore_regex=None,
    ignored_cb=None,
    unsupported_cb=None,
) -> typing.Iterator[symuid.Track]:
    """Yield tracks for the given mix of file and directory paths.

    A directory is handed to symuid.Track.walk together with the ignore regex
    and both callbacks; any other path is wrapped in symuid.Track directly.

    :param paths: track files and/or directories
    :param path_ignore_regex: passed through to symuid.Track.walk
    :param ignored_cb: passed through to symuid.Track.walk
    :param unsupported_cb: passed through to symuid.Track.walk
    """
    for path in paths:
        if not os.path.isdir(path):
            # not a directory: treated as a single track, without the
            # ignore regex or the callbacks being consulted
            yield symuid.Track(path)
            continue
        yield from symuid.Track.walk(
            root_path=path,
            path_ignore_regex=path_ignore_regex,
            ignored_cb=ignored_cb,
            unsupported_cb=unsupported_cb,
        )
def _main():
    """Command-line entry point: parse arguments, then sync the given tracks.

    Positional arguments are track files or folders containing tracks.
    ``--path-ignore-regex`` sets the pattern passed on to the directory walk
    and ``--show-ignored`` enables a log line for every ignored path.
    """
    argparser = argparse.ArgumentParser(description=None)
    argparser.add_argument(
        "paths",
        metavar="path",
        nargs="+",
        help="track or folder containing tracks",
    )
    # No nargs here on purpose: with nargs=1 argparse stores a one-element
    # list for a user-supplied pattern, whereas the default is a single value.
    # Without nargs the option still consumes exactly one argument and the
    # user-supplied value is stored as a single compiled pattern.
    argparser.add_argument(
        "--path-ignore-regex",
        default=symuid.Track.PATH_DEFAULT_IGNORE_REGEX,
        metavar="pattern",
        dest="path_ignore_regex",
        type=re.compile,
        help="(default: %(default)s)",
    )
    argparser.add_argument("--show-ignored", action="store_true")
    args = argparser.parse_args()
    tracks = _walk_tracks(
        paths=args.paths,
        path_ignore_regex=args.path_ignore_regex,
        # ignored paths are only logged when --show-ignored was given
        ignored_cb=lambda p: args.show_ignored and _log_path(p, "ignored"),
        # the second callback argument is not used for the message
        unsupported_cb=lambda p, e: _log_path_error(p, "unsupported type, skipped"),
    )
    sync(
        tracks=tracks,
        play_count_added_cb=lambda track, tag: _log_path(
            track.path, "added play count tag {!r}".format(tag)
        ),
    )
|