import datetime as dt
import os
import re
import shlex
import stat
import subprocess
import unittest.mock

import mutagen
import pytest

import symuid
from symuid._datetime import datetime_utc_now

# pylint: disable=redefined-outer-name,protected-access

# TODO test aac / m4a itunes tags

TRACKS_DIR_PATH = os.path.join(os.path.dirname(__file__), "tracks")


def utc_dt(hour=0):
    """Return a timezone-aware UTC datetime on 2018-09-26 at the given hour."""
    return dt.datetime(
        year=2018, month=9, day=26, hour=hour, tzinfo=dt.timezone.utc
    )


@pytest.fixture
def empty_id3_track(empty_id3_path):
    """Provide a ``symuid.Track`` opened on the ``empty_id3_path`` fixture."""
    track = symuid.Track(empty_id3_path)
    return track


@pytest.mark.parametrize(
    "track_name",
    [
        "id3v2.4-typical.mp3",
        "id3v2.4-empty.mp3",
        "ogg-opus-typical.opus",
        "ogg-vorbis-typical.ogg",
    ],
)
def test_init(tracks_dir_path, track_name):
    """Constructing a Track for each listed sample file must not raise."""
    track_path = os.path.join(tracks_dir_path, track_name)
    symuid.Track(track_path)


@pytest.mark.parametrize(
    ("track_name", "expected_comment"),
    [
        ("id3v2.4-typical.mp3", "some comment"),
        ("id3v2.4-empty.mp3", None),
        ("ogg-opus-typical.opus", "some comment"),
        ("ogg-vorbis-typical.ogg", "some further information"),
    ],
)
def test_get_comment(tracks_dir_path, track_name, expected_comment):
    """The ``comment`` attribute matches the value expected per sample file."""
    track_path = os.path.join(tracks_dir_path, track_name)
    track = symuid.Track(track_path)
    actual_comment = track.comment
    assert expected_comment == actual_comment


def test_set_comment(empty_id3_track):
    """Assigning and appending to ``comment`` is visible on a re-opened Track."""
    track = empty_id3_track
    assert track.comment is None
    track.comment = "note"
    assert track.comment == "note"
    # Same read-then-write sequence as an augmented assignment.
    track.comment = track.comment + "s"
    assert track.comment == "notes"
    # Re-open from the same path to check the value beyond this instance.
    reopened = symuid.Track(track.path)
    assert reopened.comment == "notes"


@pytest.mark.parametrize(
    "play_count",
    [
        symuid.PlayCount("pytest", "lib", utc_dt(), 7),
    ],
)
def test_register_play_count_id3(empty_id3_track, play_count):
    """Registering a play count adds exactly one ``TXXX`` tag to the ID3 file."""
    empty_id3_track.register_play_count(play_count)
    tags = mutagen.File(empty_id3_track.path).tags
    assert len(tags) == 1
    # Description layout: symuid:pcnt:<player>:<library id>:<unix timestamp>
    register_timestamp = int(play_count.register_dt.timestamp())
    expected_desc = "symuid:pcnt:{}:{}:{}".format(
        play_count.player, play_count.library_id, register_timestamp
    )
    tag = tags["TXXX:" + expected_desc]
    assert tag.desc == expected_desc
    expected_text = [str(play_count.count)]
    assert tag.text == expected_text
@pytest.mark.parametrize(
    "play_count",
    [
        symuid.PlayCount("pytest", "lib", utc_dt(), 7),
    ],
)
def test_register_play_count_opus(empty_ogg_opus_path, play_count):
    """Registering a play count adds exactly one tag to the Ogg Opus file."""
    track = symuid.Track(empty_ogg_opus_path)
    track.register_play_count(play_count)
    tags = mutagen.File(track.path).tags
    assert len(tags) == 1
    # Tag key layout: symuid:pcnt:<player>:<library id>:<unix timestamp>
    expected_desc = "symuid:pcnt:{}:{}:{}".format(
        play_count.player,
        play_count.library_id,
        int(play_count.register_dt.timestamp()),
    )
    assert tags[expected_desc] == [str(play_count.count)]


@pytest.mark.parametrize(
    "expected_counts",
    [
        [],
        [symuid.PlayCount("player", "lib", utc_dt(), 3)],
        [
            symuid.PlayCount("a", "0", utc_dt(0), 1),
            symuid.PlayCount("b", "1", utc_dt(1), 2),
        ],
        [
            symuid.PlayCount("a", "0", utc_dt(0), 1),
            symuid.PlayCount("a", "2", utc_dt(1), 2),
            symuid.PlayCount("b", "3", utc_dt(3), 3),
        ],
    ],
)
def test__get_play_counts_all(empty_id3_track, expected_counts):
    """Without filters, ``_get_play_counts`` yields every registered count."""
    for play_count in expected_counts:
        empty_id3_track.register_play_count(play_count)
    assert expected_counts == list(empty_id3_track._get_play_counts())


def test__get_play_counts_filtered(empty_id3_track):
    """``_get_play_counts`` restricts results by player and by library id."""
    counts = [
        symuid.PlayCount("a", "0", utc_dt(0), 1),
        symuid.PlayCount("a", "0", utc_dt(1), 2),
        symuid.PlayCount("a", "1", utc_dt(0), 3),
        symuid.PlayCount("b", "2", utc_dt(1), 4),
    ]
    for play_count in counts:
        empty_id3_track.register_play_count(play_count)
    assert set(empty_id3_track._get_play_counts(player="a")) == {
        pc for pc in counts if pc.player == "a"
    }
    assert set(empty_id3_track._get_play_counts(player="b")) == {
        pc for pc in counts if pc.player == "b"
    }
    # The expectation filters on both fields, mirroring the query; checking
    # only the library id would silently depend on no other player sharing it.
    assert set(empty_id3_track._get_play_counts(player="a", library_id="0")) == {
        pc for pc in counts if pc.player == "a" and pc.library_id == "0"
    }
    assert set(empty_id3_track._get_play_counts(player="a", library_id="2")) == set()


def test__get_latest_play_counts(empty_id3_track):
    """Only the newest count per (player, library id) pair is returned."""
    counts = [
        symuid.PlayCount("a", "0", utc_dt(0), 1),
        symuid.PlayCount("a", "0", utc_dt(1), 2),
        symuid.PlayCount("a", "1", utc_dt(0), 3),
        symuid.PlayCount("a", "1", utc_dt(2), 4),
        symuid.PlayCount("b", "2", utc_dt(3), 5),
    ]
    for play_count in counts:
        empty_id3_track.register_play_count(play_count)
    assert set(empty_id3_track._get_latest_play_counts()) == {
        counts[1],
        counts[3],
        counts[4],
    }
    assert set(empty_id3_track._get_latest_play_counts(player="a")) == {
        counts[1],
        counts[3],
    }
    assert set(
        empty_id3_track._get_latest_play_counts(player="a", library_id="0")
    ) == {counts[1]}


def test_get_play_count_sum(empty_id3_track):
    """The sum adds up the newest count of each (player, library id) pair.

    In the data below those are 2 (a/0), 4 (a/1) and 5 (b/2).
    """
    counts = [
        symuid.PlayCount("a", "0", utc_dt(0), 1),
        symuid.PlayCount("a", "0", utc_dt(1), 2),
        symuid.PlayCount("a", "1", utc_dt(0), 3),
        symuid.PlayCount("a", "1", utc_dt(2), 4),
        symuid.PlayCount("b", "2", utc_dt(3), 5),
    ]
    for play_count in counts:
        empty_id3_track.register_play_count(play_count)
    assert 2 + 4 + 5 == empty_id3_track.get_play_count_sum()


def test_increase_play_count(empty_id3_track):
    """Increasing an existing count registers a second, incremented entry."""
    init_count = symuid.PlayCount("a", "0", utc_dt(0), 3)
    empty_id3_track.register_play_count(init_count)
    assert empty_id3_track.get_play_count_sum() == 3
    empty_id3_track.increase_play_count("a", "0")
    assert empty_id3_track.get_play_count_sum() == 4
    counts = set(empty_id3_track._get_play_counts())
    assert len(counts) == 2
    # Drop the initial entry so the remaining one is the newly registered count.
    counts.remove(init_count)
    new_count = counts.pop()
    assert new_count.player == "a"
    assert new_count.library_id == "0"
    # The new entry is expected to be stamped with (roughly) the current time.
    assert abs(new_count.register_dt - datetime_utc_now()).total_seconds() < 5
    assert new_count.count == 4


def test_increase_play_count_init(empty_id3_track):
    """Increasing on a track without counts registers a first count of 1."""
    empty_id3_track.increase_play_count("a", "0")
    assert empty_id3_track.get_play_count_sum() == 1
    # Tuple unpacking doubles as an assertion that exactly one count exists.
    (count,) = empty_id3_track._get_play_counts()
    assert count.player == "a"
    assert count.library_id == "0"
    assert abs(count.register_dt - datetime_utc_now()).total_seconds() < 5
    assert count.count == 1


def test_increase_play_count_others(empty_id3_track):
    """Increasing one (player, library id) pair adds no entries to the others."""
    empty_id3_track.register_play_count(
        symuid.PlayCount("a", "0", utc_dt(0), 1),
    )
    empty_id3_track.register_play_count(
        symuid.PlayCount("a", "1", utc_dt(0), 2),
    )
    empty_id3_track.register_play_count(
        symuid.PlayCount("b", "0", utc_dt(0), 3),
    )
    assert empty_id3_track.get_play_count_sum() == 6
    empty_id3_track.increase_play_count("a", "1")
    assert empty_id3_track.get_play_count_sum() == 7
    assert len(list(empty_id3_track._get_play_counts(player="b"))) == 1
    assert len(list(empty_id3_track._get_play_counts(player="a", library_id="0"))) == 1
    assert len(list(empty_id3_track._get_play_counts(player="a", library_id="1"))) == 2


def test_walk(tracks_dir_path):
    """``Track.walk`` skips paths matching ``path_ignore_regex``."""
    tracks = symuid.Track.walk(
        tracks_dir_path, path_ignore_regex=re.compile(r"typical")
    )
    track_names = {os.path.basename(t.path) for t in tracks}
    assert track_names == {
        "id3v2.4-empty.mp3",
        "mp4-aac-empty.m4a",
        "ogg-opus-empty.opus",
        "ogg-vorbis-empty.ogg",
    }


def _write_cmus_remote_mock(tmpdir, script_lines):
    """Create an executable ``/bin/sh`` stand-in for ``cmus-remote``.

    ``script_lines`` are written verbatim after the shebang line, so each
    entry must carry its own trailing newline. Returns the script's path as
    a string.
    """
    cmd_mock_path = tmpdir.join("cmus-remote-mock")
    with cmd_mock_path.open("w") as cmd_file:
        cmd_file.write("#!/bin/sh\n")
        for line in script_lines:
            cmd_file.write(line)
    cmd_mock_path.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
    return cmd_mock_path.strpath


def _write_cmus_status_mock(tmpdir, status_stdout):
    """Create a ``cmus-remote`` stand-in that prints ``status_stdout`` for ``-Q``.

    The script exits with status 1 unless invoked with ``-Q``. Returns the
    script's path as a string.
    """
    status_path = tmpdir.join("status")
    with status_path.open("wb") as status_file:
        status_file.write(status_stdout)
    return _write_cmus_remote_mock(
        tmpdir,
        [
            '[ "$@" = "-Q" ] || exit 1\n',
            "cat {}\n".format(shlex.quote(status_path.strpath)),
        ],
    )


@pytest.mark.parametrize(
    ("status_stdout", "expected_path"),
    [
        (
            b"status playing\n"
            b"file /some/path/possibly including/spaces.mp3\n"
            b"duration 210\n"
            b"position 42\n"
            b"tag artist some artist\n"
            b"tag album some album\n"
            b"tag title some title\n"
            b"tag date 2012-03-04\n"
            b"tag bpm 0\n"
            b"set aaa_mode all\n"
            b"set continue true\n"
            b"set play_library true\n"
            b"set play_sorted false\n"
            b"set replaygain disabled\n"
            b"set replaygain_limit true\n"
            b"set replaygain_preamp 0.000000\n"
            b"set repeat false\n"
            b"set repeat_current false\n"
            b"set shuffle true\n"
            b"set softvol false\n"
            b"set vol_left 42\n"
            b"set vol_right 42\n",
            "/some/path/possibly including/spaces.mp3",
        ),
    ],
)
def test_get_active_cmus(tmpdir, status_stdout, expected_path):
    """A playing status yields a Track built from the reported ``file`` path."""
    cmd_mock_path = _write_cmus_status_mock(tmpdir, status_stdout)
    # Track.__init__ is patched out because the reported path does not exist.
    with unittest.mock.patch("symuid.Track.__init__", return_value=None) as init_mock:
        with unittest.mock.patch("symuid.Track._CMUS_REMOTE_COMMAND", cmd_mock_path):
            assert isinstance(symuid.Track.get_active_cmus(), symuid.Track)
    init_mock.assert_called_once()
    init_args, init_kwargs = init_mock.call_args
    assert init_args == ()
    assert init_kwargs == {"path": expected_path}


def test_get_active_cmus_none(tmpdir):
    """A stopped status without a ``file`` line yields ``None``."""
    cmd_mock_path = _write_cmus_status_mock(
        tmpdir,
        b"status stopped\n"
        b"set aaa_mode all\n"
        b"set continue true\n"
        b"set play_library true\n"
        b"set play_sorted false\n"
        b"set replaygain disabled\n"
        b"set replaygain_limit true\n"
        b"set replaygain_preamp 0.000000\n"
        b"set repeat false\n"
        b"set repeat_current false\n"
        b"set shuffle true\n"
        b"set softvol false\n"
        b"set vol_left 0\n"
        b"set vol_right 0\n",
    )
    with unittest.mock.patch("symuid.Track._CMUS_REMOTE_COMMAND", cmd_mock_path):
        assert symuid.Track.get_active_cmus() is None


def test_get_active_cmus_not_running(tmpdir):
    """The "cmus is not running" failure of the command yields ``None``."""
    cmd_mock_path = _write_cmus_remote_mock(
        tmpdir,
        [
            "echo cmus-remote: cmus is not running >> /dev/stderr\n",
            "exit 1\n",
        ],
    )
    with unittest.mock.patch("symuid.Track._CMUS_REMOTE_COMMAND", cmd_mock_path):
        assert symuid.Track.get_active_cmus() is None


def test_get_active_cmus_unexpected_error(tmpdir):
    """Any other non-zero exit of the command raises ``CalledProcessError``."""
    cmd_mock_path = _write_cmus_remote_mock(tmpdir, ["exit 42\n"])
    with unittest.mock.patch("symuid.Track._CMUS_REMOTE_COMMAND", cmd_mock_path):
        with pytest.raises(subprocess.CalledProcessError):
            symuid.Track.get_active_cmus()