123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323 |
- import abc
- import re
- import typing
- import mutagen.id3
- import mutagen.mp4
- from symuid._uuid import uuid_bytes_to_str, uuid_str_to_bytes
- class TagInterface(abc.ABC):
- @abc.abstractproperty
- def track_path(self):
- pass
- @abc.abstractmethod
- def get_comment(self):
- pass
- @abc.abstractmethod
- def set_comment(self, comment):
- pass
- @abc.abstractmethod
- def get_track_uuid(self):
- pass
- @abc.abstractmethod
- def set_track_uuid(self, uuid):
- pass
- @abc.abstractmethod
- def save(self):
- pass
- @abc.abstractmethod
- def get_free_int(self, tag_label):
- pass
- @abc.abstractmethod
- def set_free_int(self, tag_label, data):
- pass
- @abc.abstractmethod
- def get_free_ints(self, tag_label_prefix):
- pass
- class _MutagenTagInterface(TagInterface):
- # pylint: disable=abstract-method
- def __init__(self, mutagen_file):
- self._mutagen_file = mutagen_file
- @property
- def track_path(self):
- return self._mutagen_file.filename
- def save(self):
- self._mutagen_file.save()
- class ID3(_MutagenTagInterface):
- # http://id3.org/id3v2.4.0-frames#4.1.
- _UFID_OWNER_ID = "symuid"
- def __init__(self, mutagen_file):
- assert isinstance(mutagen_file.tags, mutagen.id3.ID3), mutagen_file.tags
- super().__init__(mutagen_file)
- def _get_str(self, tag_label) -> str:
- tag = self._mutagen_file.tags.get(tag_label, None)
- if tag is None:
- raise KeyError(tag_label)
- if len(tag.text) == 1:
- return tag.text[0]
- raise ValueError(tag)
- def _get_free_str(self, tag_label) -> str:
- return self._get_str("TXXX:" + tag_label)
- def get_free_ints(self, tag_label_prefix):
- for tag in self._mutagen_file.tags.getall("TXXX:" + tag_label_prefix):
- assert len(tag.text) == 1, tag
- yield (tag.desc, int(tag.text[0]))
- def get_free_int(self, tag_label) -> typing.Optional[int]:
- try:
- return int(self._get_free_str(tag_label))
- except KeyError:
- return None
- def get_free_int_ratio(self, tag_label) -> typing.Tuple[int, int]:
- nominator, denominator = map(int, self._get_free_str(tag_label).split("/"))
- return (nominator, denominator)
- def _set_free_str(self, tag_label: str, string: str) -> mutagen.id3.TXXX:
- # mutagen.id3._specs.EncodedTextSpec.write encodes 'desc' and 'text'
- tag = mutagen.id3.TXXX(
- encoding=mutagen.id3.Encoding.LATIN1,
- desc=tag_label,
- text=[string],
- )
- # TODO overwrite instead of add() ?
- self._mutagen_file.tags.add(tag)
- return tag
- def set_free_int(self, tag_label: str, data: str) -> mutagen.id3.TXXX:
- return self._set_free_str(tag_label=tag_label, string=str(data))
- def set_free_int_ratio(
- self, tag_label, numerator: int, denominator: int
- ) -> mutagen.id3.TXXX:
- return self._set_free_str(
- tag_label=tag_label, string="{}/{}".format(numerator, denominator)
- )
- def get_comment(self) -> typing.Optional[str]:
- try:
- return self._get_str("COMM::eng")
- except KeyError:
- return None
- def set_comment(self, comment) -> mutagen.id3.COMM:
- tag = mutagen.id3.COMM(
- encoding=mutagen.id3.Encoding.UTF8,
- lang="eng",
- text=[comment],
- )
- self._mutagen_file.tags.add(tag)
- return tag
- def get_track_uuid(self) -> typing.Optional:
- for ufid in self._mutagen_file.tags.getall("UFID"):
- if ufid.owner == self._UFID_OWNER_ID:
- return ufid.data
- return None
- def set_track_uuid(self, uuid) -> mutagen.id3.UFID:
- # mutagen.id3._specs.EncodedTextSpec.write encodes 'owner'
- tag = mutagen.id3.UFID(owner=self._UFID_OWNER_ID, data=uuid)
- self._mutagen_file.tags.add(tag)
- return tag
- class MP4(_MutagenTagInterface):
- _UUID_TAG_KEY = "symuid:uuid"
- _COMMENT_TAG_KEY = "\xa9cmt"
- def __init__(self, mutagen_file):
- assert mutagen_file.tags is not None, mutagen_file
- assert isinstance(mutagen_file.tags, mutagen.mp4.MP4Tags), mutagen_file.tags
- super().__init__(mutagen_file)
- @staticmethod
- def _freeform_to_int(freeform):
- # "a signed big-endian integer with length one of { 1,2,3,4,8 } bytes"
- assert freeform.dataformat == mutagen.mp4.AtomDataType.INTEGER, freeform
- return int.from_bytes(freeform, byteorder="big", signed=True)
- def get_free_ints(self, tag_label_prefix):
- label_pattern = re.compile(r"^----:{}(:|$)".format(re.escape(tag_label_prefix)))
- for label, values in self._mutagen_file.tags.items():
- # TODO overwrite instead of add() ?
- if label_pattern.match(label):
- assert len(values) == 1, (label, values)
- value = MP4._freeform_to_int(values[0])
- yield (re.sub(r"^----:", "", label), value)
- def _get_free(self, tag_label) -> typing.List[mutagen.mp4.MP4FreeForm]:
- # freeform keys start with '----'
- # http://mutagen.readthedocs.io/en/latest/api/mp4.html
- tag = self._mutagen_file.tags.get("----:" + tag_label, None)
- if not tag:
- raise KeyError(tag_label)
- return tag
- def _get_free_ints(self, tag_label) -> typing.Tuple[int]:
- return tuple(self._freeform_to_int(i) for i in self._get_free(tag_label))
- def get_free_int(self, tag_label):
- try:
- ints = self._get_free_ints(tag_label)
- except KeyError:
- return None
- if len(ints) != 1:
- raise ValueError((tag_label, ints))
- return ints[0]
- def get_free_int_ratio(self, tag_label) -> typing.Tuple[int, int]:
- ints = self._get_free_ints(tag_label)
- if len(ints) != 2:
- raise ValueError((tag_label, ints))
- return ints
- def _get_free_uuid(self, tag_label) -> mutagen.mp4.MP4FreeForm:
- (tag,) = self._get_free(tag_label)
- if tag.dataformat != mutagen.mp4.AtomDataType.UUID:
- raise ValueError(tag)
- return tag
- def _set_free(self, tag_label, dataformat, data):
- assert isinstance(data, bytes)
- tag = mutagen.mp4.MP4FreeForm(dataformat=dataformat, data=data)
- self._mutagen_file.tags["----:" + tag_label] = [tag]
- return tag
- @staticmethod
- def _int_to_freeform_data(integer: int) -> bytes:
- # "a signed big-endian integer with length one of { 1,2,3,4,8 } bytes"
- # TODO exclude 5-7, 9-
- byte_len = max((integer.bit_length() + 8) // 8, 1)
- return integer.to_bytes(byte_len, byteorder="big", signed=True)
- def set_free_int(self, tag_label, data):
- assert isinstance(data, int)
- return self._set_free(
- tag_label=tag_label,
- dataformat=mutagen.mp4.AtomDataType.INTEGER,
- data=self._int_to_freeform_data(data),
- )
- def set_free_int_ratio(
- self, tag_label, numerator: int, denominator: int
- ) -> typing.List[mutagen.mp4.MP4FreeForm]:
- tag = [
- mutagen.mp4.MP4FreeForm(
- dataformat=mutagen.mp4.AtomDataType.INTEGER,
- data=self._int_to_freeform_data(i),
- )
- for i in (numerator, denominator)
- ]
- self._mutagen_file.tags["----:" + tag_label] = tag
- return tag
- def _set_free_uuid(self, tag_label, data):
- return self._set_free(
- tag_label=tag_label,
- # https://mutagen.readthedocs.io/en/latest/api/mp4.html#mutagen.mp4.AtomDataType.UUID
- dataformat=mutagen.mp4.AtomDataType.UUID,
- data=data,
- )
- def get_comment(self):
- tag = self._mutagen_file.tags.get(self._COMMENT_TAG_KEY, None)
- if tag is None:
- return None
- if len(tag) == 1:
- return tag[0]
- raise ValueError(tag)
- def set_comment(self, comment: str) -> None:
- self._mutagen_file[self._COMMENT_TAG_KEY] = [comment]
- def get_track_uuid(self):
- try:
- return self._get_free_uuid(self._UUID_TAG_KEY)
- except KeyError:
- return None
- def set_track_uuid(self, uuid):
- return self._set_free_uuid(self._UUID_TAG_KEY, uuid)
- class Ogg(_MutagenTagInterface):
- # https://github.com/cmus/cmus/blob/9a0723f7a90dc7de0898be87963d5105a999aa6c/ip/opus.c#L229
- # https://github.com/cmus/cmus/blob/9a0723f7a90dc7de0898be87963d5105a999aa6c/ip/vorbis.c#L319
- # https://github.com/cmus/cmus/blob/17bf542c6b120d9dcf6642b259d78badfc1143eb/comment.c#L224
- _COMMENT_TAG_KEY = "comment"
- _UUID_TAG_KEY = "symuid:uuid"
- def __init__(self, mutagen_file):
- assert isinstance(
- mutagen_file.tags,
- (mutagen.oggopus.OggOpusVComment, mutagen.oggvorbis.OggVCommentDict),
- ), (type(mutagen_file), type(mutagen_file.tags))
- super().__init__(mutagen_file)
- def _get_single_text(self, tag_label) -> typing.Optional[str]:
- tag = self._mutagen_file.get(tag_label, None) # type: list
- if tag is None:
- return None
- if len(tag) > 1:
- raise ValueError((self.track_path, tag))
- if not isinstance(tag[0], str):
- raise ValueError((self.track_path, tag))
- return tag[0]
- def get_comment(self) -> typing.Optional[str]:
- return self._get_single_text(self._COMMENT_TAG_KEY)
- def set_comment(self, comment: str) -> None:
- self._mutagen_file[self._COMMENT_TAG_KEY] = comment
- def get_track_uuid(self) -> typing.Optional[bytes]:
- uuid_str = self._get_single_text(self._UUID_TAG_KEY)
- return uuid_str_to_bytes(uuid_str) if uuid_str else None
- def set_track_uuid(self, uuid: bytes) -> None:
- self._mutagen_file[self._UUID_TAG_KEY] = uuid_bytes_to_str(uuid)
- def get_free_int(self, tag_label: str) -> typing.Optional[int]:
- dec = self._get_single_text(tag_label)
- return int(dec) if dec else None
- def get_free_ints(
- self, tag_label_prefix: str
- ) -> typing.Iterator[typing.Tuple[str, int]]:
- for tag_key, tag_value in self._mutagen_file.items():
- if tag_key == tag_label_prefix or tag_key.startswith(
- tag_label_prefix + ":"
- ):
- if len(tag_value) > 1:
- raise ValueError((self.track_path, tag_key, tag_value))
- yield (tag_key, int(tag_value[0]))
- def set_free_int(self, tag_label: str, data: int) -> typing.Tuple:
- self._mutagen_file[tag_label] = str(data)
- return (tag_label, self._mutagen_file[tag_label])
|