123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398 |
- """
- A partial parser for WebVTT segments. Interprets enough of the WebVTT stream
- to be able to assemble a single stand-alone subtitle file, suitably adjusting
- timestamps on the way, while everything else is passed through unmodified.
- Regular expressions based on the W3C WebVTT specification
- <https://www.w3.org/TR/webvtt1/>. The X-TIMESTAMP-MAP extension is described
- in RFC 8216 §3.5 <https://tools.ietf.org/html/rfc8216#section-3.5>.
- """
- import io
- import re
- from .utils import int_or_none, timetuple_from_msec
- class _MatchParser:
- """
- An object that maintains the current parsing position and allows
- conveniently advancing it as syntax elements are successfully parsed.
- """
- def __init__(self, string):
- self._data = string
- self._pos = 0
- def match(self, r):
- if isinstance(r, re.Pattern):
- return r.match(self._data, self._pos)
- if isinstance(r, str):
- if self._data.startswith(r, self._pos):
- return len(r)
- return None
- raise ValueError(r)
- def advance(self, by):
- if by is None:
- amt = 0
- elif isinstance(by, re.Match):
- amt = len(by.group(0))
- elif isinstance(by, str):
- amt = len(by)
- elif isinstance(by, int):
- amt = by
- else:
- raise ValueError(by)
- self._pos += amt
- return by
- def consume(self, r):
- return self.advance(self.match(r))
- def child(self):
- return _MatchChildParser(self)
class _MatchChildParser(_MatchParser):
    """
    A parser that reads the same data as the parser it was created from,
    but keeps a position of its own. Work done on the child only becomes
    visible to the parent once commit() is called, so abandoning the
    child is a cheap way to backtrack.
    """

    def __init__(self, parent):
        super().__init__(parent._data)
        # Start where the parent currently is.
        self._pos = parent._pos
        self.__parent = parent

    def commit(self):
        """
        Copy this child's position into the parent and return the parent.
        """
        owner = self.__parent
        owner._pos = self._pos
        return owner
class ParseError(Exception):
    """
    Raised when the input cannot be parsed. The message reports the
    parser position and quotes up to 100 characters of the data found
    from that position onwards.
    """

    def __init__(self, parser):
        where = parser._pos
        excerpt = parser._data[where:where + 100]
        super().__init__(f'Parse error at position {where} (near {excerpt!r})')
# While the specification <https://www.w3.org/TR/webvtt1/#webvtt-timestamp>
# prescribes that hours must be *2 or more* digits, timestamps with a single
# digit for the hour part have been seen in the wild.
# See https://github.com/yt-dlp/yt-dlp/issues/921
#
# Groups: (1) hours, optional; (2) minutes; (3) seconds;
# (4) milliseconds, optional. Note that the dot after the seconds is required
# even when the milliseconds are absent.
_REGEX_TS = re.compile(r'''(?x)
    (?:([0-9]{1,}):)?
    ([0-9]{2}):
    ([0-9]{2})\.
    ([0-9]{3})?
''')
# Matches (with zero width) only at the very end of the input.
_REGEX_EOF = re.compile(r'\Z')
# A single line terminator (CRLF, CR or LF) or, failing that, the end of input.
_REGEX_NL = re.compile(r'(?:\r\n|[\r\n]|$)')
# One or more consecutive line terminators, i.e. the rest of the current
# line terminator run, including any blank lines.
_REGEX_BLANK = re.compile(r'(?:\r\n|[\r\n])+')
# Any run of spaces and tabs, possibly empty (so this always matches).
_REGEX_OPTIONAL_WHITESPACE = re.compile(r'[ \t]*')
- def _parse_ts(ts):
- """
- Convert a parsed WebVTT timestamp (a re.Match obtained from _REGEX_TS)
- into an MPEG PES timestamp: a tick counter at 90 kHz resolution.
- """
- return 90 * sum(
- int(part or 0) * mult for part, mult in zip(ts.groups(), (3600_000, 60_000, 1000, 1)))
def _format_ts(ts):
    """
    Render an MPEG PES timestamp (90 kHz ticks) as a WebVTT timestamp
    string. Precision below one millisecond is lost.
    """
    # Adding 45 (half a millisecond worth of ticks) before the floor
    # division rounds to the nearest millisecond instead of truncating.
    msec = int((ts + 45) // 90)
    return '%02u:%02u:%02u.%03u' % timetuple_from_msec(msec)
class Block:
    """
    Base class for WebVTT blocks.

    Subclasses that rely on the default parse() must define a compiled
    pattern in the `_REGEX` class attribute.
    """

    def __init__(self, **kwargs):
        # Every keyword argument becomes an attribute of the same name.
        for name in kwargs:
            setattr(self, name, kwargs[name])

    @classmethod
    def parse(cls, parser):
        """
        Try to read one block of this type at the parser's position.

        On success the parser is moved past the block and an instance
        holding the matched text in `raw` is returned; otherwise the
        parser is left alone and None is returned.
        """
        found = parser.match(cls._REGEX)
        if found:
            parser.advance(found)
            return cls(raw=found.group(0))
        return None

    def write_into(self, stream):
        """
        Write the block, exactly as it was read, to `stream`.
        """
        stream.write(self.raw)
class HeaderBlock(Block):
    """
    Marker base class for WebVTT blocks that are only allowed in the
    header part of the file, that is, ahead of the first cue block.
    Adds no behaviour of its own.
    """
class Magic(HeaderBlock):
    """
    The file signature ("WEBVTT" line) together with the header lines that
    directly follow it, up to the first blank line.

    Instances carry:
      extra  -- the text following "WEBVTT" on the signature line (including
                its leading space or tab), or None if there is none
      local  -- LOCAL component of X-TIMESTAMP-MAP as a 90 kHz tick count,
                or None if not given
      mpegts -- MPEGTS component of X-TIMESTAMP-MAP as an integer,
                or None if not given
      meta   -- the concatenated legacy metadata header lines, line
                terminators included ('' if there are none)
    """
    _REGEX = re.compile(r'\ufeff?WEBVTT([ \t][^\r\n]*)?(?:\r\n|[\r\n])')

    # XXX: The X-TIMESTAMP-MAP extension is described in RFC 8216 §3.5
    # <https://tools.ietf.org/html/rfc8216#section-3.5>, but the RFC
    # doesn't specify the exact grammar nor where in the WebVTT
    # syntax it should be placed; the below has been devised based
    # on usage in the wild
    #
    # And strictly speaking, the presence of this extension violates
    # the W3C WebVTT spec. Oh well.
    _REGEX_TSMAP = re.compile(r'X-TIMESTAMP-MAP=')
    _REGEX_TSMAP_LOCAL = re.compile(r'LOCAL:')
    _REGEX_TSMAP_MPEGTS = re.compile(r'MPEGTS:([0-9]+)')
    _REGEX_TSMAP_SEP = re.compile(r'[ \t]*,[ \t]*')

    # This was removed from the spec in the 2017 revision;
    # the last spec draft to describe this syntax element is
    # <https://www.w3.org/TR/2015/WD-webvtt1-20151208/#webvtt-metadata-header>.
    # Nevertheless, YouTube keeps serving those
    _REGEX_META = re.compile(r'(?:(?!-->)[^\r\n])+:(?:(?!-->)[^\r\n])+(?:\r\n|[\r\n])')

    @classmethod
    def __parse_tsmap(cls, parser):
        """
        Parse the value of an X-TIMESTAMP-MAP header; the parser must be
        positioned right after the "X-TIMESTAMP-MAP=" prefix.

        Accepts LOCAL:<timestamp> and MPEGTS:<integer> components in any
        order, separated by commas, up to the end of the line. Returns
        (local, mpegts); a component that does not appear is returned as
        None. Raises ParseError on anything else.
        """
        parser = parser.child()

        # Start from "not given" so that a map carrying only one of the two
        # components yields None for the other one rather than failing with
        # UnboundLocalError at the return statement below.
        local = mpegts = None

        while True:
            if parser.consume(cls._REGEX_TSMAP_LOCAL):
                m = parser.consume(_REGEX_TS)
                if m is None:
                    raise ParseError(parser)
                local = _parse_ts(m)
            else:
                m = parser.consume(cls._REGEX_TSMAP_MPEGTS)
                if not m:
                    raise ParseError(parser)
                mpegts = int_or_none(m.group(1))
                if mpegts is None:
                    raise ParseError(parser)

            if parser.consume(cls._REGEX_TSMAP_SEP):
                continue
            if parser.consume(_REGEX_NL):
                break
            raise ParseError(parser)

        parser.commit()
        return local, mpegts

    @classmethod
    def parse(cls, parser):
        """
        Parse the signature line and the header lines following it.

        Unlike Block.parse(), this raises ParseError instead of returning
        None when the input does not start with a valid signature or when
        a header line is neither an X-TIMESTAMP-MAP nor a metadata header.
        The given parser is only advanced on success.
        """
        parser = parser.child()

        m = parser.consume(cls._REGEX)
        if not m:
            raise ParseError(parser)

        extra = m.group(1)
        local, mpegts, meta = None, None, ''
        # The header ends at the first empty line (or at the end of input)
        while not parser.consume(_REGEX_NL):
            if parser.consume(cls._REGEX_TSMAP):
                local, mpegts = cls.__parse_tsmap(parser)
                continue
            m = parser.consume(cls._REGEX_META)
            if m:
                meta += m.group(0)
                continue
            raise ParseError(parser)
        parser.commit()
        return cls(extra=extra, mpegts=mpegts, local=local, meta=meta)

    def write_into(self, stream):
        """
        Write a normalized header to `stream`: the signature line, an
        X-TIMESTAMP-MAP line if either component is set and non-zero
        (a missing component is written as zero), the metadata lines,
        and the terminating blank line.
        """
        stream.write('WEBVTT')
        if self.extra is not None:
            stream.write(self.extra)
        stream.write('\n')
        if self.local or self.mpegts:
            stream.write('X-TIMESTAMP-MAP=LOCAL:')
            stream.write(_format_ts(self.local if self.local is not None else 0))
            stream.write(',MPEGTS:')
            stream.write(str(self.mpegts if self.mpegts is not None else 0))
            stream.write('\n')
        if self.meta:
            stream.write(self.meta)
        stream.write('\n')
class StyleBlock(HeaderBlock):
    # A line consisting of "STYLE" (optionally followed by spaces or tabs),
    # then any number of non-empty lines that do not contain "-->",
    # then one empty line that terminates the block.
    _REGEX = re.compile(r'''(?x)
        STYLE[\ \t]*(?:\r\n|[\r\n])
        ((?:(?!-->)[^\r\n])+(?:\r\n|[\r\n]))*
        (?:\r\n|[\r\n])
    ''')
class RegionBlock(HeaderBlock):
    """
    A WebVTT region definition block, passed through verbatim.
    """
    # "REGION" (optionally followed by spaces or tabs), then any number of
    # non-empty lines that do not contain "-->", then one empty line that
    # terminates the block.
    #
    # The W3C syntax puts a line terminator between the "REGION" keyword and
    # the settings list, so that terminator has to be accepted here; without
    # it, only "REGION\n" would be matched and the settings lines would be
    # left behind to trip up the cue parser. The terminator is optional so
    # that settings starting on the keyword line itself are still accepted.
    _REGEX = re.compile(r'''(?x)
        REGION[\ \t]*(?:\r\n|[\r\n])?
        ((?:(?!-->)[^\r\n])+(?:\r\n|[\r\n]))*
        (?:\r\n|[\r\n])
    ''')
class CommentBlock(Block):
    # "NOTE" followed by a space, a tab or a line terminator, then any
    # number of non-empty lines that do not contain "-->", then one empty
    # line that terminates the block.
    _REGEX = re.compile(r'''(?x)
        NOTE(?:\r\n|[\ \t\r\n])
        ((?:(?!-->)[^\r\n])+(?:\r\n|[\r\n]))*
        (?:\r\n|[\r\n])
    ''')
class CueBlock(Block):
    """
    A cue block. The payload is not interpreted.

    Instances carry:
      id       -- the cue identifier line (without line terminator), or None
      start    -- start time as a 90 kHz tick count
      end      -- end time as a 90 kHz tick count
      settings -- the cue settings following the timings, or None
      text     -- the payload lines, with their line terminators as read
    """

    # A line that does not contain "-->"; group 1 is the identifier
    _REGEX_ID = re.compile(r'((?:(?!-->)[^\r\n])+)(?:\r\n|[\r\n])')
    # The timing separator, with mandatory whitespace on both sides
    _REGEX_ARROW = re.compile(r'[ \t]+-->[ \t]+')
    # Whitespace, then the rest of the line up to any "-->"; group 1 is the settings
    _REGEX_SETTINGS = re.compile(r'[ \t]+((?:(?!-->)[^\r\n])+)')
    # One non-empty line, with its line terminator if there is one
    _REGEX_PAYLOAD = re.compile(r'[^\r\n]+(?:\r\n|[\r\n])?')

    @classmethod
    def parse(cls, parser):
        """
        Try to read a cue block at the parser's position.

        Returns a CueBlock and advances the parser on success; returns
        None and leaves the parser untouched if the input at this
        position is not a cue block.
        """
        parser = parser.child()

        id_ = None
        m = parser.consume(cls._REGEX_ID)
        if m:
            id_ = m.group(1)

        m0 = parser.consume(_REGEX_TS)
        if not m0:
            return None
        if not parser.consume(cls._REGEX_ARROW):
            return None
        m1 = parser.consume(_REGEX_TS)
        if not m1:
            return None
        m2 = parser.consume(cls._REGEX_SETTINGS)
        parser.consume(_REGEX_OPTIONAL_WHITESPACE)
        if not parser.consume(_REGEX_NL):
            return None

        start = _parse_ts(m0)
        end = _parse_ts(m1)
        settings = m2.group(1) if m2 is not None else None

        # The payload runs up to the first empty line or the end of input
        text = io.StringIO()
        while True:
            m = parser.consume(cls._REGEX_PAYLOAD)
            if not m:
                break
            text.write(m.group(0))

        parser.commit()
        return cls(
            id=id_,
            start=start, end=end, settings=settings,
            text=text.getvalue(),
        )

    def write_into(self, stream):
        """
        Write the cue to `stream`, followed by the blank line that
        separates it from the next block.
        """
        if self.id is not None:
            stream.write(self.id)
            stream.write('\n')
        stream.write(_format_ts(self.start))
        stream.write(' --> ')
        stream.write(_format_ts(self.end))
        if self.settings is not None:
            stream.write(' ')
            stream.write(self.settings)
        stream.write('\n')
        stream.write(self.text)
        # The last payload line has no line terminator when the cue was the
        # final thing in its input. Terminate it here, so that the newline
        # written below really produces a blank line instead of merely
        # ending the payload line and gluing the next block onto this cue.
        if self.text and not self.text.endswith(('\r', '\n')):
            stream.write('\n')
        stream.write('\n')

    @property
    def as_json(self):
        """
        The cue as a plain dict with the keys 'id', 'start', 'end',
        'text' and 'settings'; the counterpart of from_json().
        """
        return {
            'id': self.id,
            'start': self.start,
            'end': self.end,
            'text': self.text,
            'settings': self.settings,
        }

    def __eq__(self, other):
        # Leave comparisons with foreign objects to Python's default
        # handling instead of failing on their missing `as_json` attribute.
        if not isinstance(other, CueBlock):
            return NotImplemented
        return self.as_json == other.as_json

    # Cues are mutable and compare by value, so they are not hashable.
    # (This is what defining __eq__ alone implies; it is spelled out here.)
    __hash__ = None

    @classmethod
    def from_json(cls, json):
        """
        Build a cue from a dict as produced by `as_json`.
        """
        return cls(
            id=json['id'],
            start=json['start'],
            end=json['end'],
            text=json['text'],
            settings=json['settings'],
        )

    def hinges(self, other):
        """
        Tell whether `other` is a direct continuation of this cue: same
        text and settings, and starting exactly where this cue ends (with
        neither cue ending before it starts).
        """
        if self.text != other.text:
            return False
        if self.settings != other.settings:
            return False
        return self.start <= self.end == other.start <= other.end
def parse_fragment(frag_content):
    """
    Generator over the (partially) parsed blocks of a WebVTT file, given
    the raw file contents as a bytes object.

    The Magic block always comes first, followed by the header blocks
    (regions, styles, comments) and then by comments and cues. Blank
    lines between blocks are skipped. ParseError is raised when the
    input cannot be parsed.
    """
    parser = _MatchParser(frag_content.decode())

    yield Magic.parse(parser)

    # Header section: ends at the first thing that is not a header block
    while not parser.match(_REGEX_EOF):
        if parser.consume(_REGEX_BLANK):
            continue

        for block_type in (RegionBlock, StyleBlock, CommentBlock):
            block = block_type.parse(parser)
            if block:
                yield block  # XXX: or skip, for comments
                break
        else:
            # Nothing matched; whatever comes next belongs to the cue section
            break

    # Cue section: everything up to the end of input must parse
    while not parser.match(_REGEX_EOF):
        if parser.consume(_REGEX_BLANK):
            continue

        for block_type in (CommentBlock, CueBlock):
            block = block_type.parse(parser)
            if block:
                yield block  # XXX: or skip, for comments
                break
        else:
            raise ParseError(parser)
|