Restart keepalive streams (#38863)

This commit is contained in:
Eric Severance 2020-09-03 09:22:00 -07:00 committed by GitHub
parent fbbfd46fb8
commit 9baa7c6c24
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 73 additions and 8 deletions

View file

@ -171,6 +171,10 @@ class Stream:
from .worker import stream_worker
if self._thread is None or not self._thread.isAlive():
if self._thread is not None:
# The thread must have crashed/exited. Join to clean up the
# previous thread.
self._thread.join(timeout=0)
self._thread_quit = threading.Event()
self._thread = threading.Thread(
name="stream_worker",

View file

@ -2,6 +2,7 @@
from collections import deque
import io
import logging
import time
import av
@ -35,6 +36,25 @@ def create_stream_buffer(stream_output, video_stream, audio_stream, sequence):
def stream_worker(hass, stream, quit_event):
"""Handle consuming streams and restart keepalive streams."""
wait_timeout = 0
while not quit_event.wait(timeout=wait_timeout):
start_time = time.time()
try:
_stream_worker_internal(hass, stream, quit_event)
except av.error.FFmpegError: # pylint: disable=c-extension-no-member
_LOGGER.exception("Stream connection failed: %s", stream.source)
if not stream.keepalive or quit_event.is_set():
break
# To avoid excessive restarts, don't restart faster than once every 40 seconds.
wait_timeout = max(40 - (time.time() - start_time), 0)
_LOGGER.debug(
"Restarting stream worker in %d seconds: %s", wait_timeout, stream.source,
)
def _stream_worker_internal(hass, stream, quit_event):
"""Handle consuming streams."""
container = av.open(stream.source, options=stream.options)
@ -112,13 +132,15 @@ def stream_worker(hass, stream, quit_event):
audio_stream = None
except (av.AVError, StopIteration) as ex:
# End of stream, clear listeners and stop thread
for fmt, _ in outputs.items():
hass.loop.call_soon_threadsafe(stream.outputs[fmt].put, None)
if not stream.keepalive:
# End of stream, clear listeners and stop thread
for fmt, _ in outputs.items():
hass.loop.call_soon_threadsafe(stream.outputs[fmt].put, None)
_LOGGER.error(
"Error demuxing stream while finding first packet: %s", str(ex)
)
quit_event.set()
return False
return True
def initialize_segment(video_pts):
"""Reset some variables and initialize outputs for each segment."""
@ -159,7 +181,9 @@ def stream_worker(hass, stream, quit_event):
packet.stream = output_streams[audio_stream]
buffer.output.mux(packet)
peek_first_pts()
if not peek_first_pts():
container.close()
return
last_dts = {k: v - 1 for k, v in first_pts.items()}
initialize_segment(first_pts[video_stream])
@ -179,9 +203,10 @@ def stream_worker(hass, stream, quit_event):
continue
last_packet_was_without_dts = False
except (av.AVError, StopIteration) as ex:
# End of stream, clear listeners and stop thread
for fmt, _ in outputs.items():
hass.loop.call_soon_threadsafe(stream.outputs[fmt].put, None)
if not stream.keepalive:
# End of stream, clear listeners and stop thread
for fmt, _ in outputs.items():
hass.loop.call_soon_threadsafe(stream.outputs[fmt].put, None)
_LOGGER.error("Error demuxing stream: %s", str(ex))
break

View file

@ -2,6 +2,7 @@
from datetime import timedelta
from urllib.parse import urlparse
import av
import pytest
from homeassistant.components.stream import request_stream
@ -9,6 +10,7 @@ from homeassistant.const import HTTP_NOT_FOUND
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from tests.async_mock import patch
from tests.common import async_fire_time_changed
from tests.components.stream.common import generate_h264_video, preload_stream
@ -122,3 +124,37 @@ async def test_stream_ended(hass):
# Stop stream, if it hasn't quit already
stream.stop()
async def test_stream_keepalive(hass):
"""Test hls stream retries the stream when keepalive=True."""
await async_setup_component(hass, "stream", {"stream": {}})
# Setup demo HLS track
source = "test_stream_keepalive_source"
stream = preload_stream(hass, source)
track = stream.add_provider("hls")
track.num_segments = 2
cur_time = 0
def time_side_effect():
nonlocal cur_time
if cur_time >= 80:
stream.keepalive = False # Thread should exit and be joinable.
cur_time += 40
return cur_time
with patch("av.open") as av_open, patch(
"homeassistant.components.stream.worker.time"
) as mock_time:
av_open.side_effect = av.error.InvalidDataError(-2, "error")
mock_time.time.side_effect = time_side_effect
# Request stream
request_stream(hass, source, keepalive=True)
stream._thread.join()
stream._thread = None
assert av_open.call_count == 2
# Stop stream, if it hasn't quit already
stream.stop()