|
@@ -0,0 +1,153 @@
|
|
|
|
+import pytest
|
|
|
|
+
|
|
|
|
+import symuid
|
|
|
|
+
|
|
|
|
+import datetime as dt
|
|
|
|
+import mutagen
|
|
|
|
+import os
|
|
|
|
+import shutil
|
|
|
|
+
|
|
|
|
+# TODO test aac / m4a itunes tags
|
|
|
|
+
|
|
|
|
+TRACKS_DIR_PATH = os.path.join(os.path.dirname(__file__), 'tracks')
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def utc_dt(h=0):
|
|
|
|
+ return dt.datetime(2018, 9, 26, h, tzinfo=dt.timezone.utc)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+@pytest.fixture
|
|
|
|
+def empty_id3_track(tmpdir):
|
|
|
|
+ p = tmpdir.join('empty.mp3').strpath
|
|
|
|
+ shutil.copyfile(
|
|
|
|
+ src=os.path.join(TRACKS_DIR_PATH, 'id3v2.4-empty.mp3'),
|
|
|
|
+ dst=p,
|
|
|
|
+ )
|
|
|
|
+ return symuid.Track(p)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+@pytest.mark.parametrize('path', [
|
|
|
|
+ os.path.join(TRACKS_DIR_PATH, 'id3v2.4-typical.mp3'),
|
|
|
|
+ os.path.join(TRACKS_DIR_PATH, 'id3v2.4-empty.mp3'),
|
|
|
|
+])
|
|
|
|
+def test_init(path):
|
|
|
|
+ symuid.Track(path)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+@pytest.mark.parametrize(('player', 'library_id', 'register_dt', 'count'), [
|
|
|
|
+ ('pytest', 'lib', dt.datetime(2018, 9, 26, 21, 16, 20, tzinfo=dt.timezone.utc), 7),
|
|
|
|
+])
|
|
|
|
+def test_register_play_count(empty_id3_track, player, library_id, register_dt, count):
|
|
|
|
+ empty_id3_track.register_play_count(
|
|
|
|
+ player=player,
|
|
|
|
+ library_id=library_id,
|
|
|
|
+ register_dt=register_dt,
|
|
|
|
+ play_count=count,
|
|
|
|
+ )
|
|
|
|
+ tags = mutagen.File(empty_id3_track.path).tags
|
|
|
|
+ assert len(tags) == 1
|
|
|
|
+ expected_desc = 'symuid:pcnt:{}:{}:{}'.format(
|
|
|
|
+ player, library_id, int(register_dt.timestamp()),
|
|
|
|
+ )
|
|
|
|
+ tag = tags['TXXX:' + expected_desc]
|
|
|
|
+ assert tag.desc == expected_desc
|
|
|
|
+ assert tag.text == [str(count)]
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+@pytest.mark.parametrize(('player', 'library_id', 'register_dt', 'count'), [
|
|
|
|
+ ('pytest', 'lib', utc_dt(), 7),
|
|
|
|
+])
|
|
|
|
+def test_register_play_count(empty_id3_track, player, library_id, register_dt, count):
|
|
|
|
+ empty_id3_track.register_play_count(
|
|
|
|
+ player=player,
|
|
|
|
+ library_id=library_id,
|
|
|
|
+ register_dt=register_dt,
|
|
|
|
+ play_count=count,
|
|
|
|
+ )
|
|
|
|
+ tags = mutagen.File(empty_id3_track.path).tags
|
|
|
|
+ assert len(tags) == 1
|
|
|
|
+ expected_desc = 'symuid:pcnt:{}:{}:{}'.format(
|
|
|
|
+ player, library_id, int(register_dt.timestamp()),
|
|
|
|
+ )
|
|
|
|
+ tag = tags['TXXX:' + expected_desc]
|
|
|
|
+ assert tag.desc == expected_desc
|
|
|
|
+ assert tag.text == [str(count)]
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+@pytest.mark.parametrize(('expected_counts'), [
|
|
|
|
+ [],
|
|
|
|
+ [symuid.PlayCount('player', 'lib', utc_dt(), 3)],
|
|
|
|
+ [symuid.PlayCount('a', '0', utc_dt(0), 1),
|
|
|
|
+ symuid.PlayCount('b', '1', utc_dt(1), 2)],
|
|
|
|
+ [symuid.PlayCount('a', '0', utc_dt(0), 1),
|
|
|
|
+ symuid.PlayCount('a', '2', utc_dt(1), 2),
|
|
|
|
+ symuid.PlayCount('b', '3', utc_dt(3), 3)],
|
|
|
|
+])
|
|
|
|
+def test__get_play_counts_all(empty_id3_track, expected_counts):
|
|
|
|
+ for pc in expected_counts:
|
|
|
|
+ empty_id3_track.register_play_count(
|
|
|
|
+ player=pc.player,
|
|
|
|
+ library_id=pc.library_id,
|
|
|
|
+ register_dt=pc.register_dt,
|
|
|
|
+ play_count=pc.count,
|
|
|
|
+ )
|
|
|
|
+ assert expected_counts == list(empty_id3_track._get_play_counts())
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def test__get_play_counts_filtered(empty_id3_track):
|
|
|
|
+ counts = [symuid.PlayCount('a', '0', utc_dt(0), 1),
|
|
|
|
+ symuid.PlayCount('a', '0', utc_dt(1), 2),
|
|
|
|
+ symuid.PlayCount('a', '1', utc_dt(0), 3),
|
|
|
|
+ symuid.PlayCount('b', '2', utc_dt(1), 4)]
|
|
|
|
+ for pc in counts:
|
|
|
|
+ empty_id3_track.register_play_count(
|
|
|
|
+ player=pc.player,
|
|
|
|
+ library_id=pc.library_id,
|
|
|
|
+ register_dt=pc.register_dt,
|
|
|
|
+ play_count=pc.count,
|
|
|
|
+ )
|
|
|
|
+ assert set(empty_id3_track._get_play_counts(player='a')) \
|
|
|
|
+ == set(filter(lambda pc: pc.player == 'a', counts))
|
|
|
|
+ assert set(empty_id3_track._get_play_counts(player='b')) \
|
|
|
|
+ == set(filter(lambda pc: pc.player == 'b', counts))
|
|
|
|
+ assert set(empty_id3_track._get_play_counts(player='a', library_id='0')) \
|
|
|
|
+ == set(filter(lambda pc: pc.library_id == '0', counts))
|
|
|
|
+ assert set(empty_id3_track._get_play_counts(player='a', library_id='2')) \
|
|
|
|
+ == set()
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def test__get_latest_play_counts(empty_id3_track):
|
|
|
|
+ counts = [symuid.PlayCount('a', '0', utc_dt(0), 1),
|
|
|
|
+ symuid.PlayCount('a', '0', utc_dt(1), 2),
|
|
|
|
+ symuid.PlayCount('a', '1', utc_dt(0), 3),
|
|
|
|
+ symuid.PlayCount('a', '1', utc_dt(2), 4),
|
|
|
|
+ symuid.PlayCount('b', '2', utc_dt(3), 5)]
|
|
|
|
+ for pc in counts:
|
|
|
|
+ empty_id3_track.register_play_count(
|
|
|
|
+ player=pc.player,
|
|
|
|
+ library_id=pc.library_id,
|
|
|
|
+ register_dt=pc.register_dt,
|
|
|
|
+ play_count=pc.count,
|
|
|
|
+ )
|
|
|
|
+ assert set(empty_id3_track._get_latest_play_counts()) \
|
|
|
|
+ == set([counts[1], counts[3], counts[4]])
|
|
|
|
+ assert set(empty_id3_track._get_latest_play_counts(player='a')) \
|
|
|
|
+ == set([counts[1], counts[3]])
|
|
|
|
+ assert set(empty_id3_track._get_latest_play_counts(player='a', library_id='0')) \
|
|
|
|
+ == set([counts[1]])
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def test_get_play_count_sum(empty_id3_track):
|
|
|
|
+ counts = [symuid.PlayCount('a', '0', utc_dt(0), 1),
|
|
|
|
+ symuid.PlayCount('a', '0', utc_dt(1), 2),
|
|
|
|
+ symuid.PlayCount('a', '1', utc_dt(0), 3),
|
|
|
|
+ symuid.PlayCount('a', '1', utc_dt(2), 4),
|
|
|
|
+ symuid.PlayCount('b', '2', utc_dt(3), 5)]
|
|
|
|
+ for pc in counts:
|
|
|
|
+ empty_id3_track.register_play_count(
|
|
|
|
+ player=pc.player,
|
|
|
|
+ library_id=pc.library_id,
|
|
|
|
+ register_dt=pc.register_dt,
|
|
|
|
+ play_count=pc.count,
|
|
|
|
+ )
|
|
|
|
+ assert 2 + 4 + 5 == empty_id3_track.get_play_count_sum()
|