"""Format-specific wrappers giving uniform access to audio file tags.

Each wrapper adapts an already opened mutagen file object to the
`TagInterface` contract: a free-text comment, a per-track UUID and
"free" integer tags addressed by label.
"""

import abc
import re
import typing

import mutagen.id3
import mutagen.mp4
import mutagen.oggopus
import mutagen.oggvorbis

from symuid._uuid import uuid_bytes_to_str, uuid_str_to_bytes


class TagInterface(abc.ABC):
    """Abstract contract implemented by every format-specific wrapper."""

    @property
    @abc.abstractmethod
    def track_path(self):
        """Path of the underlying audio file."""

    @abc.abstractmethod
    def get_comment(self):
        """Return the track's comment, or None when no comment is set."""

    @abc.abstractmethod
    def set_comment(self, comment):
        """Set the track's comment (in memory; call `save` to persist)."""

    @abc.abstractmethod
    def get_track_uuid(self):
        """Return the track's UUID, or None when no UUID is set."""

    @abc.abstractmethod
    def set_track_uuid(self, uuid):
        """Set the track's UUID (in memory; call `save` to persist)."""

    @abc.abstractmethod
    def save(self):
        """Write pending tag changes back to the file."""

    @abc.abstractmethod
    def get_free_int(self, tag_label):
        """Return the integer stored under `tag_label`, or None if absent."""

    @abc.abstractmethod
    def set_free_int(self, tag_label, data):
        """Store the integer `data` under `tag_label`."""

    @abc.abstractmethod
    def get_free_ints(self, tag_label_prefix):
        """Yield `(label, value)` pairs of integer tags matching the prefix."""


class _MutagenTagInterface(TagInterface):
    """Shared plumbing for wrappers backed by a mutagen file object."""

    # pylint: disable=abstract-method

    def __init__(self, mutagen_file):
        self._mutagen_file = mutagen_file

    @property
    def track_path(self):
        """Filename reported by the wrapped mutagen file object."""
        return self._mutagen_file.filename

    def save(self):
        """Delegate to the wrapped mutagen file object's `save`."""
        self._mutagen_file.save()


class ID3(_MutagenTagInterface):
    """Tag access for files carrying ID3 tags.

    Free integers are stored as decimal text in TXXX frames, the comment
    in a COMM frame and the track UUID in a UFID frame owned by
    `_UFID_OWNER_ID`.
    """

    # http://id3.org/id3v2.4.0-frames#4.1.
    _UFID_OWNER_ID = 'symuid'

    def __init__(self, mutagen_file):
        assert isinstance(mutagen_file.tags, mutagen.id3.ID3), \
            mutagen_file.tags
        super().__init__(mutagen_file)

    def _get_single_text(self, tag_label):
        """Return the only text value of frame `tag_label`, or None if absent.

        Raises:
            ValueError: if the frame does not hold exactly one text value.
        """
        tag = self._mutagen_file.tags.get(tag_label, None)
        if tag is None:
            return None
        if len(tag.text) == 1:
            return tag.text[0]
        raise ValueError(tag)

    def get_free_ints(self, tag_label_prefix):
        """Yield `(desc, value)` for the TXXX frames matching the prefix.

        The frames are those returned by
        `tags.getall('TXXX:' + tag_label_prefix)`.
        """
        for tag in self._mutagen_file.tags.getall('TXXX:' + tag_label_prefix):
            assert len(tag.text) == 1, tag
            yield (tag.desc, int(tag.text[0]))

    def get_free_int(self, tag_label):
        """Return the integer stored in `TXXX:<tag_label>`, or None.

        None is returned both when the frame is missing and when its
        text is empty.
        """
        text = self._get_single_text('TXXX:' + tag_label)
        return int(text) if text else None

    def set_free_int(self, tag_label, data):
        """Store `data` as decimal text in a TXXX frame; return the frame."""
        # mutagen.id3._specs.EncodedTextSpec.write encodes 'desc' and 'text'
        tag = mutagen.id3.TXXX(
            encoding=mutagen.id3.Encoding.LATIN1,
            desc=tag_label,
            text=[str(data)],
        )
        # TODO overwrite instead of add() ?
        self._mutagen_file.tags.add(tag)
        return tag

    def get_comment(self):
        """Return the text of the `COMM::eng` frame, or None if absent."""
        return self._get_single_text('COMM::eng')

    def set_comment(self, comment):
        """Store `comment` in an English COMM frame; return the frame."""
        tag = mutagen.id3.COMM(
            encoding=mutagen.id3.Encoding.UTF8,
            lang='eng',
            text=[comment],
        )
        self._mutagen_file.tags.add(tag)
        return tag

    def get_track_uuid(self):
        """Return the data of the UFID frame owned by symuid, or None."""
        for ufid in self._mutagen_file.tags.getall('UFID'):
            if ufid.owner == self._UFID_OWNER_ID:
                return ufid.data
        return None

    def set_track_uuid(self, uuid):
        """Store `uuid` in a UFID frame owned by symuid; return the frame."""
        # mutagen.id3._specs.EncodedTextSpec.write encodes 'owner'
        tag = mutagen.id3.UFID(owner=self._UFID_OWNER_ID, data=uuid)
        self._mutagen_file.tags.add(tag)
        return tag


class MP4(_MutagenTagInterface):
    """Tag access for files carrying MP4 tags.

    Free integers and the track UUID are stored as freeform
    (`----:<label>`) atoms, the comment under `_COMMENT_TAG_KEY`.
    """

    _UUID_TAG_KEY = 'symuid:uuid'
    _COMMENT_TAG_KEY = '\xa9cmt'
    # "a signed big-endian integer with length one of { 1,2,3,4,8 } bytes"
    _INTEGER_BYTE_LENGTHS = (1, 2, 3, 4, 8)

    def __init__(self, mutagen_file):
        assert mutagen_file.tags is not None, mutagen_file
        assert isinstance(mutagen_file.tags, mutagen.mp4.MP4Tags), \
            mutagen_file.tags
        super().__init__(mutagen_file)

    def _get_single(self, tag_label):
        """Return the only value stored under `tag_label`, or None if absent.

        Raises:
            ValueError: if the tag does not hold exactly one value.
        """
        tag = self._mutagen_file.tags.get(tag_label, None)
        if tag is None:
            return None
        if len(tag) == 1:
            return tag[0]
        raise ValueError(tag)

    @staticmethod
    def _freeform_to_int(freeform):
        """Decode a freeform INTEGER atom (signed, big-endian) to an int."""
        # "a signed big-endian integer with length one of { 1,2,3,4,8 } bytes"
        assert freeform.dataformat == mutagen.mp4.AtomDataType.INTEGER, freeform
        return int.from_bytes(freeform, byteorder='big', signed=True)

    def get_free_ints(self, tag_label_prefix):
        """Yield `(label, value)` for freeform integer atoms matching the prefix.

        An atom matches when its key is `----:<prefix>` or starts with
        `----:<prefix>:`. The yielded label has the leading `----:` removed.
        """
        label_pattern = re.compile(r'^----:{}(:|$)'.format(
            re.escape(tag_label_prefix),
        ))
        for label, values in self._mutagen_file.tags.items():
            if label_pattern.match(label):
                assert len(values) == 1, (label, values)
                value = MP4._freeform_to_int(values[0])
                yield (re.sub(r'^----:', '', label), value)

    def _get_free(self, tag_label):
        """Return the single freeform atom stored under `tag_label`, or None."""
        # freeform keys start with '----'
        # http://mutagen.readthedocs.io/en/latest/api/mp4.html
        return self._get_single('----:' + tag_label)

    def get_free_int(self, tag_label):
        """Return the integer stored in freeform atom `tag_label`, or None."""
        tag = self._get_free(tag_label)
        return None if tag is None else MP4._freeform_to_int(tag)

    def _get_free_uuid(self, tag_label):
        """Return the freeform atom `tag_label`, asserting it is UUID-typed."""
        tag = self._get_free(tag_label)
        assert tag is None or tag.dataformat == mutagen.mp4.AtomDataType.UUID, \
            tag.dataformat
        return tag

    def _set_free(self, tag_label, dataformat, data):
        """Replace freeform atom `tag_label` with `data`; return the new atom."""
        assert isinstance(data, bytes)
        tag = mutagen.mp4.MP4FreeForm(dataformat=dataformat, data=data)
        self._mutagen_file.tags['----:' + tag_label] = [tag]
        return tag

    def set_free_int(self, tag_label, data):
        """Store the integer `data` in freeform atom `tag_label`.

        The value is encoded as a signed big-endian integer using the
        shortest permitted length (1, 2, 3, 4 or 8 bytes) that can hold it.

        Returns:
            The freeform atom that was stored.

        Raises:
            OverflowError: if `data` does not fit into 8 signed bytes.
        """
        assert isinstance(data, int)
        # Bits needed for two's complement: magnitude bits plus a sign bit.
        required_bits = (data if data >= 0 else ~data).bit_length() + 1
        for byte_length in self._INTEGER_BYTE_LENGTHS:
            if byte_length * 8 >= required_bits:
                break
        else:
            raise OverflowError(
                'int too big to convert to at most {} bytes: {!r}'.format(
                    self._INTEGER_BYTE_LENGTHS[-1], data,
                ),
            )
        return self._set_free(
            tag_label=tag_label,
            dataformat=mutagen.mp4.AtomDataType.INTEGER,
            data=data.to_bytes(byte_length, byteorder='big', signed=True),
        )

    def _set_free_uuid(self, tag_label, data):
        """Store the bytes `data` as a UUID-typed freeform atom."""
        return self._set_free(
            tag_label=tag_label,
            # https://mutagen.readthedocs.io/en/latest/api/mp4.html#mutagen.mp4.AtomDataType.UUID
            dataformat=mutagen.mp4.AtomDataType.UUID,
            data=data,
        )

    def get_comment(self):
        """Return the single value of the comment tag, or None if absent."""
        return self._get_single(self._COMMENT_TAG_KEY)

    def set_comment(self, comment: str) -> None:
        """Replace the comment tag with the single value `comment`."""
        self._mutagen_file[self._COMMENT_TAG_KEY] = [comment]

    def get_track_uuid(self):
        """Return the UUID-typed freeform atom holding the track UUID, or None."""
        return self._get_free_uuid(self._UUID_TAG_KEY)

    def set_track_uuid(self, uuid):
        """Store the bytes `uuid` as the track UUID; return the stored atom."""
        return self._set_free_uuid(self._UUID_TAG_KEY, uuid)


class Ogg(_MutagenTagInterface):
    """Tag access for Ogg Opus / Ogg Vorbis files (Vorbis comments).

    All values are stored as text: integers as decimal strings and the
    track UUID in the string form produced by `uuid_bytes_to_str`.
    """

    # https://github.com/cmus/cmus/blob/9a0723f7a90dc7de0898be87963d5105a999aa6c/ip/opus.c#L229
    # https://github.com/cmus/cmus/blob/9a0723f7a90dc7de0898be87963d5105a999aa6c/ip/vorbis.c#L319
    # https://github.com/cmus/cmus/blob/17bf542c6b120d9dcf6642b259d78badfc1143eb/comment.c#L224
    _COMMENT_TAG_KEY = 'comment'
    _UUID_TAG_KEY = 'symuid:uuid'

    def __init__(self, mutagen_file):
        assert isinstance(
            mutagen_file.tags,
            (mutagen.oggopus.OggOpusVComment,
             mutagen.oggvorbis.OggVCommentDict),
        ), (type(mutagen_file), type(mutagen_file.tags))
        super().__init__(mutagen_file)

    def _get_single_text(self, tag_label) -> typing.Optional[str]:
        """Return the only string stored under `tag_label`, or None if absent.

        Raises:
            ValueError: if more than one value is stored or the value is
                not a string.
        """
        tag = self._mutagen_file.get(tag_label, None)  # type: typing.Optional[list]
        if tag is None:
            return None
        if len(tag) > 1:
            raise ValueError((self.track_path, tag))
        if not isinstance(tag[0], str):
            raise ValueError((self.track_path, tag))
        return tag[0]

    def get_comment(self) -> typing.Optional[str]:
        """Return the single comment value, or None if absent."""
        return self._get_single_text(self._COMMENT_TAG_KEY)

    def set_comment(self, comment: str) -> None:
        """Replace the comment tag with `comment`."""
        self._mutagen_file[self._COMMENT_TAG_KEY] = comment

    def get_track_uuid(self) -> typing.Optional[bytes]:
        """Return the track UUID as bytes, or None if absent or empty."""
        uuid_str = self._get_single_text(self._UUID_TAG_KEY)
        return uuid_str_to_bytes(uuid_str) if uuid_str else None

    def set_track_uuid(self, uuid: bytes) -> None:
        """Store the track UUID, converted with `uuid_bytes_to_str`."""
        self._mutagen_file[self._UUID_TAG_KEY] = uuid_bytes_to_str(uuid)

    def get_free_int(self, tag_label: str) -> typing.Optional[int]:
        """Return the integer stored under `tag_label`, or None.

        None is returned both when the tag is missing and when its text
        is empty.
        """
        dec = self._get_single_text(tag_label)
        return int(dec) if dec else None

    def get_free_ints(self, tag_label_prefix: str) \
            -> typing.Iterator[typing.Tuple[str, int]]:
        """Yield `(key, value)` for integer tags matching the prefix.

        A tag matches when its key equals the prefix or starts with
        `<prefix>:`.

        Raises:
            ValueError: if a matching tag holds more than one value.
        """
        for tag_key, tag_value in self._mutagen_file.items():
            if tag_key == tag_label_prefix \
                    or tag_key.startswith(tag_label_prefix + ':'):
                if len(tag_value) > 1:
                    raise ValueError((self.track_path, tag_key, tag_value))
                yield (tag_key, int(tag_value[0]))

    def set_free_int(self, tag_label: str, data: int) -> typing.Tuple:
        """Store `data` as decimal text; return `(tag_label, stored value)`."""
        self._mutagen_file[tag_label] = str(data)
        return (tag_label, self._mutagen_file[tag_label])