_tag_interface.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
import abc
import re
import typing

import mutagen.id3
import mutagen.mp4
import mutagen.oggopus
import mutagen.oggvorbis

from symuid._uuid import uuid_bytes_to_str, uuid_str_to_bytes
  7. class TagInterface(abc.ABC):
  8. @abc.abstractproperty
  9. def track_path(self):
  10. pass
  11. @abc.abstractmethod
  12. def get_comment(self):
  13. pass
  14. @abc.abstractmethod
  15. def set_comment(self, comment):
  16. pass
  17. @abc.abstractmethod
  18. def get_track_uuid(self):
  19. pass
  20. @abc.abstractmethod
  21. def set_track_uuid(self, uuid):
  22. pass
  23. @abc.abstractmethod
  24. def save(self):
  25. pass
  26. @abc.abstractmethod
  27. def get_free_int(self, tag_label):
  28. pass
  29. @abc.abstractmethod
  30. def set_free_int(self, tag_label, data):
  31. pass
  32. @abc.abstractmethod
  33. def get_free_ints(self, tag_label_prefix):
  34. pass
  35. class _MutagenTagInterface(TagInterface):
  36. # pylint: disable=abstract-method
  37. def __init__(self, mutagen_file):
  38. self._mutagen_file = mutagen_file
  39. @property
  40. def track_path(self):
  41. return self._mutagen_file.filename
  42. def save(self):
  43. self._mutagen_file.save()
  44. class ID3(_MutagenTagInterface):
  45. # http://id3.org/id3v2.4.0-frames#4.1.
  46. _UFID_OWNER_ID = "symuid"
  47. def __init__(self, mutagen_file):
  48. assert isinstance(mutagen_file.tags, mutagen.id3.ID3), mutagen_file.tags
  49. super().__init__(mutagen_file)
  50. def _get_str(self, tag_label) -> str:
  51. tag = self._mutagen_file.tags.get(tag_label, None)
  52. if tag is None:
  53. raise KeyError(tag_label)
  54. if len(tag.text) == 1:
  55. return tag.text[0]
  56. raise ValueError(tag)
  57. def _get_free_str(self, tag_label) -> str:
  58. return self._get_str("TXXX:" + tag_label)
  59. def get_free_ints(self, tag_label_prefix):
  60. for tag in self._mutagen_file.tags.getall("TXXX:" + tag_label_prefix):
  61. assert len(tag.text) == 1, tag
  62. yield (tag.desc, int(tag.text[0]))
  63. def get_free_int(self, tag_label) -> typing.Optional[int]:
  64. try:
  65. return int(self._get_free_str(tag_label))
  66. except KeyError:
  67. return None
  68. def get_free_int_ratio(self, tag_label) -> typing.Tuple[int, int]:
  69. nominator, denominator = map(int, self._get_free_str(tag_label).split("/"))
  70. return (nominator, denominator)
  71. def _set_free_str(self, tag_label: str, string: str) -> mutagen.id3.TXXX:
  72. # mutagen.id3._specs.EncodedTextSpec.write encodes 'desc' and 'text'
  73. tag = mutagen.id3.TXXX(
  74. encoding=mutagen.id3.Encoding.LATIN1, desc=tag_label, text=[string],
  75. )
  76. # TODO overwrite instead of add() ?
  77. self._mutagen_file.tags.add(tag)
  78. return tag
  79. def set_free_int(self, tag_label: str, data: str) -> mutagen.id3.TXXX:
  80. return self._set_free_str(tag_label=tag_label, string=str(data))
  81. def set_free_int_ratio(
  82. self, tag_label, numerator: int, denominator: int
  83. ) -> mutagen.id3.TXXX:
  84. return self._set_free_str(
  85. tag_label=tag_label, string="{}/{}".format(numerator, denominator)
  86. )
  87. def get_comment(self) -> typing.Optional[str]:
  88. try:
  89. return self._get_str("COMM::eng")
  90. except KeyError:
  91. return None
  92. def set_comment(self, comment) -> mutagen.id3.COMM:
  93. tag = mutagen.id3.COMM(
  94. encoding=mutagen.id3.Encoding.UTF8, lang="eng", text=[comment],
  95. )
  96. self._mutagen_file.tags.add(tag)
  97. return tag
  98. def get_track_uuid(self) -> typing.Optional:
  99. for ufid in self._mutagen_file.tags.getall("UFID"):
  100. if ufid.owner == self._UFID_OWNER_ID:
  101. return ufid.data
  102. return None
  103. def set_track_uuid(self, uuid) -> mutagen.id3.UFID:
  104. # mutagen.id3._specs.EncodedTextSpec.write encodes 'owner'
  105. tag = mutagen.id3.UFID(owner=self._UFID_OWNER_ID, data=uuid)
  106. self._mutagen_file.tags.add(tag)
  107. return tag
  108. class MP4(_MutagenTagInterface):
  109. _UUID_TAG_KEY = "symuid:uuid"
  110. _COMMENT_TAG_KEY = "\xa9cmt"
  111. def __init__(self, mutagen_file):
  112. assert mutagen_file.tags is not None, mutagen_file
  113. assert isinstance(mutagen_file.tags, mutagen.mp4.MP4Tags), mutagen_file.tags
  114. super().__init__(mutagen_file)
  115. @staticmethod
  116. def _freeform_to_int(freeform):
  117. # "a signed big-endian integer with length one of { 1,2,3,4,8 } bytes"
  118. assert freeform.dataformat == mutagen.mp4.AtomDataType.INTEGER, freeform
  119. return int.from_bytes(freeform, byteorder="big", signed=True)
  120. def get_free_ints(self, tag_label_prefix):
  121. label_pattern = re.compile(
  122. r"^----:{}(:|$)".format(re.escape(tag_label_prefix),)
  123. )
  124. for label, values in self._mutagen_file.tags.items():
  125. # TODO overwrite instead of add() ?
  126. if label_pattern.match(label):
  127. assert len(values) == 1, (label, values)
  128. value = MP4._freeform_to_int(values[0])
  129. yield (re.sub(r"^----:", "", label), value)
  130. def _get_free(self, tag_label) -> typing.List[mutagen.mp4.MP4FreeForm]:
  131. # freeform keys start with '----'
  132. # http://mutagen.readthedocs.io/en/latest/api/mp4.html
  133. tag = self._mutagen_file.tags.get("----:" + tag_label, None)
  134. if not tag:
  135. raise KeyError(tag_label)
  136. return tag
  137. def _get_free_ints(self, tag_label) -> typing.Tuple[int]:
  138. return tuple(self._freeform_to_int(i) for i in self._get_free(tag_label))
  139. def get_free_int(self, tag_label):
  140. try:
  141. ints = self._get_free_ints(tag_label)
  142. except KeyError:
  143. return None
  144. if len(ints) != 1:
  145. raise ValueError((tag_label, ints))
  146. return ints[0]
  147. def get_free_int_ratio(self, tag_label) -> typing.Tuple[int, int]:
  148. ints = self._get_free_ints(tag_label)
  149. if len(ints) != 2:
  150. raise ValueError((tag_label, ints))
  151. return ints
  152. def _get_free_uuid(self, tag_label) -> mutagen.mp4.MP4FreeForm:
  153. (tag,) = self._get_free(tag_label)
  154. if tag.dataformat != mutagen.mp4.AtomDataType.UUID:
  155. raise ValueError(tag)
  156. return tag
  157. def _set_free(self, tag_label, dataformat, data):
  158. assert isinstance(data, bytes)
  159. tag = mutagen.mp4.MP4FreeForm(dataformat=dataformat, data=data)
  160. self._mutagen_file.tags["----:" + tag_label] = [tag]
  161. return tag
  162. @staticmethod
  163. def _int_to_freeform_data(integer: int) -> bytes:
  164. # "a signed big-endian integer with length one of { 1,2,3,4,8 } bytes"
  165. # TODO exclude 5-7, 9-
  166. byte_len = max((integer.bit_length() + 8) // 8, 1)
  167. return integer.to_bytes(byte_len, byteorder="big", signed=True)
  168. def set_free_int(self, tag_label, data):
  169. assert isinstance(data, int)
  170. return self._set_free(
  171. tag_label=tag_label,
  172. dataformat=mutagen.mp4.AtomDataType.INTEGER,
  173. data=self._int_to_freeform_data(data),
  174. )
  175. def set_free_int_ratio(
  176. self, tag_label, numerator: int, denominator: int
  177. ) -> typing.List[mutagen.mp4.MP4FreeForm]:
  178. tag = [
  179. mutagen.mp4.MP4FreeForm(
  180. dataformat=mutagen.mp4.AtomDataType.INTEGER,
  181. data=self._int_to_freeform_data(i),
  182. )
  183. for i in (numerator, denominator)
  184. ]
  185. self._mutagen_file.tags["----:" + tag_label] = tag
  186. return tag
  187. def _set_free_uuid(self, tag_label, data):
  188. return self._set_free(
  189. tag_label=tag_label,
  190. # https://mutagen.readthedocs.io/en/latest/api/mp4.html#mutagen.mp4.AtomDataType.UUID
  191. dataformat=mutagen.mp4.AtomDataType.UUID,
  192. data=data,
  193. )
  194. def get_comment(self):
  195. tag = self._mutagen_file.tags.get(self._COMMENT_TAG_KEY, None)
  196. if tag is None:
  197. return None
  198. if len(tag) == 1:
  199. return tag[0]
  200. raise ValueError(tag)
  201. def set_comment(self, comment: str) -> None:
  202. self._mutagen_file[self._COMMENT_TAG_KEY] = [comment]
  203. def get_track_uuid(self):
  204. try:
  205. return self._get_free_uuid(self._UUID_TAG_KEY)
  206. except KeyError:
  207. return None
  208. def set_track_uuid(self, uuid):
  209. return self._set_free_uuid(self._UUID_TAG_KEY, uuid)
  210. class Ogg(_MutagenTagInterface):
  211. # https://github.com/cmus/cmus/blob/9a0723f7a90dc7de0898be87963d5105a999aa6c/ip/opus.c#L229
  212. # https://github.com/cmus/cmus/blob/9a0723f7a90dc7de0898be87963d5105a999aa6c/ip/vorbis.c#L319
  213. # https://github.com/cmus/cmus/blob/17bf542c6b120d9dcf6642b259d78badfc1143eb/comment.c#L224
  214. _COMMENT_TAG_KEY = "comment"
  215. _UUID_TAG_KEY = "symuid:uuid"
  216. def __init__(self, mutagen_file):
  217. assert isinstance(
  218. mutagen_file.tags,
  219. (mutagen.oggopus.OggOpusVComment, mutagen.oggvorbis.OggVCommentDict),
  220. ), (type(mutagen_file), type(mutagen_file.tags))
  221. super().__init__(mutagen_file)
  222. def _get_single_text(self, tag_label) -> typing.Optional[str]:
  223. tag = self._mutagen_file.get(tag_label, None) # type: list
  224. if tag is None:
  225. return None
  226. if len(tag) > 1:
  227. raise ValueError((self.track_path, tag))
  228. if not isinstance(tag[0], str):
  229. raise ValueError((self.track_path, tag))
  230. return tag[0]
  231. def get_comment(self) -> typing.Optional[str]:
  232. return self._get_single_text(self._COMMENT_TAG_KEY)
  233. def set_comment(self, comment: str) -> None:
  234. self._mutagen_file[self._COMMENT_TAG_KEY] = comment
  235. def get_track_uuid(self) -> typing.Optional[bytes]:
  236. uuid_str = self._get_single_text(self._UUID_TAG_KEY)
  237. return uuid_str_to_bytes(uuid_str) if uuid_str else None
  238. def set_track_uuid(self, uuid: bytes) -> None:
  239. self._mutagen_file[self._UUID_TAG_KEY] = uuid_bytes_to_str(uuid)
  240. def get_free_int(self, tag_label: str) -> typing.Optional[int]:
  241. dec = self._get_single_text(tag_label)
  242. return int(dec) if dec else None
  243. def get_free_ints(
  244. self, tag_label_prefix: str
  245. ) -> typing.Iterator[typing.Tuple[str, int]]:
  246. for tag_key, tag_value in self._mutagen_file.items():
  247. if tag_key == tag_label_prefix or tag_key.startswith(
  248. tag_label_prefix + ":"
  249. ):
  250. if len(tag_value) > 1:
  251. raise ValueError((self.track_path, tag_key, tag_value))
  252. yield (tag_key, int(tag_value[0]))
  253. def set_free_int(self, tag_label: str, data: int) -> typing.Tuple:
  254. self._mutagen_file[tag_label] = str(data)
  255. return (tag_label, self._mutagen_file[tag_label])