Combine StreamBuffer into SegmentBuffer in stream (#51041)

* Combine StreamBuffer into SegmentBuffer in stream

* Use new style type hint in comment
Remove unused member self._segment

* Change reset_av to static helper function

* Change make_new_av to only return OutputContainer
This commit is contained in:
uvjustin 2021-05-25 13:57:07 +08:00 committed by GitHub
parent 0fb2504e0c
commit 2eb87b8806
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 62 additions and 63 deletions

View file

@ -4,7 +4,7 @@ from __future__ import annotations
import asyncio
from collections import deque
import io
from typing import TYPE_CHECKING, Callable
from typing import Callable
from aiohttp import web
import attr
@ -16,23 +16,9 @@ from homeassistant.util.decorator import Registry
from .const import ATTR_STREAMS, DOMAIN
if TYPE_CHECKING:
import av.container
import av.video
PROVIDERS = Registry()
@attr.s
class StreamBuffer:
"""Represent a segment."""
segment: io.BytesIO = attr.ib()
output: av.container.OutputContainer = attr.ib()
vstream: av.video.VideoStream = attr.ib()
astream = attr.ib(default=None) # type=Optional[av.audio.AudioStream]
@attr.s
class Segment:
"""Represent a segment."""

View file

@ -2,8 +2,9 @@
from __future__ import annotations
from collections import deque
import io
from io import BytesIO
import logging
from typing import cast
import av
@ -17,38 +18,11 @@ from .const import (
SEGMENT_CONTAINER_FORMAT,
STREAM_TIMEOUT,
)
from .core import Segment, StreamBuffer, StreamOutput
from .core import Segment, StreamOutput
_LOGGER = logging.getLogger(__name__)
def create_stream_buffer(video_stream, audio_stream, sequence):
"""Create a new StreamBuffer."""
segment = io.BytesIO()
container_options = {
# Removed skip_sidx - see https://github.com/home-assistant/core/pull/39970
"movflags": "frag_custom+empty_moov+default_base_moof+frag_discont+negative_cts_offsets",
"avoid_negative_ts": "disabled",
"fragment_index": str(sequence),
}
output = av.open(
segment,
mode="w",
format=SEGMENT_CONTAINER_FORMAT,
container_options={
"video_track_timescale": str(int(1 / video_stream.time_base)),
**container_options,
},
)
vstream = output.add_stream(template=video_stream)
# Check if audio is requested
astream = None
if audio_stream and audio_stream.name in AUDIO_CODECS:
astream = output.add_stream(template=audio_stream)
return StreamBuffer(segment, output, vstream, astream)
class SegmentBuffer:
"""Buffer for writing a sequence of packets to the output as a segment."""
@ -61,12 +35,41 @@ class SegmentBuffer:
self._outputs: list[StreamOutput] = []
self._sequence = 0
self._segment_start_pts = None
self._stream_buffer = None
self._memory_file: BytesIO = cast(BytesIO, None)
self._av_output: av.container.OutputContainer = None
self._input_video_stream: av.video.VideoStream = None
self._input_audio_stream = None # av.audio.AudioStream | None
self._output_video_stream: av.video.VideoStream = None
self._output_audio_stream = None # av.audio.AudioStream | None
def set_streams(self, video_stream, audio_stream):
@staticmethod
def make_new_av(
memory_file, sequence: int, input_vstream: av.video.VideoStream
) -> av.container.OutputContainer:
"""Make a new av OutputContainer."""
return av.open(
memory_file,
mode="w",
format=SEGMENT_CONTAINER_FORMAT,
container_options={
# Removed skip_sidx - see https://github.com/home-assistant/core/pull/39970
# "cmaf" flag replaces several of the movflags used, but too recent to use for now
"movflags": "frag_custom+empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer",
"avoid_negative_ts": "disabled",
"fragment_index": str(sequence + 1),
"video_track_timescale": str(int(1 / input_vstream.time_base)),
},
)
def set_streams(
self,
video_stream: av.video.VideoStream,
audio_stream,
# no type hint for audio_stream until https://github.com/PyAV-Org/PyAV/pull/775 is merged
) -> None:
"""Initialize output buffer with streams from container."""
self._video_stream = video_stream
self._audio_stream = audio_stream
self._input_video_stream = video_stream
self._input_audio_stream = audio_stream
def reset(self, video_pts):
"""Initialize a new stream segment."""
@ -77,15 +80,27 @@ class SegmentBuffer:
# Fetch the latest StreamOutputs, which may have changed since the
# worker started.
self._outputs = self._outputs_callback().values()
self._stream_buffer = create_stream_buffer(
self._video_stream, self._audio_stream, self._sequence
self._memory_file = BytesIO()
self._av_output = self.make_new_av(
memory_file=self._memory_file,
sequence=self._sequence,
input_vstream=self._input_video_stream,
)
self._output_video_stream = self._av_output.add_stream(
template=self._input_video_stream
)
# Check if audio is requested
self._output_audio_stream = None
if self._input_audio_stream and self._input_audio_stream.name in AUDIO_CODECS:
self._output_audio_stream = self._av_output.add_stream(
template=self._input_audio_stream
)
def mux_packet(self, packet):
"""Mux a packet to the appropriate StreamBuffers."""
"""Mux a packet to the appropriate output stream."""
# Check for end of segment
if packet.stream == self._video_stream and packet.is_keyframe:
if packet.stream == self._input_video_stream and packet.is_keyframe:
duration = (packet.pts - self._segment_start_pts) * packet.time_base
if duration >= MIN_SEGMENT_DURATION:
# Save segment to outputs
@ -95,19 +110,17 @@ class SegmentBuffer:
self.reset(packet.pts)
# Mux the packet
if packet.stream == self._video_stream:
packet.stream = self._stream_buffer.vstream
self._stream_buffer.output.mux(packet)
elif packet.stream == self._audio_stream:
packet.stream = self._stream_buffer.astream
self._stream_buffer.output.mux(packet)
if packet.stream == self._input_video_stream:
packet.stream = self._output_video_stream
self._av_output.mux(packet)
elif packet.stream == self._input_audio_stream:
packet.stream = self._output_audio_stream
self._av_output.mux(packet)
def flush(self, duration):
"""Create a segment from the buffered packets and write to output."""
self._stream_buffer.output.close()
segment = Segment(
self._sequence, self._stream_buffer.segment, duration, self._stream_id
)
self._av_output.close()
segment = Segment(self._sequence, self._memory_file, duration, self._stream_id)
for stream_output in self._outputs:
stream_output.put(segment)
@ -120,7 +133,7 @@ class SegmentBuffer:
def close(self):
"""Close stream buffer."""
self._stream_buffer.output.close()
self._av_output.close()
def stream_worker(source, options, segment_buffer, quit_event): # noqa: C901