_tag_interface.py 11 KB


  1. import abc
  2. import re
  3. import typing
  4. import mutagen.id3
  5. import mutagen.mp4
  6. from symuid._uuid import uuid_bytes_to_str, uuid_str_to_bytes
  7. class TagInterface(abc.ABC):
  8. @abc.abstractproperty
  9. def track_path(self):
  10. pass
  11. @abc.abstractmethod
  12. def get_comment(self):
  13. pass
  14. @abc.abstractmethod
  15. def set_comment(self, comment):
  16. pass
  17. @abc.abstractmethod
  18. def get_track_uuid(self):
  19. pass
  20. @abc.abstractmethod
  21. def set_track_uuid(self, uuid):
  22. pass
  23. @abc.abstractmethod
  24. def save(self):
  25. pass
  26. @abc.abstractmethod
  27. def get_free_int(self, tag_label):
  28. pass
  29. @abc.abstractmethod
  30. def set_free_int(self, tag_label, data):
  31. pass
  32. @abc.abstractmethod
  33. def get_free_ints(self, tag_label_prefix):
  34. pass
  35. class _MutagenTagInterface(TagInterface):
  36. # pylint: disable=abstract-method
  37. def __init__(self, mutagen_file):
  38. self._mutagen_file = mutagen_file
  39. @property
  40. def track_path(self):
  41. return self._mutagen_file.filename
  42. def save(self):
  43. self._mutagen_file.save()
  44. class ID3(_MutagenTagInterface):
  45. # http://id3.org/id3v2.4.0-frames#4.1.
  46. _UFID_OWNER_ID = "symuid"
  47. def __init__(self, mutagen_file):
  48. assert isinstance(mutagen_file.tags, mutagen.id3.ID3), mutagen_file.tags
  49. super().__init__(mutagen_file)
  50. def _get_str(self, tag_label) -> str:
  51. tag = self._mutagen_file.tags.get(tag_label, None)
  52. if tag is None:
  53. raise KeyError(tag_label)
  54. if len(tag.text) == 1:
  55. return tag.text[0]
  56. raise ValueError(tag)
  57. def _get_free_str(self, tag_label) -> str:
  58. return self._get_str("TXXX:" + tag_label)
  59. def get_free_ints(self, tag_label_prefix):
  60. for tag in self._mutagen_file.tags.getall("TXXX:" + tag_label_prefix):
  61. assert len(tag.text) == 1, tag
  62. yield (tag.desc, int(tag.text[0]))
  63. def get_free_int(self, tag_label) -> typing.Optional[int]:
  64. try:
  65. return int(self._get_free_str(tag_label))
  66. except KeyError:
  67. return None
  68. def get_free_int_ratio(self, tag_label) -> typing.Tuple[int, int]:
  69. nominator, denominator = map(int, self._get_free_str(tag_label).split("/"))
  70. return (nominator, denominator)
  71. def _set_free_str(self, tag_label: str, string: str) -> mutagen.id3.TXXX:
  72. # mutagen.id3._specs.EncodedTextSpec.write encodes 'desc' and 'text'
  73. tag = mutagen.id3.TXXX(
  74. encoding=mutagen.id3.Encoding.LATIN1,
  75. desc=tag_label,
  76. text=[string],
  77. )
  78. # TODO overwrite instead of add() ?
  79. self._mutagen_file.tags.add(tag)
  80. return tag
  81. def set_free_int(self, tag_label: str, data: str) -> mutagen.id3.TXXX:
  82. return self._set_free_str(tag_label=tag_label, string=str(data))
  83. def set_free_int_ratio(
  84. self, tag_label, numerator: int, denominator: int
  85. ) -> mutagen.id3.TXXX:
  86. return self._set_free_str(
  87. tag_label=tag_label, string="{}/{}".format(numerator, denominator)
  88. )
  89. def get_comment(self) -> typing.Optional[str]:
  90. try:
  91. return self._get_str("COMM::eng")
  92. except KeyError:
  93. return None
  94. def set_comment(self, comment) -> mutagen.id3.COMM:
  95. tag = mutagen.id3.COMM(
  96. encoding=mutagen.id3.Encoding.UTF8,
  97. lang="eng",
  98. text=[comment],
  99. )
  100. self._mutagen_file.tags.add(tag)
  101. return tag
  102. def get_track_uuid(self) -> typing.Optional:
  103. for ufid in self._mutagen_file.tags.getall("UFID"):
  104. if ufid.owner == self._UFID_OWNER_ID:
  105. return ufid.data
  106. return None
  107. def set_track_uuid(self, uuid) -> mutagen.id3.UFID:
  108. # mutagen.id3._specs.EncodedTextSpec.write encodes 'owner'
  109. tag = mutagen.id3.UFID(owner=self._UFID_OWNER_ID, data=uuid)
  110. self._mutagen_file.tags.add(tag)
  111. return tag
  112. class MP4(_MutagenTagInterface):
  113. _UUID_TAG_KEY = "symuid:uuid"
  114. _COMMENT_TAG_KEY = "\xa9cmt"
  115. def __init__(self, mutagen_file):
  116. assert mutagen_file.tags is not None, mutagen_file
  117. assert isinstance(mutagen_file.tags, mutagen.mp4.MP4Tags), mutagen_file.tags
  118. super().__init__(mutagen_file)
  119. @staticmethod
  120. def _freeform_to_int(freeform):
  121. # "a signed big-endian integer with length one of { 1,2,3,4,8 } bytes"
  122. assert freeform.dataformat == mutagen.mp4.AtomDataType.INTEGER, freeform
  123. return int.from_bytes(freeform, byteorder="big", signed=True)
  124. def get_free_ints(self, tag_label_prefix):
  125. label_pattern = re.compile(r"^----:{}(:|$)".format(re.escape(tag_label_prefix)))
  126. for label, values in self._mutagen_file.tags.items():
  127. # TODO overwrite instead of add() ?
  128. if label_pattern.match(label):
  129. assert len(values) == 1, (label, values)
  130. value = MP4._freeform_to_int(values[0])
  131. yield (re.sub(r"^----:", "", label), value)
  132. def _get_free(self, tag_label) -> typing.List[mutagen.mp4.MP4FreeForm]:
  133. # freeform keys start with '----'
  134. # http://mutagen.readthedocs.io/en/latest/api/mp4.html
  135. tag = self._mutagen_file.tags.get("----:" + tag_label, None)
  136. if not tag:
  137. raise KeyError(tag_label)
  138. return tag
  139. def _get_free_ints(self, tag_label) -> typing.Tuple[int]:
  140. return tuple(self._freeform_to_int(i) for i in self._get_free(tag_label))
  141. def get_free_int(self, tag_label):
  142. try:
  143. ints = self._get_free_ints(tag_label)
  144. except KeyError:
  145. return None
  146. if len(ints) != 1:
  147. raise ValueError((tag_label, ints))
  148. return ints[0]
  149. def get_free_int_ratio(self, tag_label) -> typing.Tuple[int, int]:
  150. ints = self._get_free_ints(tag_label)
  151. if len(ints) != 2:
  152. raise ValueError((tag_label, ints))
  153. return ints
  154. def _get_free_uuid(self, tag_label) -> mutagen.mp4.MP4FreeForm:
  155. (tag,) = self._get_free(tag_label)
  156. if tag.dataformat != mutagen.mp4.AtomDataType.UUID:
  157. raise ValueError(tag)
  158. return tag
  159. def _set_free(self, tag_label, dataformat, data):
  160. assert isinstance(data, bytes)
  161. tag = mutagen.mp4.MP4FreeForm(dataformat=dataformat, data=data)
  162. self._mutagen_file.tags["----:" + tag_label] = [tag]
  163. return tag
  164. @staticmethod
  165. def _int_to_freeform_data(integer: int) -> bytes:
  166. # "a signed big-endian integer with length one of { 1,2,3,4,8 } bytes"
  167. # TODO exclude 5-7, 9-
  168. byte_len = max((integer.bit_length() + 8) // 8, 1)
  169. return integer.to_bytes(byte_len, byteorder="big", signed=True)
  170. def set_free_int(self, tag_label, data):
  171. assert isinstance(data, int)
  172. return self._set_free(
  173. tag_label=tag_label,
  174. dataformat=mutagen.mp4.AtomDataType.INTEGER,
  175. data=self._int_to_freeform_data(data),
  176. )
  177. def set_free_int_ratio(
  178. self, tag_label, numerator: int, denominator: int
  179. ) -> typing.List[mutagen.mp4.MP4FreeForm]:
  180. tag = [
  181. mutagen.mp4.MP4FreeForm(
  182. dataformat=mutagen.mp4.AtomDataType.INTEGER,
  183. data=self._int_to_freeform_data(i),
  184. )
  185. for i in (numerator, denominator)
  186. ]
  187. self._mutagen_file.tags["----:" + tag_label] = tag
  188. return tag
  189. def _set_free_uuid(self, tag_label, data):
  190. return self._set_free(
  191. tag_label=tag_label,
  192. # https://mutagen.readthedocs.io/en/latest/api/mp4.html#mutagen.mp4.AtomDataType.UUID
  193. dataformat=mutagen.mp4.AtomDataType.UUID,
  194. data=data,
  195. )
  196. def get_comment(self):
  197. tag = self._mutagen_file.tags.get(self._COMMENT_TAG_KEY, None)
  198. if tag is None:
  199. return None
  200. if len(tag) == 1:
  201. return tag[0]
  202. raise ValueError(tag)
  203. def set_comment(self, comment: str) -> None:
  204. self._mutagen_file[self._COMMENT_TAG_KEY] = [comment]
  205. def get_track_uuid(self):
  206. try:
  207. return self._get_free_uuid(self._UUID_TAG_KEY)
  208. except KeyError:
  209. return None
  210. def set_track_uuid(self, uuid):
  211. return self._set_free_uuid(self._UUID_TAG_KEY, uuid)
  212. class Ogg(_MutagenTagInterface):
  213. # https://github.com/cmus/cmus/blob/9a0723f7a90dc7de0898be87963d5105a999aa6c/ip/opus.c#L229
  214. # https://github.com/cmus/cmus/blob/9a0723f7a90dc7de0898be87963d5105a999aa6c/ip/vorbis.c#L319
  215. # https://github.com/cmus/cmus/blob/17bf542c6b120d9dcf6642b259d78badfc1143eb/comment.c#L224
  216. _COMMENT_TAG_KEY = "comment"
  217. _UUID_TAG_KEY = "symuid:uuid"
  218. def __init__(self, mutagen_file):
  219. assert isinstance(
  220. mutagen_file.tags,
  221. (mutagen.oggopus.OggOpusVComment, mutagen.oggvorbis.OggVCommentDict),
  222. ), (type(mutagen_file), type(mutagen_file.tags))
  223. super().__init__(mutagen_file)
  224. def _get_single_text(self, tag_label) -> typing.Optional[str]:
  225. tag = self._mutagen_file.get(tag_label, None) # type: list
  226. if tag is None:
  227. return None
  228. if len(tag) > 1:
  229. raise ValueError((self.track_path, tag))
  230. if not isinstance(tag[0], str):
  231. raise ValueError((self.track_path, tag))
  232. return tag[0]
  233. def get_comment(self) -> typing.Optional[str]:
  234. return self._get_single_text(self._COMMENT_TAG_KEY)
  235. def set_comment(self, comment: str) -> None:
  236. self._mutagen_file[self._COMMENT_TAG_KEY] = comment
  237. def get_track_uuid(self) -> typing.Optional[bytes]:
  238. uuid_str = self._get_single_text(self._UUID_TAG_KEY)
  239. return uuid_str_to_bytes(uuid_str) if uuid_str else None
  240. def set_track_uuid(self, uuid: bytes) -> None:
  241. self._mutagen_file[self._UUID_TAG_KEY] = uuid_bytes_to_str(uuid)
  242. def get_free_int(self, tag_label: str) -> typing.Optional[int]:
  243. dec = self._get_single_text(tag_label)
  244. return int(dec) if dec else None
  245. def get_free_ints(
  246. self, tag_label_prefix: str
  247. ) -> typing.Iterator[typing.Tuple[str, int]]:
  248. for tag_key, tag_value in self._mutagen_file.items():
  249. if tag_key == tag_label_prefix or tag_key.startswith(
  250. tag_label_prefix + ":"
  251. ):
  252. if len(tag_value) > 1:
  253. raise ValueError((self.track_path, tag_key, tag_value))
  254. yield (tag_key, int(tag_value[0]))
  255. def set_free_int(self, tag_label: str, data: int) -> typing.Tuple:
  256. self._mutagen_file[tag_label] = str(data)
  257. return (tag_label, self._mutagen_file[tag_label])