hass-core/tests/components/stream/test_hls.py

225 lines
6.8 KiB
Python
Raw Normal View History

"""The tests for hls streams.
The tests encode stream (as an h264 video), then load the stream and verify
that it is decoded properly. The background worker thread responsible for
decoding will decode the stream as fast as possible, and when completed
clears all output buffers. This can be a problem for the test that wishes
to retrieve and verify decoded segments. If the worker finishes first, there is
nothing for the test to verify. The solution is the WorkerSync class that
allows the tests to pause the worker thread before finalizing the stream
so that it can inspect the output.
"""
from datetime import timedelta
import threading
2021-01-01 22:31:56 +01:00
from unittest.mock import patch
from urllib.parse import urlparse
2020-09-03 09:22:00 -07:00
import av
import pytest
from homeassistant.components.stream import request_stream
from homeassistant.components.stream.core import Segment, StreamOutput
2020-04-09 00:57:47 +02:00
from homeassistant.const import HTTP_NOT_FOUND
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from tests.common import async_fire_time_changed
2019-07-31 12:25:30 -07:00
from tests.components.stream.common import generate_h264_video, preload_stream
class WorkerSync:
"""Test fixture that intercepts stream worker calls to StreamOutput."""
def __init__(self):
"""Initialize WorkerSync."""
self._event = None
self._put_original = StreamOutput.put
def pause(self):
"""Pause the worker before it finalizes the stream."""
self._event = threading.Event()
def resume(self):
"""Allow the worker thread to finalize the stream."""
self._event.set()
def blocking_put(self, stream_output: StreamOutput, segment: Segment):
"""Proxy StreamOutput.put, intercepted for test to pause worker."""
if segment is None and self._event:
# Worker is ending the stream, which clears all output buffers.
# Block the worker thread until the test has a chance to verify
# the segments under test.
self._event.wait()
# Forward to actual StreamOutput.put
self._put_original(stream_output, segment)
@pytest.fixture()
def worker_sync(hass):
"""Patch StreamOutput to allow test to synchronize worker stream end."""
sync = WorkerSync()
with patch(
"homeassistant.components.stream.core.StreamOutput.put",
side_effect=sync.blocking_put,
autospec=True,
):
yield sync
async def test_hls_stream(hass, hass_client, worker_sync):
"""
Test hls stream.
Purposefully not mocking anything here to test full
integration with the stream component.
"""
2019-07-31 12:25:30 -07:00
await async_setup_component(hass, "stream", {"stream": {}})
worker_sync.pause()
# Setup demo HLS track
source = generate_h264_video()
stream = preload_stream(hass, source)
2019-07-31 12:25:30 -07:00
stream.add_provider("hls")
# Request stream
url = request_stream(hass, source)
http_client = await hass_client()
# Fetch playlist
parsed_url = urlparse(url)
playlist_response = await http_client.get(parsed_url.path)
assert playlist_response.status == 200
# Fetch init
playlist = await playlist_response.text()
playlist_url = "/".join(parsed_url.path.split("/")[:-1])
init_url = playlist_url + "/init.mp4"
init_response = await http_client.get(init_url)
assert init_response.status == 200
# Fetch segment
playlist = await playlist_response.text()
2019-07-31 12:25:30 -07:00
playlist_url = "/".join(parsed_url.path.split("/")[:-1])
segment_url = playlist_url + "/" + playlist.splitlines()[-1]
segment_response = await http_client.get(segment_url)
assert segment_response.status == 200
worker_sync.resume()
# Stop stream, if it hasn't quit already
stream.stop()
# Ensure playlist not accessible after stream ends
fail_response = await http_client.get(parsed_url.path)
2020-04-09 00:57:47 +02:00
assert fail_response.status == HTTP_NOT_FOUND
async def test_stream_timeout(hass, hass_client, worker_sync):
"""Test hls stream timeout."""
2019-07-31 12:25:30 -07:00
await async_setup_component(hass, "stream", {"stream": {}})
worker_sync.pause()
# Setup demo HLS track
source = generate_h264_video()
stream = preload_stream(hass, source)
2019-07-31 12:25:30 -07:00
stream.add_provider("hls")
# Request stream
url = request_stream(hass, source)
http_client = await hass_client()
# Fetch playlist
parsed_url = urlparse(url)
playlist_response = await http_client.get(parsed_url.path)
assert playlist_response.status == 200
# Wait a minute
future = dt_util.utcnow() + timedelta(minutes=1)
async_fire_time_changed(hass, future)
# Fetch again to reset timer
playlist_response = await http_client.get(parsed_url.path)
assert playlist_response.status == 200
worker_sync.resume()
# Wait 5 minutes
future = dt_util.utcnow() + timedelta(minutes=5)
async_fire_time_changed(hass, future)
# Ensure playlist not accessible
fail_response = await http_client.get(parsed_url.path)
2020-04-09 00:57:47 +02:00
assert fail_response.status == HTTP_NOT_FOUND
async def test_stream_ended(hass, worker_sync):
"""Test hls stream packets ended."""
2019-07-31 12:25:30 -07:00
await async_setup_component(hass, "stream", {"stream": {}})
worker_sync.pause()
# Setup demo HLS track
source = generate_h264_video()
stream = preload_stream(hass, source)
2019-07-31 12:25:30 -07:00
track = stream.add_provider("hls")
# Request stream
request_stream(hass, source)
# Run it dead
while True:
segment = await track.recv()
if segment is None:
break
segments = segment.sequence
# Allow worker to finalize once enough of the stream is been consumed
if segments > 1:
worker_sync.resume()
2019-04-07 12:42:16 -07:00
assert segments > 1
assert not track.get_segment()
# Stop stream, if it hasn't quit already
stream.stop()
2020-09-03 09:22:00 -07:00
async def test_stream_keepalive(hass):
"""Test hls stream retries the stream when keepalive=True."""
await async_setup_component(hass, "stream", {"stream": {}})
# Setup demo HLS track
source = "test_stream_keepalive_source"
stream = preload_stream(hass, source)
track = stream.add_provider("hls")
track.num_segments = 2
cur_time = 0
def time_side_effect():
nonlocal cur_time
if cur_time >= 80:
stream.keepalive = False # Thread should exit and be joinable.
cur_time += 40
return cur_time
with patch("av.open") as av_open, patch(
"homeassistant.components.stream.worker.time"
) as mock_time, patch(
"homeassistant.components.stream.worker.STREAM_RESTART_INCREMENT", 0
):
2020-09-03 09:22:00 -07:00
av_open.side_effect = av.error.InvalidDataError(-2, "error")
mock_time.time.side_effect = time_side_effect
# Request stream
request_stream(hass, source, keepalive=True)
stream._thread.join()
stream._thread = None
assert av_open.call_count == 2
# Stop stream, if it hasn't quit already
stream.stop()