_tag_interface.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
import abc
import re
import typing

import mutagen.id3
import mutagen.mp4
import mutagen.oggopus

from symuid._uuid import uuid_str_to_bytes
  7. class TagInterface(abc.ABC):
  8. @abc.abstractproperty
  9. def track_path(self):
  10. pass
  11. @abc.abstractmethod
  12. def get_comment(self):
  13. pass
  14. @abc.abstractmethod
  15. def set_comment(self, comment):
  16. pass
  17. @abc.abstractmethod
  18. def get_track_uuid(self):
  19. pass
  20. @abc.abstractmethod
  21. def set_track_uuid(self, uuid):
  22. pass
  23. @abc.abstractmethod
  24. def save(self):
  25. pass
  26. @abc.abstractmethod
  27. def get_free_int(self, tag_label):
  28. pass
  29. @abc.abstractmethod
  30. def set_free_int(self, tag_label, data):
  31. pass
  32. @abc.abstractmethod
  33. def get_free_ints(self, tag_label_prefix):
  34. pass
  35. class _MutagenTagInterface(TagInterface):
  36. # pylint: disable=abstract-method
  37. def __init__(self, mutagen_file):
  38. self._mutagen_file = mutagen_file
  39. @property
  40. def track_path(self):
  41. return self._mutagen_file.filename
  42. def save(self):
  43. self._mutagen_file.save()
  44. class ID3(_MutagenTagInterface):
  45. # http://id3.org/id3v2.4.0-frames#4.1.
  46. _UFID_OWNER_ID = 'symuid'
  47. def __init__(self, mutagen_file):
  48. assert isinstance(mutagen_file.tags, mutagen.id3.ID3), \
  49. mutagen_file.tags
  50. super().__init__(mutagen_file)
  51. def _get_single_text(self, tag_label):
  52. tag = self._mutagen_file.tags.get(tag_label, None)
  53. if tag is None:
  54. # {}.get('a') == None
  55. return None
  56. if len(tag.text) == 1:
  57. return tag.text[0]
  58. raise ValueError(tag)
  59. def get_free_ints(self, tag_label_prefix):
  60. for tag in self._mutagen_file.tags.getall('TXXX:' + tag_label_prefix):
  61. assert len(tag.text) == 1, tag
  62. yield (tag.desc, int(tag.text[0]))
  63. def get_free_int(self, tag_label):
  64. text = self._get_single_text('TXXX:' + tag_label)
  65. return int(text) if text else None
  66. def set_free_int(self, tag_label, data):
  67. # mutagen.id3._specs.EncodedTextSpec.write encodes 'desc' and 'text'
  68. tag = mutagen.id3.TXXX(
  69. encoding=mutagen.id3.Encoding.LATIN1,
  70. desc=tag_label,
  71. text=[str(data)],
  72. )
  73. # TODO overwrite instead of add() ?
  74. self._mutagen_file.tags.add(tag)
  75. return tag
  76. def get_comment(self):
  77. return self._get_single_text('COMM::eng')
  78. def set_comment(self, comment):
  79. tag = mutagen.id3.COMM(
  80. encoding=mutagen.id3.Encoding.UTF8,
  81. lang='eng',
  82. text=[comment],
  83. )
  84. self._mutagen_file.tags.add(tag)
  85. return tag
  86. def get_track_uuid(self):
  87. for ufid in self._mutagen_file.tags.getall('UFID'):
  88. if ufid.owner == self._UFID_OWNER_ID:
  89. return ufid.data
  90. return None
  91. def set_track_uuid(self, uuid):
  92. # mutagen.id3._specs.EncodedTextSpec.write encodes 'owner'
  93. tag = mutagen.id3.UFID(owner=self._UFID_OWNER_ID, data=uuid)
  94. self._mutagen_file.tags.add(tag)
  95. return tag
  96. class MP4(_MutagenTagInterface):
  97. _UUID_TAG_KEY = 'symuid:uuid'
  98. def __init__(self, mutagen_file):
  99. assert mutagen_file.tags, mutagen_file
  100. assert isinstance(mutagen_file.tags, mutagen.mp4.MP4Tags), \
  101. mutagen_file.tags
  102. super().__init__(mutagen_file)
  103. def _get_single(self, tag_label):
  104. tag = self._mutagen_file.tags.get(tag_label, None)
  105. if tag is None:
  106. # {}.get('a') == None
  107. return None
  108. if len(tag) == 1:
  109. return tag[0]
  110. raise ValueError(tag)
  111. @staticmethod
  112. def _freeform_to_int(freeform):
  113. # "a signed big-endian integer with length one of { 1,2,3,4,8 } bytes"
  114. assert freeform.dataformat == mutagen.mp4.AtomDataType.INTEGER, freeform
  115. return int.from_bytes(freeform, byteorder='big', signed=True)
  116. def get_free_ints(self, tag_label_prefix):
  117. label_pattern = re.compile(r'^----:{}(:|$)'.format(
  118. re.escape(tag_label_prefix),
  119. ))
  120. for label, values in self._mutagen_file.tags.items():
  121. # TODO overwrite instead of add() ?
  122. if label_pattern.match(label):
  123. assert len(values) == 1, (label, values)
  124. value = MP4._freeform_to_int(values[0])
  125. yield (re.sub(r'^----:', '', label), value)
  126. def _get_free(self, tag_label):
  127. # freeform keys start with '----'
  128. # http://mutagen.readthedocs.io/en/latest/api/mp4.html
  129. return self._get_single('----:' + tag_label)
  130. def get_free_int(self, tag_label):
  131. tag = self._get_free(tag_label)
  132. return None if tag is None else MP4._freeform_to_int(tag)
  133. def _get_free_uuid(self, tag_label):
  134. tag = self._get_free(tag_label)
  135. assert tag is None or tag.dataformat == mutagen.mp4.AtomDataType.UUID, tag.dataformat
  136. return tag
  137. def _set_free(self, tag_label, dataformat, data):
  138. assert isinstance(data, bytes)
  139. tag = mutagen.mp4.MP4FreeForm(dataformat=dataformat, data=data)
  140. self._mutagen_file.tags['----:' + tag_label] = [tag]
  141. return tag
  142. def set_free_int(self, tag_label, data):
  143. assert isinstance(data, int)
  144. return self._set_free(
  145. tag_label=tag_label,
  146. # "a signed big-endian integer with length one of { 1,2,3,4,8 } bytes"
  147. dataformat=mutagen.mp4.AtomDataType.INTEGER,
  148. # TODO set byte length properly
  149. data=data.to_bytes(1, byteorder='big', signed=True),
  150. )
  151. def _set_free_uuid(self, tag_label, data):
  152. return self._set_free(
  153. tag_label=tag_label,
  154. # https://mutagen.readthedocs.io/en/latest/api/mp4.html#mutagen.mp4.AtomDataType.UUID
  155. dataformat=mutagen.mp4.AtomDataType.UUID,
  156. data=data,
  157. )
  158. def get_comment(self):
  159. return self._get_single('\xa9cmt')
  160. def set_comment(self, comment):
  161. raise NotImplementedError()
  162. def get_track_uuid(self):
  163. return self._get_free_uuid(self._UUID_TAG_KEY)
  164. def set_track_uuid(self, uuid):
  165. return self._set_free_uuid(self._UUID_TAG_KEY, uuid)
  166. class OggOpus(_MutagenTagInterface):
  167. # https://github.com/cmus/cmus/blob/9a0723f7a90dc7de0898be87963d5105a999aa6c/ip/opus.c#L229
  168. # https://github.com/cmus/cmus/blob/17bf542c6b120d9dcf6642b259d78badfc1143eb/comment.c#L224
  169. _COMMENT_TAG_KEY = 'comment'
  170. _UUID_TAG_KEY = 'symuid:uuid'
  171. def __init__(self, mutagen_file):
  172. assert isinstance(mutagen_file.tags, mutagen.oggopus.OggOpusVComment), \
  173. (mutagen_file, mutagen_file.tags)
  174. super().__init__(mutagen_file)
  175. def _get_single_text(self, tag_label) -> typing.Optional[str]:
  176. tag = self._mutagen_file.get(tag_label, None) # type: list
  177. if tag is None:
  178. return None
  179. if len(tag) > 1:
  180. raise ValueError((self.track_path, tag))
  181. if not isinstance(tag[0], str):
  182. raise ValueError((self.track_path, tag))
  183. return tag[0]
  184. def get_comment(self) -> typing.Optional[str]:
  185. return self._get_single_text(self._COMMENT_TAG_KEY)
  186. def set_comment(self, comment: str) -> None:
  187. self._mutagen_file[self._COMMENT_TAG_KEY] = comment
  188. def get_track_uuid(self) -> typing.Optional[bytes]:
  189. uuid_str = self._get_single_text(self._UUID_TAG_KEY)
  190. return uuid_str_to_bytes(uuid_str) if uuid_str else None
  191. def set_track_uuid(self, uuid):
  192. raise NotImplementedError()
  193. def get_free_int(self, tag_label):
  194. raise NotImplementedError()
  195. def set_free_int(self, tag_label, data):
  196. raise NotImplementedError()
  197. def get_free_ints(self, tag_label_prefix):
  198. raise NotImplementedError()