import abc import re import typing import mutagen.id3 import mutagen.mp4 from symuid._uuid import uuid_bytes_to_str, uuid_str_to_bytes class TagInterface(abc.ABC): @abc.abstractproperty def track_path(self): pass @abc.abstractmethod def get_comment(self): pass @abc.abstractmethod def set_comment(self, comment): pass @abc.abstractmethod def get_track_uuid(self): pass @abc.abstractmethod def set_track_uuid(self, uuid): pass @abc.abstractmethod def save(self): pass @abc.abstractmethod def get_free_int(self, tag_label): pass @abc.abstractmethod def set_free_int(self, tag_label, data): pass @abc.abstractmethod def get_free_ints(self, tag_label_prefix): pass class _MutagenTagInterface(TagInterface): # pylint: disable=abstract-method def __init__(self, mutagen_file): self._mutagen_file = mutagen_file @property def track_path(self): return self._mutagen_file.filename def save(self): self._mutagen_file.save() class ID3(_MutagenTagInterface): # http://id3.org/id3v2.4.0-frames#4.1. _UFID_OWNER_ID = "symuid" def __init__(self, mutagen_file): assert isinstance(mutagen_file.tags, mutagen.id3.ID3), mutagen_file.tags super().__init__(mutagen_file) def _get_str(self, tag_label) -> str: tag = self._mutagen_file.tags.get(tag_label, None) if tag is None: raise KeyError(tag_label) if len(tag.text) == 1: return tag.text[0] raise ValueError(tag) def _get_free_str(self, tag_label) -> str: return self._get_str("TXXX:" + tag_label) def get_free_ints(self, tag_label_prefix): for tag in self._mutagen_file.tags.getall("TXXX:" + tag_label_prefix): assert len(tag.text) == 1, tag yield (tag.desc, int(tag.text[0])) def get_free_int(self, tag_label) -> typing.Optional[int]: try: return int(self._get_free_str(tag_label)) except KeyError: return None def get_free_int_ratio(self, tag_label) -> typing.Tuple[int, int]: nominator, denominator = map(int, self._get_free_str(tag_label).split("/")) return (nominator, denominator) def _set_free_str(self, tag_label: str, string: str) -> mutagen.id3.TXXX: # mutagen.id3._specs.EncodedTextSpec.write encodes 'desc' and 'text' tag = mutagen.id3.TXXX( encoding=mutagen.id3.Encoding.LATIN1, desc=tag_label, text=[string], ) # TODO overwrite instead of add() ? self._mutagen_file.tags.add(tag) return tag def set_free_int(self, tag_label: str, data: str) -> mutagen.id3.TXXX: return self._set_free_str(tag_label=tag_label, string=str(data)) def set_free_int_ratio( self, tag_label, numerator: int, denominator: int ) -> mutagen.id3.TXXX: return self._set_free_str( tag_label=tag_label, string="{}/{}".format(numerator, denominator) ) def get_comment(self) -> typing.Optional[str]: try: return self._get_str("COMM::eng") except KeyError: return None def set_comment(self, comment) -> mutagen.id3.COMM: tag = mutagen.id3.COMM( encoding=mutagen.id3.Encoding.UTF8, lang="eng", text=[comment], ) self._mutagen_file.tags.add(tag) return tag def get_track_uuid(self) -> typing.Optional: for ufid in self._mutagen_file.tags.getall("UFID"): if ufid.owner == self._UFID_OWNER_ID: return ufid.data return None def set_track_uuid(self, uuid) -> mutagen.id3.UFID: # mutagen.id3._specs.EncodedTextSpec.write encodes 'owner' tag = mutagen.id3.UFID(owner=self._UFID_OWNER_ID, data=uuid) self._mutagen_file.tags.add(tag) return tag class MP4(_MutagenTagInterface): _UUID_TAG_KEY = "symuid:uuid" _COMMENT_TAG_KEY = "\xa9cmt" def __init__(self, mutagen_file): assert mutagen_file.tags is not None, mutagen_file assert isinstance(mutagen_file.tags, mutagen.mp4.MP4Tags), mutagen_file.tags super().__init__(mutagen_file) @staticmethod def _freeform_to_int(freeform): # "a signed big-endian integer with length one of { 1,2,3,4,8 } bytes" assert freeform.dataformat == mutagen.mp4.AtomDataType.INTEGER, freeform return int.from_bytes(freeform, byteorder="big", signed=True) def get_free_ints(self, tag_label_prefix): label_pattern = re.compile( r"^----:{}(:|$)".format(re.escape(tag_label_prefix),) ) for label, values in self._mutagen_file.tags.items(): # TODO overwrite instead of add() ? if label_pattern.match(label): assert len(values) == 1, (label, values) value = MP4._freeform_to_int(values[0]) yield (re.sub(r"^----:", "", label), value) def _get_free(self, tag_label) -> typing.List[mutagen.mp4.MP4FreeForm]: # freeform keys start with '----' # http://mutagen.readthedocs.io/en/latest/api/mp4.html tag = self._mutagen_file.tags.get("----:" + tag_label, None) if not tag: raise KeyError(tag_label) return tag def _get_free_ints(self, tag_label) -> typing.Tuple[int]: return tuple(self._freeform_to_int(i) for i in self._get_free(tag_label)) def get_free_int(self, tag_label): try: ints = self._get_free_ints(tag_label) except KeyError: return None if len(ints) != 1: raise ValueError((tag_label, ints)) return ints[0] def get_free_int_ratio(self, tag_label) -> typing.Tuple[int, int]: ints = self._get_free_ints(tag_label) if len(ints) != 2: raise ValueError((tag_label, ints)) return ints def _get_free_uuid(self, tag_label) -> mutagen.mp4.MP4FreeForm: (tag,) = self._get_free(tag_label) if tag.dataformat != mutagen.mp4.AtomDataType.UUID: raise ValueError(tag) return tag def _set_free(self, tag_label, dataformat, data): assert isinstance(data, bytes) tag = mutagen.mp4.MP4FreeForm(dataformat=dataformat, data=data) self._mutagen_file.tags["----:" + tag_label] = [tag] return tag @staticmethod def _int_to_freeform_data(integer: int) -> bytes: # "a signed big-endian integer with length one of { 1,2,3,4,8 } bytes" # TODO exclude 5-7, 9- byte_len = max((integer.bit_length() + 8) // 8, 1) return integer.to_bytes(byte_len, byteorder="big", signed=True) def set_free_int(self, tag_label, data): assert isinstance(data, int) return self._set_free( tag_label=tag_label, dataformat=mutagen.mp4.AtomDataType.INTEGER, data=self._int_to_freeform_data(data), ) def set_free_int_ratio( self, tag_label, numerator: int, denominator: int ) -> typing.List[mutagen.mp4.MP4FreeForm]: tag = [ mutagen.mp4.MP4FreeForm( dataformat=mutagen.mp4.AtomDataType.INTEGER, data=self._int_to_freeform_data(i), ) for i in (numerator, denominator) ] self._mutagen_file.tags["----:" + tag_label] = tag return tag def _set_free_uuid(self, tag_label, data): return self._set_free( tag_label=tag_label, # https://mutagen.readthedocs.io/en/latest/api/mp4.html#mutagen.mp4.AtomDataType.UUID dataformat=mutagen.mp4.AtomDataType.UUID, data=data, ) def get_comment(self): tag = self._mutagen_file.tags.get(self._COMMENT_TAG_KEY, None) if tag is None: return None if len(tag) == 1: return tag[0] raise ValueError(tag) def set_comment(self, comment: str) -> None: self._mutagen_file[self._COMMENT_TAG_KEY] = [comment] def get_track_uuid(self): try: return self._get_free_uuid(self._UUID_TAG_KEY) except KeyError: return None def set_track_uuid(self, uuid): return self._set_free_uuid(self._UUID_TAG_KEY, uuid) class Ogg(_MutagenTagInterface): # https://github.com/cmus/cmus/blob/9a0723f7a90dc7de0898be87963d5105a999aa6c/ip/opus.c#L229 # https://github.com/cmus/cmus/blob/9a0723f7a90dc7de0898be87963d5105a999aa6c/ip/vorbis.c#L319 # https://github.com/cmus/cmus/blob/17bf542c6b120d9dcf6642b259d78badfc1143eb/comment.c#L224 _COMMENT_TAG_KEY = "comment" _UUID_TAG_KEY = "symuid:uuid" def __init__(self, mutagen_file): assert isinstance( mutagen_file.tags, (mutagen.oggopus.OggOpusVComment, mutagen.oggvorbis.OggVCommentDict), ), (type(mutagen_file), type(mutagen_file.tags)) super().__init__(mutagen_file) def _get_single_text(self, tag_label) -> typing.Optional[str]: tag = self._mutagen_file.get(tag_label, None) # type: list if tag is None: return None if len(tag) > 1: raise ValueError((self.track_path, tag)) if not isinstance(tag[0], str): raise ValueError((self.track_path, tag)) return tag[0] def get_comment(self) -> typing.Optional[str]: return self._get_single_text(self._COMMENT_TAG_KEY) def set_comment(self, comment: str) -> None: self._mutagen_file[self._COMMENT_TAG_KEY] = comment def get_track_uuid(self) -> typing.Optional[bytes]: uuid_str = self._get_single_text(self._UUID_TAG_KEY) return uuid_str_to_bytes(uuid_str) if uuid_str else None def set_track_uuid(self, uuid: bytes) -> None: self._mutagen_file[self._UUID_TAG_KEY] = uuid_bytes_to_str(uuid) def get_free_int(self, tag_label: str) -> typing.Optional[int]: dec = self._get_single_text(tag_label) return int(dec) if dec else None def get_free_ints( self, tag_label_prefix: str ) -> typing.Iterator[typing.Tuple[str, int]]: for tag_key, tag_value in self._mutagen_file.items(): if tag_key == tag_label_prefix or tag_key.startswith( tag_label_prefix + ":" ): if len(tag_value) > 1: raise ValueError((self.track_path, tag_key, tag_value)) yield (tag_key, int(tag_value[0])) def set_free_int(self, tag_label: str, data: int) -> typing.Tuple: self._mutagen_file[tag_label] = str(data) return (tag_label, self._mutagen_file[tag_label])