test_track.py 12 KB


  1. import datetime as dt
  2. import os
  3. import re
  4. import shlex
  5. import stat
  6. import subprocess
  7. import unittest.mock
  8. import mutagen
  9. import pytest
  10. import symuid
  11. from symuid._datetime import datetime_utc_now
  12. # pylint: disable=redefined-outer-name,protected-access
  13. # TODO test aac / m4a itunes tags
  14. TRACKS_DIR_PATH = os.path.join(os.path.dirname(__file__), "tracks")
  15. def utc_dt(hour=0):
  16. return dt.datetime(2018, 9, 26, hour, tzinfo=dt.timezone.utc)
  17. @pytest.fixture
  18. def empty_id3_track(empty_id3_path):
  19. return symuid.Track(empty_id3_path)
  20. @pytest.mark.parametrize(
  21. "track_name",
  22. [
  23. "id3v2.4-typical.mp3",
  24. "id3v2.4-empty.mp3",
  25. "ogg-opus-typical.opus",
  26. "ogg-vorbis-typical.ogg",
  27. ],
  28. )
  29. def test_init(tracks_dir_path, track_name):
  30. symuid.Track(os.path.join(tracks_dir_path, track_name))
  31. @pytest.mark.parametrize(
  32. ("track_name", "expected_comment"),
  33. [
  34. ("id3v2.4-typical.mp3", "some comment"),
  35. ("id3v2.4-empty.mp3", None),
  36. ("ogg-opus-typical.opus", "some comment"),
  37. ("ogg-vorbis-typical.ogg", "some further information"),
  38. ],
  39. )
  40. def test_get_comment(tracks_dir_path, track_name, expected_comment):
  41. track = symuid.Track(os.path.join(tracks_dir_path, track_name))
  42. assert expected_comment == track.comment
  43. def test_set_comment(empty_id3_track):
  44. assert empty_id3_track.comment is None
  45. empty_id3_track.comment = "note"
  46. assert empty_id3_track.comment == "note"
  47. empty_id3_track.comment += "s"
  48. assert empty_id3_track.comment == "notes"
  49. assert symuid.Track(empty_id3_track.path).comment == "notes"
  50. @pytest.mark.parametrize(
  51. ("play_count"),
  52. [
  53. symuid.PlayCount("pytest", "lib", utc_dt(), 7),
  54. ],
  55. )
  56. def test_register_play_count_id3(empty_id3_track, play_count):
  57. empty_id3_track.register_play_count(play_count)
  58. tags = mutagen.File(empty_id3_track.path).tags
  59. assert len(tags) == 1
  60. expected_desc = "symuid:pcnt:{}:{}:{}".format(
  61. play_count.player,
  62. play_count.library_id,
  63. int(play_count.register_dt.timestamp()),
  64. )
  65. tag = tags["TXXX:" + expected_desc]
  66. assert tag.desc == expected_desc
  67. assert tag.text == [str(play_count.count)]
  68. @pytest.mark.parametrize(
  69. ("play_count"),
  70. [
  71. symuid.PlayCount("pytest", "lib", utc_dt(), 7),
  72. ],
  73. )
  74. def test_register_play_count_opus(empty_ogg_opus_path, play_count):
  75. track = symuid.Track(empty_ogg_opus_path)
  76. track.register_play_count(play_count)
  77. tags = mutagen.File(track.path).tags
  78. assert len(tags) == 1
  79. expected_desc = "symuid:pcnt:{}:{}:{}".format(
  80. play_count.player,
  81. play_count.library_id,
  82. int(play_count.register_dt.timestamp()),
  83. )
  84. assert tags[expected_desc] == [str(play_count.count)]
  85. @pytest.mark.parametrize(
  86. ("expected_counts"),
  87. [
  88. [],
  89. [symuid.PlayCount("player", "lib", utc_dt(), 3)],
  90. [
  91. symuid.PlayCount("a", "0", utc_dt(0), 1),
  92. symuid.PlayCount("b", "1", utc_dt(1), 2),
  93. ],
  94. [
  95. symuid.PlayCount("a", "0", utc_dt(0), 1),
  96. symuid.PlayCount("a", "2", utc_dt(1), 2),
  97. symuid.PlayCount("b", "3", utc_dt(3), 3),
  98. ],
  99. ],
  100. )
  101. def test__get_play_counts_all(empty_id3_track, expected_counts):
  102. for play_count in expected_counts:
  103. empty_id3_track.register_play_count(play_count)
  104. assert expected_counts == list(empty_id3_track._get_play_counts())
  105. def test__get_play_counts_filtered(empty_id3_track):
  106. counts = [
  107. symuid.PlayCount("a", "0", utc_dt(0), 1),
  108. symuid.PlayCount("a", "0", utc_dt(1), 2),
  109. symuid.PlayCount("a", "1", utc_dt(0), 3),
  110. symuid.PlayCount("b", "2", utc_dt(1), 4),
  111. ]
  112. for play_count in counts:
  113. empty_id3_track.register_play_count(play_count)
  114. assert set(empty_id3_track._get_play_counts(player="a")) == set(
  115. filter(lambda pc: pc.player == "a", counts)
  116. )
  117. assert set(empty_id3_track._get_play_counts(player="b")) == set(
  118. filter(lambda pc: pc.player == "b", counts)
  119. )
  120. assert set(empty_id3_track._get_play_counts(player="a", library_id="0")) == set(
  121. filter(lambda pc: pc.library_id == "0", counts)
  122. )
  123. assert set(empty_id3_track._get_play_counts(player="a", library_id="2")) == set()
  124. def test__get_latest_play_counts(empty_id3_track):
  125. counts = [
  126. symuid.PlayCount("a", "0", utc_dt(0), 1),
  127. symuid.PlayCount("a", "0", utc_dt(1), 2),
  128. symuid.PlayCount("a", "1", utc_dt(0), 3),
  129. symuid.PlayCount("a", "1", utc_dt(2), 4),
  130. symuid.PlayCount("b", "2", utc_dt(3), 5),
  131. ]
  132. for play_count in counts:
  133. empty_id3_track.register_play_count(play_count)
  134. assert set(empty_id3_track._get_latest_play_counts()) == set(
  135. [counts[1], counts[3], counts[4]]
  136. )
  137. assert set(empty_id3_track._get_latest_play_counts(player="a")) == set(
  138. [counts[1], counts[3]]
  139. )
  140. assert set(
  141. empty_id3_track._get_latest_play_counts(player="a", library_id="0")
  142. ) == set([counts[1]])
  143. def test_get_play_count_sum(empty_id3_track):
  144. counts = [
  145. symuid.PlayCount("a", "0", utc_dt(0), 1),
  146. symuid.PlayCount("a", "0", utc_dt(1), 2),
  147. symuid.PlayCount("a", "1", utc_dt(0), 3),
  148. symuid.PlayCount("a", "1", utc_dt(2), 4),
  149. symuid.PlayCount("b", "2", utc_dt(3), 5),
  150. ]
  151. for play_count in counts:
  152. empty_id3_track.register_play_count(play_count)
  153. assert 2 + 4 + 5 == empty_id3_track.get_play_count_sum()
  154. def test_increase_play_count(empty_id3_track):
  155. init_count = symuid.PlayCount("a", "0", utc_dt(0), 3)
  156. empty_id3_track.register_play_count(init_count)
  157. assert empty_id3_track.get_play_count_sum() == 3
  158. empty_id3_track.increase_play_count("a", "0")
  159. assert empty_id3_track.get_play_count_sum() == 4
  160. counts = set(empty_id3_track._get_play_counts())
  161. assert len(counts) == 2
  162. counts.remove(init_count)
  163. new_count = counts.pop()
  164. assert new_count.player == "a"
  165. assert new_count.library_id == "0"
  166. assert abs(new_count.register_dt - datetime_utc_now()).total_seconds() < 5
  167. assert new_count.count == 4
  168. def test_increase_play_count_init(empty_id3_track):
  169. empty_id3_track.increase_play_count("a", "0")
  170. assert empty_id3_track.get_play_count_sum() == 1
  171. (count,) = empty_id3_track._get_play_counts()
  172. assert count.player == "a"
  173. assert count.library_id == "0"
  174. assert abs(count.register_dt - datetime_utc_now()).total_seconds() < 5
  175. assert count.count == 1
  176. def test_increase_play_count_others(empty_id3_track):
  177. empty_id3_track.register_play_count(
  178. symuid.PlayCount("a", "0", utc_dt(0), 1),
  179. )
  180. empty_id3_track.register_play_count(
  181. symuid.PlayCount("a", "1", utc_dt(0), 2),
  182. )
  183. empty_id3_track.register_play_count(
  184. symuid.PlayCount("b", "0", utc_dt(0), 3),
  185. )
  186. assert empty_id3_track.get_play_count_sum() == 6
  187. empty_id3_track.increase_play_count("a", "1")
  188. assert empty_id3_track.get_play_count_sum() == 7
  189. assert len(list(empty_id3_track._get_play_counts(player="b"))) == 1
  190. assert len(list(empty_id3_track._get_play_counts(player="a", library_id="0"))) == 1
  191. assert len(list(empty_id3_track._get_play_counts(player="a", library_id="1"))) == 2
  192. def test_walk(tracks_dir_path):
  193. tracks = symuid.Track.walk(
  194. tracks_dir_path, path_ignore_regex=re.compile(r"typical")
  195. )
  196. track_names = set(os.path.basename(t.path) for t in tracks)
  197. assert track_names == {
  198. "id3v2.4-empty.mp3",
  199. "mp4-aac-empty.m4a",
  200. "ogg-opus-empty.opus",
  201. "ogg-vorbis-empty.ogg",
  202. }
  203. @pytest.mark.parametrize(
  204. ("status_stdout", "expected_path"),
  205. [
  206. (
  207. b"status playing\n"
  208. b"file /some/path/possibly including/spaces.mp3\n"
  209. b"duration 210\n"
  210. b"position 42\n"
  211. b"tag artist some artist\n"
  212. b"tag album some album\n"
  213. b"tag title some title\n"
  214. b"tag date 2012-03-04\n"
  215. b"tag bpm 0\n"
  216. b"set aaa_mode all\n"
  217. b"set continue true\n"
  218. b"set play_library true\n"
  219. b"set play_sorted false\n"
  220. b"set replaygain disabled\n"
  221. b"set replaygain_limit true\n"
  222. b"set replaygain_preamp 0.000000\n"
  223. b"set repeat false\n"
  224. b"set repeat_current false\n"
  225. b"set shuffle true\n"
  226. b"set softvol false\n"
  227. b"set vol_left 42\n"
  228. b"set vol_right 42\n",
  229. "/some/path/possibly including/spaces.mp3",
  230. ),
  231. ],
  232. )
  233. def test_get_active_cmus(tmpdir, status_stdout, expected_path):
  234. status_path = tmpdir.join("status")
  235. with status_path.open("wb") as status_file:
  236. status_file.write(status_stdout)
  237. cmd_mock_path = tmpdir.join("cmus-remote-mock")
  238. with cmd_mock_path.open("w") as cmd_file:
  239. cmd_file.write("#!/bin/sh\n")
  240. cmd_file.write('[ "$@" = "-Q" ] || exit 1\n')
  241. cmd_file.write("cat {}\n".format(shlex.quote(status_path.strpath)))
  242. cmd_mock_path.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
  243. with unittest.mock.patch("symuid.Track.__init__", return_value=None) as init_mock:
  244. with unittest.mock.patch(
  245. "symuid.Track._CMUS_REMOTE_COMMAND", cmd_mock_path.strpath
  246. ):
  247. assert isinstance(symuid.Track.get_active_cmus(), symuid.Track)
  248. init_mock.assert_called_once()
  249. init_args, init_kwargs = init_mock.call_args
  250. assert init_args == tuple()
  251. assert init_kwargs == {"path": expected_path}
  252. def test_get_active_cmus_none(tmpdir):
  253. status_path = tmpdir.join("status")
  254. with status_path.open("wb") as status_file:
  255. status_file.write(
  256. b"status stopped\n"
  257. b"set aaa_mode all\n"
  258. b"set continue true\n"
  259. b"set play_library true\n"
  260. b"set play_sorted false\n"
  261. b"set replaygain disabled\n"
  262. b"set replaygain_limit true\n"
  263. b"set replaygain_preamp 0.000000\n"
  264. b"set repeat false\n"
  265. b"set repeat_current false\n"
  266. b"set shuffle true\n"
  267. b"set softvol false\n"
  268. b"set vol_left 0\n"
  269. b"set vol_right 0\n"
  270. )
  271. cmd_mock_path = tmpdir.join("cmus-remote-mock")
  272. with cmd_mock_path.open("w") as cmd_file:
  273. cmd_file.write("#!/bin/sh\n")
  274. cmd_file.write('[ "$@" = "-Q" ] || exit 1\n')
  275. cmd_file.write("cat {}\n".format(shlex.quote(status_path.strpath)))
  276. cmd_mock_path.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
  277. with unittest.mock.patch(
  278. "symuid.Track._CMUS_REMOTE_COMMAND", cmd_mock_path.strpath
  279. ):
  280. assert symuid.Track.get_active_cmus() is None
  281. def test_get_active_cmus_not_running(tmpdir):
  282. cmd_mock_path = tmpdir.join("cmus-remote-mock")
  283. with cmd_mock_path.open("w") as cmd_file:
  284. cmd_file.write("#!/bin/sh\n")
  285. cmd_file.write("echo cmus-remote: cmus is not running >> /dev/stderr\n")
  286. cmd_file.write("exit 1\n")
  287. cmd_mock_path.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
  288. with unittest.mock.patch(
  289. "symuid.Track._CMUS_REMOTE_COMMAND", cmd_mock_path.strpath
  290. ):
  291. assert symuid.Track.get_active_cmus() is None
  292. def test_get_active_cmus_unexpected_error(tmpdir):
  293. cmd_mock_path = tmpdir.join("cmus-remote-mock")
  294. with cmd_mock_path.open("w") as cmd_file:
  295. cmd_file.write("#!/bin/sh\n")
  296. cmd_file.write("exit 42\n")
  297. cmd_mock_path.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
  298. with unittest.mock.patch(
  299. "symuid.Track._CMUS_REMOTE_COMMAND", cmd_mock_path.strpath
  300. ):
  301. with pytest.raises(subprocess.CalledProcessError):
  302. symuid.Track.get_active_cmus()