diff --git a/homeassistant/components/stream/const.py b/homeassistant/components/stream/const.py index b986cddaf68..9406c24eaf6 100644 --- a/homeassistant/components/stream/const.py +++ b/homeassistant/components/stream/const.py @@ -20,3 +20,6 @@ MIN_SEGMENT_DURATION = 1.5 # Each segment is at least this many seconds PACKETS_TO_WAIT_FOR_AUDIO = 20 # Some streams have an audio stream with no audio MAX_TIMESTAMP_GAP = 10000 # seconds - anything from 10 to 50000 is probably reasonable + +MAX_MISSING_DTS = 6 # Number of packets missing DTS to allow +STREAM_TIMEOUT = 30 # Timeout for reading stream diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index 6769413bafd..4f972774fcc 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -6,7 +6,13 @@ import time import av -from .const import MAX_TIMESTAMP_GAP, MIN_SEGMENT_DURATION, PACKETS_TO_WAIT_FOR_AUDIO +from .const import ( + MAX_MISSING_DTS, + MAX_TIMESTAMP_GAP, + MIN_SEGMENT_DURATION, + PACKETS_TO_WAIT_FOR_AUDIO, + STREAM_TIMEOUT, +) from .core import Segment, StreamBuffer _LOGGER = logging.getLogger(__name__) @@ -62,7 +68,7 @@ def stream_worker(hass, stream, quit_event): def _stream_worker_internal(hass, stream, quit_event): """Handle consuming streams.""" - container = av.open(stream.source, options=stream.options) + container = av.open(stream.source, options=stream.options, timeout=STREAM_TIMEOUT) try: video_stream = container.streams.video[0] except (KeyError, IndexError): @@ -81,13 +87,15 @@ def _stream_worker_internal(hass, stream, quit_event): if audio_stream and audio_stream.profile is None: audio_stream = None + # Iterator for demuxing + container_packets = None # The presentation timestamps of the first packet in each stream we receive # Use to adjust before muxing or outputting, but we don't adjust internally first_pts = {} # The decoder timestamps of the latest packet in each stream we processed last_dts = None # Keep track of consecutive packets without a dts to detect end of stream. - last_packet_was_without_dts = False + missing_dts = 0 # Holds the buffers for each stream provider outputs = None # Keep track of the number of segments we've processed @@ -102,8 +110,8 @@ def _stream_worker_internal(hass, stream, quit_event): # 2 - seeking can be problematic https://trac.ffmpeg.org/ticket/7815 def peek_first_pts(): - nonlocal first_pts, audio_stream - missing_dts = False + nonlocal first_pts, audio_stream, container_packets + missing_dts = 0 def empty_stream_dict(): return { @@ -112,17 +120,20 @@ def _stream_worker_internal(hass, stream, quit_event): } try: + container_packets = container.demux((video_stream, audio_stream)) first_packet = empty_stream_dict() first_pts = empty_stream_dict() # Get to first video keyframe while first_packet[video_stream] is None: - packet = next(container.demux()) + packet = next(container_packets) if ( packet.dts is None - ): # Allow single packet with no dts, raise error on second - if missing_dts: - raise av.AVError - missing_dts = True + ): # Allow MAX_MISSING_DTS packets with no dts, raise error on the next one + if missing_dts >= MAX_MISSING_DTS: + raise StopIteration( + f"Invalid data - got {MAX_MISSING_DTS+1} packets with missing DTS while initializing" + ) + missing_dts += 1 continue if packet.stream == video_stream and packet.is_keyframe: first_packet[video_stream] = packet @@ -131,13 +142,15 @@ def _stream_worker_internal(hass, stream, quit_event): while any( [pts is None for pts in {**first_packet, **first_pts}.values()] ) and (len(initial_packets) < PACKETS_TO_WAIT_FOR_AUDIO): - packet = next(container.demux((video_stream, audio_stream))) + packet = next(container_packets) if ( packet.dts is None - ): # Allow single packet with no dts, raise error on second - if missing_dts: - raise av.AVError - missing_dts = True + ): # Allow MAX_MISSING_DTS packet with no dts, raise error on the next one + if missing_dts >= MAX_MISSING_DTS: + raise StopIteration( + f"Invalid data - got {MAX_MISSING_DTS+1} packets with missing DTS while initializing" + ) + missing_dts += 1 continue if ( first_packet[packet.stream] is None @@ -223,16 +236,16 @@ def _stream_worker_internal(hass, stream, quit_event): if len(initial_packets) > 0: packet = initial_packets.popleft() else: - packet = next(container.demux((video_stream, audio_stream))) + packet = next(container_packets) if packet.dts is None: - _LOGGER.error("Stream packet without dts detected, skipping...") - # Allow a single packet without dts before terminating the stream. - if last_packet_was_without_dts: - # If we get a "flushing" packet, the stream is done - raise StopIteration("No dts in consecutive packets") - last_packet_was_without_dts = True + # Allow MAX_MISSING_DTS consecutive packets without dts. Terminate the stream on the next one. + if missing_dts >= MAX_MISSING_DTS: + raise StopIteration( + f"No dts in {MAX_MISSING_DTS+1} consecutive packets" + ) + missing_dts += 1 continue - last_packet_was_without_dts = False + missing_dts = 0 except (av.AVError, StopIteration) as ex: _LOGGER.error("Error demuxing stream: %s", str(ex)) finalize_stream()