Browse Source

symuid-sync: sync play count tags

Fabian Peter Hammerle 4 years ago
parent
commit
9f9a9c3d3e
7 changed files with 142 additions and 33 deletions
  1. 7 7
      symuid/__init__.py
  2. 4 0
      symuid/_datetime.py
  3. 7 0
      symuid/_uuid.py
  4. 33 6
      symuid/sync.py
  5. 10 1
      tests/test_datetime.py
  6. 0 15
      tests/test_functions.py
  7. 81 4
      tests/test_sync.py

+ 7 - 7
symuid/__init__.py

@@ -1,16 +1,12 @@
 import datetime as dt
 import os
 import re
+import typing
 
 import mutagen
 
 from symuid import _tag_interface
-from symuid._datetime import datetime_utc_now
-
-
-def _timestamp_to_utc_dt(ts_sec):
-    return dt.datetime.utcfromtimestamp(ts_sec) \
-        .replace(tzinfo=dt.timezone.utc)
+from symuid._datetime import datetime_utc_now, unix_epoch_time_to_datetime_utc
 
 
 class PlayCount:
@@ -95,10 +91,14 @@ class Track:
             yield PlayCount(
                 player=player,
                 library_id=library_id,
-                register_dt=_timestamp_to_utc_dt(int(register_ts_dec)),
+                register_dt=unix_epoch_time_to_datetime_utc(
+                    int(register_ts_dec)),
                 count=count,
             )
 
+    def get_play_counts(self) -> typing.Iterator[PlayCount]:
+        return self._get_play_counts()
+
     def _get_latest_play_counts(self, player=None, library_id=None):
         latest = {}
         for count in self._get_play_counts(player, library_id):

+ 4 - 0
symuid/_datetime.py

@@ -1,5 +1,9 @@
 import datetime
 
 
+def unix_epoch_time_to_datetime_utc(ts_sec: int) -> datetime.datetime:
+    return datetime.datetime.utcfromtimestamp(ts_sec) \
+        .replace(tzinfo=datetime.timezone.utc)
+
 def datetime_utc_now() -> datetime.datetime:
     return datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)

+ 7 - 0
symuid/_uuid.py

@@ -1,3 +1,6 @@
+import subprocess
+
+
 def uuid_str_to_int(uuid_str: str) -> int:
     """
     >>> uuid_str_to_int('613ea4ac-a4cf-4026-8e99-1904b2bb5cd0')
@@ -58,3 +61,7 @@ def uuid_bytes_to_str(uuid_bytes: bytes) -> str:
     'c064bf40-4fd1-50e1-a2f7-2aefa4593f67'
     """
     return uuid_int_to_str(uuid_bytes_to_int(uuid_bytes))
+
+
+def generate_uuid4_bytes() -> bytes:
+    return subprocess.check_output(['uuid', '-v', '4', '-F', 'BIN']).strip()

+ 33 - 6
symuid/sync.py

@@ -1,13 +1,11 @@
 import argparse
+import collections
 import re
-import subprocess
 import sys
+import typing
 
 import symuid
-
-
-def _generate_uuid():
-    return subprocess.check_output(['uuid', '-v', '4', '-F', 'BIN']).strip()
+from symuid._uuid import generate_uuid4_bytes
 
 
 def _log_path(track_path, msg, stream=sys.stdout):
@@ -18,7 +16,29 @@ def _log_path_error(track_path, msg):
     _log_path(track_path, msg, stream=sys.stderr)
 
 
+class _SyncPosition:
+
+    def __init__(self):
+        self._tracks = set() # type: typing.Set[symuid.Track]
+        self._play_counts = set() # type: typing.Set[symuid.PlayCount]
+
+    def add_track(self, track: symuid.Track) -> None:
+        self._tracks.add(track)
+        self._play_counts.update(track.get_play_counts())
+
+    @property
+    def play_counts(self) -> typing.Set[symuid.PlayCount]:
+        return self._play_counts
+
+    @property
+    def tracks(self) -> typing.Set[symuid.Track]:
+        return self._tracks
+
+    def __repr__(self) -> str:
+        return repr(vars(self))
+
 def sync(path, path_ignore_regex, show_ignored=False):
+    sync_positions = collections.defaultdict(_SyncPosition)
     for track in symuid.Track.walk(
             root_path=path,
             path_ignore_regex=path_ignore_regex,
@@ -27,9 +47,16 @@ def sync(path, path_ignore_regex, show_ignored=False):
                 _log_path_error(p, 'unsupported type, skipped'),
     ):
         if track.get_uuid() is None:
-            track.assign_uuid(_generate_uuid())
+            track.assign_uuid(generate_uuid4_bytes())
             _log_path(track.path, 'assigned uuid {!r}'.format(
                 track.get_uuid()))
+        sync_positions[track.get_uuid()].add_track(track)
+    for sync_position in sync_positions.values():
+        for track in sync_position.tracks:
+            track_play_counts = set(track.get_play_counts())
+            for play_count in sync_position.play_counts:
+                if play_count not in track_play_counts:
+                    track.register_play_count(play_count)
 
 
 def _init_argparser():

+ 10 - 1
tests/test_datetime.py

@@ -1,10 +1,19 @@
 import datetime
 
+import pytest
 import pytz
 
-from symuid._datetime import datetime_utc_now
+from symuid._datetime import datetime_utc_now, unix_epoch_time_to_datetime_utc
 
 
 def test_datetime_utc_now():
     pytz_dt = datetime.datetime.now(pytz.timezone('UTC'))
     assert abs((datetime_utc_now() - pytz_dt).total_seconds()) < 5
+
+
+@pytest.mark.parametrize(('ts_sec', 'expected_dt'), [
+    (1528795204,
+     datetime.datetime(2018, 6, 12, 9, 20, 4, tzinfo=datetime.timezone.utc)),
+])
+def test_unix_epoch_time_to_datetime_utc(ts_sec, expected_dt):
+    assert expected_dt == unix_epoch_time_to_datetime_utc(ts_sec)

+ 0 - 15
tests/test_functions.py

@@ -1,15 +0,0 @@
-import datetime
-
-import pytest
-
-import symuid
-
-# pylint: disable=protected-access
-
-
-@pytest.mark.parametrize(('ts_sec', 'expected_dt'), [
-    (1528795204,
-     datetime.datetime(2018, 6, 12, 9, 20, 4, tzinfo=datetime.timezone.utc)),
-])
-def test__timestamp_to_utc_dt(ts_sec, expected_dt):
-    assert expected_dt == symuid._timestamp_to_utc_dt(ts_sec)

+ 81 - 4
tests/test_sync.py

@@ -1,21 +1,25 @@
 import os
 import re
+import shutil
 import unittest.mock
 
-from symuid import Track
+import pytest
+from symuid import PlayCount, Track
+from symuid._datetime import unix_epoch_time_to_datetime_utc
+from symuid._uuid import generate_uuid4_bytes
 from symuid.sync import _main, sync
 
 DUMMY_PATH_IGNORE_REGEX = re.compile(r'\.jpg$')
 
 
-def test_sync(empty_ogg_opus_path):
+def test_add_uuid(empty_ogg_opus_path):
     assert Track(empty_ogg_opus_path).get_uuid() is None
     sync(os.path.dirname(empty_ogg_opus_path),
          path_ignore_regex=DUMMY_PATH_IGNORE_REGEX)
     assert Track(empty_ogg_opus_path).get_uuid() is not None
 
 
-def test_sync_idempotent(empty_ogg_opus_path):
+def test_add_uuid_idempotent(empty_ogg_opus_path):
     sync(os.path.dirname(empty_ogg_opus_path),
          path_ignore_regex=DUMMY_PATH_IGNORE_REGEX)
     uuid = Track(empty_ogg_opus_path).get_uuid()
@@ -24,8 +28,81 @@ def test_sync_idempotent(empty_ogg_opus_path):
     assert Track(empty_ogg_opus_path).get_uuid() == uuid
 
 
-def test_main(empty_ogg_opus_path):
+def test_add_uuid_main(empty_ogg_opus_path):
     assert Track(empty_ogg_opus_path).get_uuid() is None
     with unittest.mock.patch('sys.argv', ['', os.path.dirname(empty_ogg_opus_path)]):
         _main()
     assert Track(empty_ogg_opus_path).get_uuid() is not None
+
+
+def test_sync_play_count(tmpdir, tracks_dir_path):
+    shutil.copyfile(
+        src=os.path.join(tracks_dir_path, 'id3v2.4-empty.mp3'),
+        dst=tmpdir.join('a1.mp3'),
+    )
+    shutil.copyfile(
+        src=os.path.join(tracks_dir_path, 'ogg-vorbis-empty.ogg'),
+        dst=tmpdir.join('a2.ogg'),
+    )
+    shutil.copyfile(
+        src=os.path.join(tracks_dir_path, 'ogg-opus-empty.opus'),
+        dst=tmpdir.join('a3.opus'),
+    )
+    shutil.copyfile(
+        src=os.path.join(tracks_dir_path, 'id3v2.4-empty.mp3'),
+        dst=tmpdir.join('b.mp3'),
+    )
+    uuid_a = generate_uuid4_bytes()
+    uuid_b = generate_uuid4_bytes()
+    track_a1 = Track(tmpdir.join('a1.mp3'))
+    track_a1.assign_uuid(uuid_a)
+    track_a2 = Track(tmpdir.join('a2.ogg'))
+    track_a2.assign_uuid(uuid_a)
+    track_a3 = Track(tmpdir.join('a3.opus'))
+    track_a3.assign_uuid(uuid_a)
+    track_b = Track(tmpdir.join('b.mp3'))
+    track_b.assign_uuid(uuid_b)
+    track_a1.register_play_count(PlayCount(
+        player='cmus',
+        library_id='lib1',
+        register_dt=unix_epoch_time_to_datetime_utc(0),
+        count=21,
+    ))
+    track_a1.register_play_count(PlayCount(
+        player='cmus',
+        library_id='lib1',
+        register_dt=unix_epoch_time_to_datetime_utc(1),
+        count=22,
+    ))
+    track_a2.register_play_count(PlayCount(
+        player='cmus',
+        library_id='lib1',
+        register_dt=unix_epoch_time_to_datetime_utc(0),
+        count=21,
+    ))
+    track_a2.register_play_count(PlayCount(
+        player='cmus',
+        library_id='lib2',
+        register_dt=unix_epoch_time_to_datetime_utc(2),
+        count=7,
+    ))
+    track_b.register_play_count(PlayCount(
+        player='cmus',
+        library_id='lib2',
+        register_dt=unix_epoch_time_to_datetime_utc(3),
+        count=4,
+    ))
+    sync(tmpdir, path_ignore_regex=DUMMY_PATH_IGNORE_REGEX)
+    play_counts_a1 = set(Track(tmpdir.join('a1.mp3')).get_play_counts())
+    play_counts_a2 = set(Track(tmpdir.join('a2.ogg')).get_play_counts())
+    play_counts_a3 = set(Track(tmpdir.join('a3.opus')).get_play_counts())
+    play_counts_b = set(Track(tmpdir.join('b.mp3')).get_play_counts())
+    assert len(play_counts_a1) == 3
+    assert len(play_counts_a1) == len(play_counts_a2)
+    assert len(play_counts_a1) == len(play_counts_a3)
+    assert play_counts_a1 == play_counts_a2
+    assert {pc.register_dt.timestamp() for pc in play_counts_a1} \
+        == pytest.approx({0, 1, 2})
+    assert len(play_counts_b) == 1
+    assert {pc.register_dt.timestamp() for pc in play_counts_b} \
+        == pytest.approx({3})