_tag_interface.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  import abc
  import re
  import typing

  import mutagen.id3
  import mutagen.mp4
  import mutagen.oggopus
  import mutagen.oggvorbis

  from symuid._uuid import uuid_bytes_to_str, uuid_str_to_bytes
  class TagInterface(abc.ABC):
      """Abstract interface for reading and writing the tags of one track.

      Every member declared here is abstract: a subclass can only be
      instantiated once it provides all of them (otherwise instantiation
      raises TypeError).
      """

      # abc.abstractproperty is deprecated since Python 3.3; stacking
      # @property on top of @abc.abstractmethod is the supported spelling.
      @property
      @abc.abstractmethod
      def track_path(self):
          """Path of the track file the tags belong to."""

      @abc.abstractmethod
      def get_comment(self):
          """Return the track's comment."""

      @abc.abstractmethod
      def set_comment(self, comment):
          """Store *comment* as the track's comment."""

      @abc.abstractmethod
      def get_track_uuid(self):
          """Return the UUID stored for the track."""

      @abc.abstractmethod
      def set_track_uuid(self, uuid):
          """Store *uuid* as the track's UUID."""

      @abc.abstractmethod
      def save(self):
          """Persist the tags."""

      @abc.abstractmethod
      def get_free_int(self, tag_label):
          """Return the integer stored in the free-form tag *tag_label*."""

      @abc.abstractmethod
      def set_free_int(self, tag_label, data):
          """Store the integer *data* in the free-form tag *tag_label*."""

      @abc.abstractmethod
      def get_free_ints(self, tag_label_prefix):
          """Iterate over the integer free-form tags matching *tag_label_prefix*."""
  class _MutagenTagInterface(TagInterface):
      # pylint: disable=abstract-method
      """Common base for tag interfaces that wrap a mutagen file object.

      Only the path lookup and saving are implemented here; the tag
      accessors stay abstract and are left to the format-specific subclasses.
      """

      def __init__(self, mutagen_file):
          # Kept under this name because the subclasses read it directly.
          self._mutagen_file = mutagen_file

      def save(self):
          """Delegate saving to the wrapped mutagen file object."""
          self._mutagen_file.save()

      @property
      def track_path(self):
          """The ``filename`` attribute of the wrapped mutagen file object."""
          return self._mutagen_file.filename
  class ID3(_MutagenTagInterface):
      """Tag interface for files whose tags are a ``mutagen.id3.ID3`` instance.

      Free-form values live in TXXX frames (looked up as ``'TXXX:' + label``),
      the comment is looked up under ``'COMM::eng'`` and the track UUID is
      the data of the UFID frame whose owner is ``'symuid'``.
      """

      # http://id3.org/id3v2.4.0-frames#4.1.
      _UFID_OWNER_ID = 'symuid'

      def __init__(self, mutagen_file):
          assert isinstance(mutagen_file.tags, mutagen.id3.ID3), \
              mutagen_file.tags
          super().__init__(mutagen_file)

      def _get_str(self, tag_label) -> str:
          """Return the single text value of the frame *tag_label*.

          Raises KeyError when the frame is absent and ValueError when it
          does not hold exactly one text value.
          """
          tag = self._mutagen_file.tags.get(tag_label, None)
          if tag is None:
              raise KeyError(tag_label)
          if len(tag.text) == 1:
              return tag.text[0]
          raise ValueError(tag)

      def _get_free_str(self, tag_label) -> str:
          """Return the text of the TXXX frame described by *tag_label*."""
          return self._get_str('TXXX:' + tag_label)

      def get_free_ints(self, tag_label_prefix) \
              -> typing.Iterator[typing.Tuple[str, int]]:
          """Yield ``(description, int)`` for the TXXX frames returned by
          ``getall('TXXX:' + tag_label_prefix)``.
          """
          for tag in self._mutagen_file.tags.getall('TXXX:' + tag_label_prefix):
              assert len(tag.text) == 1, tag
              yield (tag.desc, int(tag.text[0]))

      def get_free_int(self, tag_label) -> typing.Optional[int]:
          """Return the integer stored under *tag_label*, None if absent."""
          try:
              return int(self._get_free_str(tag_label))
          except KeyError:
              return None

      def get_free_int_ratio(self, tag_label) -> typing.Tuple[int, int]:
          """Parse the ``'numerator/denominator'`` text stored under *tag_label*.

          Unlike get_free_int, a missing frame raises KeyError here.
          """
          numerator, denominator = map(int, self._get_free_str(tag_label).split('/'))
          return (numerator, denominator)

      def _set_free_str(self, tag_label: str, string: str) -> mutagen.id3.TXXX:
          """Add a TXXX frame with description *tag_label* and text *string*."""
          # mutagen.id3._specs.EncodedTextSpec.write encodes 'desc' and 'text'
          tag = mutagen.id3.TXXX(
              encoding=mutagen.id3.Encoding.LATIN1,
              desc=tag_label,
              text=[string],
          )
          # TODO overwrite instead of add() ?
          self._mutagen_file.tags.add(tag)
          return tag

      def set_free_int(self, tag_label: str, data: int) -> mutagen.id3.TXXX:
          """Store ``str(data)`` in the TXXX frame *tag_label*; return the frame."""
          return self._set_free_str(tag_label=tag_label, string=str(data))

      def set_free_int_ratio(self, tag_label, numerator: int, denominator: int) -> mutagen.id3.TXXX:
          """Store the ratio as ``'numerator/denominator'`` text; return the frame."""
          return self._set_free_str(tag_label=tag_label,
                                    string='{}/{}'.format(numerator, denominator))

      def get_comment(self) -> typing.Optional[str]:
          """Return the text stored under ``'COMM::eng'``, None if absent."""
          try:
              return self._get_str('COMM::eng')
          except KeyError:
              return None

      def set_comment(self, comment: str) -> mutagen.id3.COMM:
          """Add a UTF-8 COMM frame with language 'eng'; return the frame."""
          tag = mutagen.id3.COMM(
              encoding=mutagen.id3.Encoding.UTF8,
              lang='eng',
              text=[comment],
          )
          self._mutagen_file.tags.add(tag)
          return tag

      def get_track_uuid(self) -> typing.Optional[bytes]:
          """Return the data of the first UFID frame owned by 'symuid'.

          Returns None when no such frame exists.
          """
          for ufid in self._mutagen_file.tags.getall('UFID'):
              if ufid.owner == self._UFID_OWNER_ID:
                  return ufid.data
          return None

      def set_track_uuid(self, uuid) -> mutagen.id3.UFID:
          """Add a UFID frame owned by 'symuid' carrying *uuid*; return it."""
          # mutagen.id3._specs.EncodedTextSpec.write encodes 'owner'
          tag = mutagen.id3.UFID(owner=self._UFID_OWNER_ID, data=uuid)
          self._mutagen_file.tags.add(tag)
          return tag
  class MP4(_MutagenTagInterface):
      """Tag interface for files whose tags are ``mutagen.mp4.MP4Tags``.

      Free-form values are stored under keys prefixed with ``'----:'``,
      the comment under ``'\\xa9cmt'`` and the track UUID as a free-form
      atom of data type UUID under ``'----:symuid:uuid'``.
      """

      _UUID_TAG_KEY = 'symuid:uuid'
      _COMMENT_TAG_KEY = '\xa9cmt'
      # "a signed big-endian integer with length one of { 1,2,3,4,8 } bytes"
      _INTEGER_BYTE_LENGTHS = (1, 2, 3, 4, 8)

      def __init__(self, mutagen_file):
          assert mutagen_file.tags is not None, mutagen_file
          assert isinstance(mutagen_file.tags, mutagen.mp4.MP4Tags), \
              mutagen_file.tags
          super().__init__(mutagen_file)

      @staticmethod
      def _freeform_to_int(freeform):
          """Decode a free-form INTEGER atom as a signed big-endian int."""
          # "a signed big-endian integer with length one of { 1,2,3,4,8 } bytes"
          assert freeform.dataformat == mutagen.mp4.AtomDataType.INTEGER, freeform
          return int.from_bytes(freeform, byteorder='big', signed=True)

      @classmethod
      def _int_to_freeform_bytes(cls, value: int) -> bytes:
          """Encode *value* as signed big-endian bytes of the smallest allowed length.

          Raises OverflowError when *value* does not fit into 8 bytes.
          """
          for length in cls._INTEGER_BYTE_LENGTHS:
              # A signed integer of `length` bytes covers [-limit, limit).
              limit = 1 << (8 * length - 1)
              if -limit <= value < limit:
                  return value.to_bytes(length, byteorder='big', signed=True)
          raise OverflowError('int too big to convert')

      def get_free_ints(self, tag_label_prefix):
          """Yield ``(label, int)`` for free-form tags matching the prefix.

          A key matches when, after the leading ``'----:'``, it equals
          *tag_label_prefix* or continues with ``':'``.  The yielded label
          has the ``'----:'`` prefix removed.
          """
          label_pattern = re.compile(r'^----:{}(:|$)'.format(
              re.escape(tag_label_prefix),
          ))
          for label, values in self._mutagen_file.tags.items():
              if label_pattern.match(label):
                  assert len(values) == 1, (label, values)
                  value = MP4._freeform_to_int(values[0])
                  yield (re.sub(r'^----:', '', label), value)

      def _get_free(self, tag_label) -> typing.List[mutagen.mp4.MP4FreeForm]:
          """Return the values stored under the free-form key *tag_label*.

          Raises KeyError when the key is absent or its value is empty.
          """
          # freeform keys start with '----'
          # http://mutagen.readthedocs.io/en/latest/api/mp4.html
          tag = self._mutagen_file.tags.get('----:' + tag_label, None)
          if not tag:
              raise KeyError(tag_label)
          return tag

      def _get_free_ints(self, tag_label) -> typing.Tuple[int, ...]:
          """Decode every value stored under *tag_label* as an integer."""
          return tuple(self._freeform_to_int(i) for i in self._get_free(tag_label))

      def get_free_int(self, tag_label):
          """Return the single integer stored under *tag_label*, None if absent.

          Raises ValueError when the tag holds more than one value.
          """
          try:
              ints = self._get_free_ints(tag_label)
          except KeyError:
              return None
          if len(ints) != 1:
              raise ValueError((tag_label, ints))
          return ints[0]

      def get_free_int_ratio(self, tag_label) -> typing.Tuple[int, int]:
          """Return the two integers stored under *tag_label*.

          Raises KeyError when the tag is absent and ValueError when it
          does not hold exactly two values.
          """
          ints = self._get_free_ints(tag_label)
          if len(ints) != 2:
              raise ValueError((tag_label, ints))
          return ints

      def _get_free_uuid(self, tag_label) -> mutagen.mp4.MP4FreeForm:
          """Return the single free-form value of data type UUID under *tag_label*."""
          tag, = self._get_free(tag_label)
          if tag.dataformat != mutagen.mp4.AtomDataType.UUID:
              raise ValueError(tag)
          return tag

      def _set_free(self, tag_label, dataformat, data):
          """Replace the free-form tag *tag_label* with one value; return it."""
          assert isinstance(data, bytes)
          tag = mutagen.mp4.MP4FreeForm(dataformat=dataformat, data=data)
          self._mutagen_file.tags['----:' + tag_label] = [tag]
          return tag

      def set_free_int(self, tag_label, data):
          """Store the integer *data* as a free-form INTEGER atom.

          The value is encoded with the smallest permitted byte length, so
          values that fit into one byte are written exactly as before while
          larger values no longer overflow.
          """
          assert isinstance(data, int)
          return self._set_free(
              tag_label=tag_label,
              dataformat=mutagen.mp4.AtomDataType.INTEGER,
              data=self._int_to_freeform_bytes(data),
          )

      def _set_free_uuid(self, tag_label, data):
          """Store *data* as a free-form atom of data type UUID."""
          return self._set_free(
              tag_label=tag_label,
              # https://mutagen.readthedocs.io/en/latest/api/mp4.html#mutagen.mp4.AtomDataType.UUID
              dataformat=mutagen.mp4.AtomDataType.UUID,
              data=data,
          )

      def get_comment(self):
          """Return the single comment value, None if the tag is absent.

          Raises ValueError when the tag does not hold exactly one value.
          """
          tag = self._mutagen_file.tags.get(self._COMMENT_TAG_KEY, None)
          if tag is None:
              return None
          if len(tag) == 1:
              return tag[0]
          raise ValueError(tag)

      def set_comment(self, comment: str) -> None:
          """Replace the comment tag with the single value *comment*."""
          self._mutagen_file[self._COMMENT_TAG_KEY] = [comment]

      def get_track_uuid(self):
          """Return the stored UUID atom, None if the tag is absent."""
          try:
              return self._get_free_uuid(self._UUID_TAG_KEY)
          except KeyError:
              return None

      def set_track_uuid(self, uuid):
          """Store *uuid* (bytes) as the track's UUID atom; return the atom."""
          return self._set_free_uuid(self._UUID_TAG_KEY, uuid)
  class Ogg(_MutagenTagInterface):
      """Tag interface for files tagged with Ogg Opus / Ogg Vorbis comments.

      All values are stored as text: the comment under ``'comment'``, the
      track UUID (converted with uuid_bytes_to_str) under ``'symuid:uuid'``
      and free-form integers as their decimal string under their label.
      """

      # https://github.com/cmus/cmus/blob/9a0723f7a90dc7de0898be87963d5105a999aa6c/ip/opus.c#L229
      # https://github.com/cmus/cmus/blob/9a0723f7a90dc7de0898be87963d5105a999aa6c/ip/vorbis.c#L319
      # https://github.com/cmus/cmus/blob/17bf542c6b120d9dcf6642b259d78badfc1143eb/comment.c#L224
      _COMMENT_TAG_KEY = 'comment'
      _UUID_TAG_KEY = 'symuid:uuid'

      def __init__(self, mutagen_file):
          assert isinstance(mutagen_file.tags, (mutagen.oggopus.OggOpusVComment,
                                                mutagen.oggvorbis.OggVCommentDict)), \
              (type(mutagen_file), type(mutagen_file.tags))
          super().__init__(mutagen_file)

      def _get_single_text(self, tag_label) -> typing.Optional[str]:
          """Return the single string stored under *tag_label*, None if absent.

          Raises ValueError when the tag does not hold exactly one value or
          the value is not a str.
          """
          tag = self._mutagen_file.get(tag_label, None)  # type: typing.Optional[list]
          if tag is None:
              return None
          # Reject empty value lists too, instead of failing with IndexError.
          if len(tag) != 1:
              raise ValueError((self.track_path, tag))
          if not isinstance(tag[0], str):
              raise ValueError((self.track_path, tag))
          return tag[0]

      def get_comment(self) -> typing.Optional[str]:
          """Return the comment, None if the tag is absent."""
          return self._get_single_text(self._COMMENT_TAG_KEY)

      def set_comment(self, comment: str) -> None:
          """Store *comment* under the comment tag."""
          self._mutagen_file[self._COMMENT_TAG_KEY] = comment

      def get_track_uuid(self) -> typing.Optional[bytes]:
          """Return the stored UUID as bytes, None if absent or empty."""
          uuid_str = self._get_single_text(self._UUID_TAG_KEY)
          return uuid_str_to_bytes(uuid_str) if uuid_str else None

      def set_track_uuid(self, uuid: bytes) -> None:
          """Store *uuid* in its string form under the UUID tag."""
          self._mutagen_file[self._UUID_TAG_KEY] = uuid_bytes_to_str(uuid)

      def get_free_int(self, tag_label: str) -> typing.Optional[int]:
          """Return the integer stored under *tag_label*, None if absent or empty."""
          dec = self._get_single_text(tag_label)
          return int(dec) if dec else None

      def get_free_ints(self, tag_label_prefix: str) \
              -> typing.Iterator[typing.Tuple[str, int]]:
          """Yield ``(key, int)`` for tags matching *tag_label_prefix*.

          A key matches when it equals the prefix or starts with the prefix
          followed by ``':'``.  Raises ValueError when a matching tag does
          not hold exactly one value.
          """
          for tag_key, tag_value in self._mutagen_file.items():
              if tag_key == tag_label_prefix \
                      or tag_key.startswith(tag_label_prefix + ':'):
                  # Reject empty value lists too, instead of failing with IndexError.
                  if len(tag_value) != 1:
                      raise ValueError((self.track_path, tag_key, tag_value))
                  yield (tag_key, int(tag_value[0]))

      def set_free_int(self, tag_label: str, data: int) -> typing.Tuple:
          """Store ``str(data)`` under *tag_label*.

          Returns the label together with the value read back from the file.
          """
          self._mutagen_file[tag_label] = str(data)
          return (tag_label, self._mutagen_file[tag_label])
  237. def set_free_int(self, tag_label: str, data: int) -> typing.Tuple:
  238. self._mutagen_file[tag_label] = str(data)
  239. return (tag_label, self._mutagen_file[tag_label])