Improve part metadata in stream (#58822)

This commit is contained in:
uvjustin 2021-11-01 11:23:01 +08:00 committed by GitHub
parent 7126c9b0de
commit 9aaa92f366
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 106 additions and 63 deletions

View file

@ -66,9 +66,15 @@ class SegmentBuffer:
memory_file: BytesIO, memory_file: BytesIO,
sequence: int, sequence: int,
input_vstream: av.video.VideoStream, input_vstream: av.video.VideoStream,
) -> av.container.OutputContainer: input_astream: av.audio.stream.AudioStream,
"""Make a new av OutputContainer.""" ) -> tuple[
return av.open( av.container.OutputContainer,
av.video.VideoStream,
av.audio.stream.AudioStream | None,
]:
"""Make a new av OutputContainer and add output streams."""
add_audio = input_astream and input_astream.name in AUDIO_CODECS
container = av.open(
memory_file, memory_file,
mode="w", mode="w",
format=SEGMENT_CONTAINER_FORMAT, format=SEGMENT_CONTAINER_FORMAT,
@ -93,19 +99,21 @@ class SegmentBuffer:
# Create a fragment every TARGET_PART_DURATION. The data from each fragment is stored in # Create a fragment every TARGET_PART_DURATION. The data from each fragment is stored in
# a "Part" that can be combined with the data from all the other "Part"s, plus an init # a "Part" that can be combined with the data from all the other "Part"s, plus an init
# section, to reconstitute the data in a "Segment". # section, to reconstitute the data in a "Segment".
# frag_duration is the threshold for determining part boundaries, and the dts of the last # The LL-HLS spec allows for a fragment's duration to be within the range [0.85x,1.0x]
# packet in the part should correspond to a duration that is smaller than this value. # of the part target duration. We use the frag_duration option to tell ffmpeg to try to
# However, as the part duration includes the duration of the last frame, the part duration # cut the fragments when they reach frag_duration. However, the resulting fragments can
# will be equal to or greater than this value. # have variability in their durations and can end up being too short or too long. If
# We previously scaled this number down by .85 to account for this while keeping within # there are two tracks, as in the case of a video feed with audio, the fragment cut seems
# the 15% variance allowed in part duration. However, this did not work when inputs had # to be done on the first track that crosses the desired threshold, and cutting on the
# an audio stream - sometimes the fragment would get cut on the audio packet, causing # audio track may result in a shorter video fragment than desired. Conversely, with a
# the durations to actually be too short. # video track with no audio, the discrete nature of frames means that the frame at the
# The current approach is to use this frag_duration for creating the media while # end of a fragment will sometimes extend slightly beyond the desired frag_duration.
# adjusting the metadata duration to keep the durations in the metadata below the # Given this, our approach is to use a frag_duration near the upper end of the range for
# part_target_duration threshold. # outputs with audio and a frag_duration at the lower end of the range for outputs with
# only video.
"frag_duration": str( "frag_duration": str(
self._stream_settings.part_target_duration * 1e6 self._stream_settings.part_target_duration
* (98e4 if add_audio else 9e5)
), ),
} }
if self._stream_settings.ll_hls if self._stream_settings.ll_hls
@ -113,6 +121,12 @@ class SegmentBuffer:
), ),
}, },
) )
output_vstream = container.add_stream(template=input_vstream)
# Check if audio is requested
output_astream = None
if add_audio:
output_astream = container.add_stream(template=input_astream)
return container, output_vstream, output_astream
def set_streams( def set_streams(
self, self,
@ -128,26 +142,22 @@ class SegmentBuffer:
"""Initialize a new stream segment.""" """Initialize a new stream segment."""
# Keep track of the number of segments we've processed # Keep track of the number of segments we've processed
self._sequence += 1 self._sequence += 1
self._segment_start_dts = video_dts self._part_start_dts = self._segment_start_dts = video_dts
self._segment = None self._segment = None
self._memory_file = BytesIO() self._memory_file = BytesIO()
self._memory_file_pos = 0 self._memory_file_pos = 0
self._av_output = self.make_new_av( (
self._av_output,
self._output_video_stream,
self._output_audio_stream,
) = self.make_new_av(
memory_file=self._memory_file, memory_file=self._memory_file,
sequence=self._sequence, sequence=self._sequence,
input_vstream=self._input_video_stream, input_vstream=self._input_video_stream,
) input_astream=self._input_audio_stream,
self._output_video_stream = self._av_output.add_stream(
template=self._input_video_stream
) )
if self._output_video_stream.name == "hevc": if self._output_video_stream.name == "hevc":
self._output_video_stream.codec_tag = "hvc1" self._output_video_stream.codec_tag = "hvc1"
# Check if audio is requested
self._output_audio_stream = None
if self._input_audio_stream and self._input_audio_stream.name in AUDIO_CODECS:
self._output_audio_stream = self._av_output.add_stream(
template=self._input_audio_stream
)
def mux_packet(self, packet: av.Packet) -> None: def mux_packet(self, packet: av.Packet) -> None:
"""Mux a packet to the appropriate output stream.""" """Mux a packet to the appropriate output stream."""
@ -186,13 +196,9 @@ class SegmentBuffer:
# Fetch the latest StreamOutputs, which may have changed since the # Fetch the latest StreamOutputs, which may have changed since the
# worker started. # worker started.
stream_outputs=self._outputs_callback().values(), stream_outputs=self._outputs_callback().values(),
start_time=self._start_time start_time=self._start_time,
+ datetime.timedelta(
seconds=float(self._segment_start_dts * packet.time_base)
),
) )
self._memory_file_pos = self._memory_file.tell() self._memory_file_pos = self._memory_file.tell()
self._part_start_dts = self._segment_start_dts
else: # These are the ends of the part segments else: # These are the ends of the part segments
self.flush(packet, last_part=False) self.flush(packet, last_part=False)
@ -201,17 +207,23 @@ class SegmentBuffer:
If last_part is True, also close the segment, give it a duration, If last_part is True, also close the segment, give it a duration,
and clean up the av_output and memory_file. and clean up the av_output and memory_file.
There are two different ways to enter this function, and when
last_part is True, packet has not yet been muxed, while when
last_part is False, the packet has already been muxed. However,
in both cases, packet is the next packet and is not included in
the Part.
This function writes the duration metadata for the Part and
for the Segment. However, as the fragmentation done by ffmpeg
may result in fragment durations which fall outside the
[0.85x,1.0x] tolerance band allowed by LL-HLS, we need to fudge
some durations a bit by reporting them as being within that
range.
Note that repeated adjustments may cause drift between the part
durations in the metadata and those in the media and result in
playback issues in some clients.
""" """
# In some cases using the current packet's dts (which is the start # Part durations should not exceed the part target duration
# dts of the next part) to calculate the part duration will result in a adjusted_dts = min(
# value which exceeds the part_target_duration. This can muck up the
# duration of both this part and the next part. An easy fix is to just
# use the current packet dts and cap it by the part target duration.
# The adjustment may cause a drift between this adjusted duration
# (used in the metadata) and the media duration, but the drift should be
# automatically corrected when the part duration cleanly divides the
# framerate.
current_dts = min(
packet.dts, packet.dts,
self._part_start_dts self._part_start_dts
+ self._stream_settings.part_target_duration / packet.time_base, + self._stream_settings.part_target_duration / packet.time_base,
@ -220,29 +232,44 @@ class SegmentBuffer:
# Closing the av_output will write the remaining buffered data to the # Closing the av_output will write the remaining buffered data to the
# memory_file as a new moof/mdat. # memory_file as a new moof/mdat.
self._av_output.close() self._av_output.close()
elif not self._part_has_keyframe:
# Parts which are not the last part or an independent part should
# not have durations below 0.85 of the part target duration.
adjusted_dts = max(
adjusted_dts,
self._part_start_dts
+ 0.85 * self._stream_settings.part_target_duration / packet.time_base,
)
assert self._segment assert self._segment
self._memory_file.seek(self._memory_file_pos) self._memory_file.seek(self._memory_file_pos)
self._hass.loop.call_soon_threadsafe( self._hass.loop.call_soon_threadsafe(
self._segment.async_add_part, self._segment.async_add_part,
Part( Part(
duration=float((current_dts - self._part_start_dts) * packet.time_base), duration=float(
(adjusted_dts - self._part_start_dts) * packet.time_base
),
has_keyframe=self._part_has_keyframe, has_keyframe=self._part_has_keyframe,
data=self._memory_file.read(), data=self._memory_file.read(),
), ),
float((current_dts - self._segment_start_dts) * packet.time_base) (
segment_duration := float(
(adjusted_dts - self._segment_start_dts) * packet.time_base
)
)
if last_part if last_part
else 0, else 0,
) )
if last_part: if last_part:
# If we've written the last part, we can close the memory_file. # If we've written the last part, we can close the memory_file.
self._memory_file.close() # We don't need the BytesIO object anymore self._memory_file.close() # We don't need the BytesIO object anymore
self._start_time += datetime.timedelta(seconds=segment_duration)
# Reinitialize # Reinitialize
self.reset(current_dts) self.reset(packet.dts)
else: else:
# For the last part, these will get set again elsewhere so we can skip # For the last part, these will get set again elsewhere so we can skip
# setting them here. # setting them here.
self._memory_file_pos = self._memory_file.tell() self._memory_file_pos = self._memory_file.tell()
self._part_start_dts = current_dts self._part_start_dts = adjusted_dts
self._part_has_keyframe = False self._part_has_keyframe = False
def discontinuity(self) -> None: def discontinuity(self) -> None:

View file

@ -677,6 +677,10 @@ async def test_worker_log(hass, caplog):
async def test_durations(hass, record_worker_sync): async def test_durations(hass, record_worker_sync):
"""Test that the duration metadata matches the media.""" """Test that the duration metadata matches the media."""
# Use a target part duration which has a slight mismatch
# with the incoming frame rate to better expose problems.
target_part_duration = TEST_PART_DURATION - 0.01
await async_setup_component( await async_setup_component(
hass, hass,
"stream", "stream",
@ -684,12 +688,12 @@ async def test_durations(hass, record_worker_sync):
"stream": { "stream": {
CONF_LL_HLS: True, CONF_LL_HLS: True,
CONF_SEGMENT_DURATION: SEGMENT_DURATION, CONF_SEGMENT_DURATION: SEGMENT_DURATION,
CONF_PART_DURATION: TEST_PART_DURATION, CONF_PART_DURATION: target_part_duration,
} }
}, },
) )
source = generate_h264_video() source = generate_h264_video(duration=SEGMENT_DURATION + 1)
stream = create_stream(hass, source, {}) stream = create_stream(hass, source, {})
# use record_worker_sync to grab output segments # use record_worker_sync to grab output segments
@ -702,25 +706,37 @@ async def test_durations(hass, record_worker_sync):
# check that the Part duration metadata matches the durations in the media # check that the Part duration metadata matches the durations in the media
running_metadata_duration = 0 running_metadata_duration = 0
for segment in complete_segments: for segment in complete_segments:
for part in segment.parts: av_segment = av.open(io.BytesIO(segment.init + segment.get_data()))
av_segment.close()
for part_num, part in enumerate(segment.parts):
av_part = av.open(io.BytesIO(segment.init + part.data)) av_part = av.open(io.BytesIO(segment.init + part.data))
running_metadata_duration += part.duration running_metadata_duration += part.duration
# av_part.duration actually returns the dts of the first packet of # av_part.duration actually returns the dts of the first packet of the next
# the next av_part. When we normalize this by av.time_base we get # av_part. When we normalize this by av.time_base we get the running
# the running duration of the media. # duration of the media.
# The metadata duration is slightly different. The worker has # The metadata duration may differ slightly from the media duration.
# some flexibility of where to set each metadata boundary, and # The worker has some flexibility of where to set each metadata boundary,
# when the media's duration is slightly too long, the metadata # and when the media's duration is slightly too long or too short, the
# duration is adjusted down. This means that the running metadata # metadata duration may be adjusted up or down.
# duration may be up to one video frame duration smaller than the # We check here that the divergence between the metadata duration and the
# part duration. # media duration is not too large (2 frames seems reasonable here).
assert running_metadata_duration < av_part.duration / av.time_base + 1e-6 assert math.isclose(
assert ( (av_part.duration - av_part.start_time) / av.time_base,
running_metadata_duration part.duration,
> av_part.duration / av.time_base abs_tol=2 / av_part.streams.video[0].rate + 1e-6,
- 1 / av_part.streams.video[0].rate
- 1e-6
) )
# Also check that the sum of the durations so far matches the last dts
# in the media.
assert math.isclose(
running_metadata_duration,
av_part.duration / av.time_base,
abs_tol=1e-6,
)
# And check that the metadata duration is between 0.85x and 1.0x of
# the part target duration
if not (part.has_keyframe or part_num == len(segment.parts) - 1):
assert part.duration > 0.85 * target_part_duration - 1e-6
assert part.duration < target_part_duration + 1e-6
av_part.close() av_part.close()
# check that the Part durations are consistent with the Segment durations # check that the Part durations are consistent with the Segment durations
for segment in complete_segments: for segment in complete_segments: