Make stream recorder work concurrently (#73478)

This commit is contained in:
uvjustin 2022-06-18 05:13:07 +10:00 committed by GitHub
parent 600d23e052
commit 7a3f632c1d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 297 additions and 306 deletions

View file

@ -502,7 +502,6 @@ class Stream:
recorder.video_path = video_path
await self.start()
self._logger.debug("Started a stream recording of %s seconds", duration)
# Take advantage of lookback
hls: HlsStreamOutput = cast(HlsStreamOutput, self.outputs().get(HLS_PROVIDER))
@ -512,6 +511,9 @@ class Stream:
await hls.recv()
recorder.prepend(list(hls.get_segments())[-num_segments - 1 : -1])
self._logger.debug("Started a stream recording of %s seconds", duration)
await recorder.async_record()
async def async_get_image(
self,
width: int | None = None,

View file

@ -327,7 +327,6 @@ class StreamOutput:
"""Handle cleanup."""
self._event.set()
self.idle_timer.clear()
self._segments = deque(maxlen=self._segments.maxlen)
class StreamView(HomeAssistantView):

View file

@ -60,6 +60,11 @@ class HlsStreamOutput(StreamOutput):
"""Return provider name."""
return HLS_PROVIDER
def cleanup(self) -> None:
"""Handle cleanup."""
super().cleanup()
self._segments.clear()
@property
def target_duration(self) -> float:
"""Return the target duration."""

View file

@ -1,14 +1,11 @@
"""Provide functionality to record stream."""
from __future__ import annotations
from collections import deque
from io import BytesIO
import logging
import os
import threading
import av
from av.container import OutputContainer
from homeassistant.core import HomeAssistant, callback
@ -27,31 +24,58 @@ def async_setup_recorder(hass: HomeAssistant) -> None:
"""Only here so Provider Registry works."""
def recorder_save_worker(file_out: str, segments: deque[Segment]) -> None:
@PROVIDERS.register(RECORDER_PROVIDER)
class RecorderOutput(StreamOutput):
"""Represents the Recorder Output format."""
def __init__(
self,
hass: HomeAssistant,
idle_timer: IdleTimer,
stream_settings: StreamSettings,
) -> None:
"""Initialize recorder output."""
super().__init__(hass, idle_timer, stream_settings)
self.video_path: str
@property
def name(self) -> str:
"""Return provider name."""
return RECORDER_PROVIDER
def prepend(self, segments: list[Segment]) -> None:
"""Prepend segments to existing list."""
self._segments.extendleft(reversed(segments))
def cleanup(self) -> None:
"""Handle cleanup."""
self.idle_timer.idle = True
super().cleanup()
async def async_record(self) -> None:
"""Handle saving stream."""
if not segments:
_LOGGER.error("Recording failed to capture anything")
return
os.makedirs(os.path.dirname(file_out), exist_ok=True)
os.makedirs(os.path.dirname(self.video_path), exist_ok=True)
pts_adjuster: dict[str, int | None] = {"video": None, "audio": None}
output: OutputContainer | None = None
output: av.container.OutputContainer | None = None
output_v = None
output_a = None
last_stream_id = None
last_stream_id = -1
# The running duration of processed segments. Note that this is in av.time_base
# units which seem to be defined inversely to how stream time_bases are defined
running_duration = 0
last_sequence = float("-inf")
for segment in segments:
def write_segment(segment: Segment) -> None:
"""Write a segment to output."""
nonlocal output, output_v, output_a, last_stream_id, running_duration, last_sequence
# Because the stream_worker is in a different thread from the record service,
# the lookback segments may still have some overlap with the recorder segments
if segment.sequence <= last_sequence:
continue
return
last_sequence = segment.sequence
# Open segment
@ -63,14 +87,16 @@ def recorder_save_worker(file_out: str, segments: deque[Segment]) -> None:
# Skip this segment if it doesn't have data
if source.duration is None:
source.close()
continue
return
source_v = source.streams.video[0]
source_a = source.streams.audio[0] if len(source.streams.audio) > 0 else None
source_a = (
source.streams.audio[0] if len(source.streams.audio) > 0 else None
)
# Create output on first segment
if not output:
output = av.open(
file_out,
self.video_path + ".tmp",
"w",
format=RECORDER_CONTAINER_FORMAT,
container_options={
@ -113,41 +139,28 @@ def recorder_save_worker(file_out: str, segments: deque[Segment]) -> None:
source.close()
if output is not None:
output.close()
@PROVIDERS.register(RECORDER_PROVIDER)
class RecorderOutput(StreamOutput):
"""Represents HLS Output formats."""
def __init__(
self,
hass: HomeAssistant,
idle_timer: IdleTimer,
stream_settings: StreamSettings,
) -> None:
"""Initialize recorder output."""
super().__init__(hass, idle_timer, stream_settings)
self.video_path: str
@property
def name(self) -> str:
"""Return provider name."""
return RECORDER_PROVIDER
def prepend(self, segments: list[Segment]) -> None:
"""Prepend segments to existing list."""
self._segments.extendleft(reversed(segments))
def cleanup(self) -> None:
"""Write recording and clean up."""
_LOGGER.debug("Starting recorder worker thread")
thread = threading.Thread(
name="recorder_save_worker",
target=recorder_save_worker,
args=(self.video_path, self._segments.copy()),
# Write lookback segments
while len(self._segments) > 1: # The last segment is in progress
await self._hass.async_add_executor_job(
write_segment, self._segments.popleft()
)
thread.start()
super().cleanup()
# Make sure the first segment has been added
if not self._segments:
await self.recv()
# Write segments as soon as they are completed
while not self.idle:
await self.recv()
await self._hass.async_add_executor_job(
write_segment, self._segments.popleft()
)
# Write remaining segments
# Should only have 0 or 1 segments, but loop through just in case
while self._segments:
await self._hass.async_add_executor_job(
write_segment, self._segments.popleft()
)
if output is None:
_LOGGER.error("Recording failed to capture anything")
else:
output.close()
os.rename(self.video_path + ".tmp", self.video_path)

View file

@ -12,7 +12,6 @@ so that it can inspect the output.
from __future__ import annotations
import asyncio
from collections import deque
from http import HTTPStatus
import logging
import threading
@ -20,10 +19,9 @@ from typing import Generator
from unittest.mock import Mock, patch
from aiohttp import web
import async_timeout
import pytest
from homeassistant.components.stream.core import Segment, StreamOutput
from homeassistant.components.stream.core import StreamOutput
from homeassistant.components.stream.worker import StreamState
from .common import generate_h264_video, stream_teardown
@ -73,61 +71,6 @@ def stream_worker_sync(hass):
yield sync
class SaveRecordWorkerSync:
"""
Test fixture to manage RecordOutput thread for recorder_save_worker.
This is used to assert that the worker is started and stopped cleanly
to avoid thread leaks in tests.
"""
def __init__(self, hass):
"""Initialize SaveRecordWorkerSync."""
self._hass = hass
self._save_event = None
self._segments = None
self._save_thread = None
self.reset()
def recorder_save_worker(self, file_out: str, segments: deque[Segment]):
"""Mock method for patch."""
logging.debug("recorder_save_worker thread started")
assert self._save_thread is None
self._segments = segments
self._save_thread = threading.current_thread()
self._hass.loop.call_soon_threadsafe(self._save_event.set)
async def get_segments(self):
"""Return the recorded video segments."""
async with async_timeout.timeout(TEST_TIMEOUT):
await self._save_event.wait()
return self._segments
async def join(self):
"""Verify save worker was invoked and block on shutdown."""
async with async_timeout.timeout(TEST_TIMEOUT):
await self._save_event.wait()
self._save_thread.join(timeout=TEST_TIMEOUT)
assert not self._save_thread.is_alive()
def reset(self):
"""Reset callback state for reuse in tests."""
self._save_thread = None
self._save_event = asyncio.Event()
@pytest.fixture()
def record_worker_sync(hass):
"""Patch recorder_save_worker for clean thread shutdown for test."""
sync = SaveRecordWorkerSync(hass)
with patch(
"homeassistant.components.stream.recorder.recorder_save_worker",
side_effect=sync.recorder_save_worker,
autospec=True,
):
yield sync
class HLSSync:
"""Test fixture that intercepts stream worker calls to StreamOutput."""

View file

@ -506,6 +506,8 @@ async def test_remove_incomplete_segment_on_exit(
assert len(segments) == 3
assert not segments[-1].complete
stream_worker_sync.resume()
with patch("homeassistant.components.stream.Stream.remove_provider"):
# Patch remove_provider so the deque is not cleared
stream._thread_quit.set()
stream._thread.join()
stream._thread = None

View file

@ -1,4 +1,5 @@
"""The tests for hls streams."""
"""The tests for recording streams."""
import asyncio
from datetime import timedelta
from io import BytesIO
import os
@ -7,11 +8,14 @@ from unittest.mock import patch
import av
import pytest
from homeassistant.components.stream import create_stream
from homeassistant.components.stream.const import HLS_PROVIDER, RECORDER_PROVIDER
from homeassistant.components.stream import Stream, create_stream
from homeassistant.components.stream.const import (
HLS_PROVIDER,
OUTPUT_IDLE_TIMEOUT,
RECORDER_PROVIDER,
)
from homeassistant.components.stream.core import Part
from homeassistant.components.stream.fmp4utils import find_box
from homeassistant.components.stream.recorder import recorder_save_worker
from homeassistant.exceptions import HomeAssistantError
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
@ -20,40 +24,55 @@ from .common import DefaultSegment as Segment, generate_h264_video, remux_with_a
from tests.common import async_fire_time_changed
MAX_ABORT_SEGMENTS = 20 # Abort test to avoid looping forever
async def test_record_stream(hass, hass_client, record_worker_sync, h264_video):
"""
Test record stream.
Tests full integration with the stream component, and captures the
stream worker and save worker to allow for clean shutdown of background
threads. The actual save logic is tested in test_recorder_save below.
"""
@pytest.fixture(autouse=True)
async def stream_component(hass):
"""Set up the component before each test."""
await async_setup_component(hass, "stream", {"stream": {}})
# Setup demo track
@pytest.fixture
def filename(tmpdir):
"""Use this filename for the tests."""
return f"{tmpdir}/test.mp4"
async def test_record_stream(hass, filename, h264_video):
"""Test record stream."""
worker_finished = asyncio.Event()
class MockStream(Stream):
"""Mock Stream so we can patch remove_provider."""
async def remove_provider(self, provider):
"""Add a finished event to Stream.remove_provider."""
await Stream.remove_provider(self, provider)
worker_finished.set()
with patch("homeassistant.components.stream.Stream", wraps=MockStream):
stream = create_stream(hass, h264_video, {})
with patch.object(hass.config, "is_allowed_path", return_value=True):
await stream.async_record("/example/path")
make_recording = hass.async_create_task(stream.async_record(filename))
# After stream decoding finishes, the record worker thread starts
segments = await record_worker_sync.get_segments()
assert len(segments) >= 1
# In general usage the recorder will only include what has already been
# processed by the worker. To guarantee we have some output for the test,
# wait until the worker has finished before firing
await worker_finished.wait()
# Verify that the save worker was invoked, then block until its
# thread completes and is shutdown completely to avoid thread leaks.
await record_worker_sync.join()
# Fire the IdleTimer
future = dt_util.utcnow() + timedelta(seconds=30)
async_fire_time_changed(hass, future)
await stream.stop()
await make_recording
# Assert
assert os.path.exists(filename)
async def test_record_lookback(
hass, hass_client, stream_worker_sync, record_worker_sync, h264_video
):
async def test_record_lookback(hass, h264_video):
"""Exercise record with loopback."""
await async_setup_component(hass, "stream", {"stream": {}})
stream = create_stream(hass, h264_video, {})
@ -69,42 +88,8 @@ async def test_record_lookback(
await stream.stop()
async def test_recorder_timeout(hass, hass_client, stream_worker_sync, h264_video):
"""
Test recorder timeout.
Mocks out the cleanup to assert that it is invoked after a timeout.
This test does not start the recorder save thread.
"""
await async_setup_component(hass, "stream", {"stream": {}})
stream_worker_sync.pause()
with patch("homeassistant.components.stream.IdleTimer.fire") as mock_timeout:
# Setup demo track
stream = create_stream(hass, h264_video, {})
with patch.object(hass.config, "is_allowed_path", return_value=True):
await stream.async_record("/example/path")
recorder = stream.add_provider(RECORDER_PROVIDER)
await recorder.recv()
# Wait a minute
future = dt_util.utcnow() + timedelta(minutes=1)
async_fire_time_changed(hass, future)
await hass.async_block_till_done()
assert mock_timeout.called
stream_worker_sync.resume()
await stream.stop()
await hass.async_block_till_done()
await hass.async_block_till_done()
async def test_record_path_not_allowed(hass, hass_client, h264_video):
async def test_record_path_not_allowed(hass, h264_video):
"""Test where the output path is not allowed by home assistant configuration."""
await async_setup_component(hass, "stream", {"stream": {}})
stream = create_stream(hass, h264_video, {})
with patch.object(
@ -127,25 +112,8 @@ def add_parts_to_segment(segment, source):
]
async def test_recorder_save(tmpdir, h264_video):
"""Test recorder save."""
# Setup
filename = f"{tmpdir}/test.mp4"
# Run
segment = Segment(sequence=1)
add_parts_to_segment(segment, h264_video)
segment.duration = 4
recorder_save_worker(filename, [segment])
# Assert
assert os.path.exists(filename)
async def test_recorder_discontinuity(tmpdir, h264_video):
async def test_recorder_discontinuity(hass, filename, h264_video):
"""Test recorder save across a discontinuity."""
# Setup
filename = f"{tmpdir}/test.mp4"
# Run
segment_1 = Segment(sequence=1, stream_id=0)
@ -154,18 +122,50 @@ async def test_recorder_discontinuity(tmpdir, h264_video):
segment_2 = Segment(sequence=2, stream_id=1)
add_parts_to_segment(segment_2, h264_video)
segment_2.duration = 4
recorder_save_worker(filename, [segment_1, segment_2])
provider_ready = asyncio.Event()
class MockStream(Stream):
"""Mock Stream so we can patch add_provider."""
async def start(self):
"""Make Stream.start a noop that gives up async context."""
await asyncio.sleep(0)
def add_provider(self, fmt, timeout=OUTPUT_IDLE_TIMEOUT):
"""Add a finished event to Stream.add_provider."""
provider = Stream.add_provider(self, fmt, timeout)
provider_ready.set()
return provider
with patch.object(hass.config, "is_allowed_path", return_value=True), patch(
"homeassistant.components.stream.Stream", wraps=MockStream
), patch("homeassistant.components.stream.recorder.RecorderOutput.recv"):
stream = create_stream(hass, "blank", {})
make_recording = hass.async_create_task(stream.async_record(filename))
await provider_ready.wait()
recorder_output = stream.outputs()[RECORDER_PROVIDER]
recorder_output.idle_timer.start()
recorder_output._segments.extend([segment_1, segment_2])
# Fire the IdleTimer
future = dt_util.utcnow() + timedelta(seconds=30)
async_fire_time_changed(hass, future)
await make_recording
# Assert
assert os.path.exists(filename)
async def test_recorder_no_segments(tmpdir):
async def test_recorder_no_segments(hass, filename):
"""Test recorder behavior with a stream failure which causes no segments."""
# Setup
filename = f"{tmpdir}/test.mp4"
stream = create_stream(hass, BytesIO(), {})
# Run
recorder_save_worker("unused-file", [])
with patch.object(hass.config, "is_allowed_path", return_value=True):
await stream.async_record(filename)
# Assert
assert not os.path.exists(filename)
@ -188,9 +188,7 @@ def h264_mov_video():
)
async def test_record_stream_audio(
hass,
hass_client,
stream_worker_sync,
record_worker_sync,
filename,
audio_codec,
expected_audio_streams,
h264_mov_video,
@ -201,28 +199,42 @@ async def test_record_stream_audio(
Record stream output should have an audio channel when input has
a valid codec and audio packets and no audio channel otherwise.
"""
await async_setup_component(hass, "stream", {"stream": {}})
# Remux source video with new audio
source = remux_with_audio(h264_mov_video, "mov", audio_codec) # mov can store PCM
record_worker_sync.reset()
stream_worker_sync.pause()
worker_finished = asyncio.Event()
class MockStream(Stream):
"""Mock Stream so we can patch remove_provider."""
async def remove_provider(self, provider):
"""Add a finished event to Stream.remove_provider."""
await Stream.remove_provider(self, provider)
worker_finished.set()
with patch("homeassistant.components.stream.Stream", wraps=MockStream):
stream = create_stream(hass, source, {})
with patch.object(hass.config, "is_allowed_path", return_value=True):
await stream.async_record("/example/path")
recorder = stream.add_provider(RECORDER_PROVIDER)
while True:
await recorder.recv()
if not (segment := recorder.last_segment):
break
last_segment = segment
stream_worker_sync.resume()
with patch.object(hass.config, "is_allowed_path", return_value=True):
make_recording = hass.async_create_task(stream.async_record(filename))
# In general usage the recorder will only include what has already been
# processed by the worker. To guarantee we have some output for the test,
# wait until the worker has finished before firing
await worker_finished.wait()
# Fire the IdleTimer
future = dt_util.utcnow() + timedelta(seconds=30)
async_fire_time_changed(hass, future)
await make_recording
# Assert
assert os.path.exists(filename)
result = av.open(
BytesIO(last_segment.init + last_segment.get_data()),
filename,
"r",
format="mp4",
)
@ -232,14 +244,9 @@ async def test_record_stream_audio(
await stream.stop()
await hass.async_block_till_done()
# Verify that the save worker was invoked, then block until its
# thread completes and is shutdown completely to avoid thread leaks.
await record_worker_sync.join()
async def test_recorder_log(hass, caplog):
"""Test starting a stream to record logs the url without username and password."""
await async_setup_component(hass, "stream", {"stream": {}})
stream = create_stream(hass, "https://abcd:efgh@foo.bar", {})
with patch.object(hass.config, "is_allowed_path", return_value=True):
await stream.async_record("/example/path")

View file

@ -13,6 +13,7 @@ pushed to the output streams. The packet sequence can be used to exercise
failure modes or corner cases like how out of order packets are handled.
"""
import asyncio
import fractions
import io
import logging
@ -33,6 +34,7 @@ from homeassistant.components.stream.const import (
HLS_PROVIDER,
MAX_MISSING_DTS,
PACKETS_TO_WAIT_FOR_AUDIO,
RECORDER_PROVIDER,
SEGMENT_DURATION_ADJUSTER,
TARGET_SEGMENT_DURATION_NON_LL_HLS,
)
@ -732,7 +734,23 @@ async def test_worker_log(hass, caplog):
assert "https://abcd:efgh@foo.bar" not in caplog.text
async def test_durations(hass, record_worker_sync):
@pytest.fixture
def worker_finished_stream():
"""Fixture that helps call a stream and wait for the worker to finish."""
worker_finished = asyncio.Event()
class MockStream(Stream):
"""Mock Stream so we can patch remove_provider."""
async def remove_provider(self, provider):
"""Add a finished event to Stream.remove_provider."""
await Stream.remove_provider(self, provider)
worker_finished.set()
return worker_finished, MockStream
async def test_durations(hass, worker_finished_stream):
"""Test that the duration metadata matches the media."""
# Use a target part duration which has a slight mismatch
@ -751,13 +769,17 @@ async def test_durations(hass, record_worker_sync):
)
source = generate_h264_video(duration=SEGMENT_DURATION + 1)
worker_finished, mock_stream = worker_finished_stream
with patch("homeassistant.components.stream.Stream", wraps=mock_stream):
stream = create_stream(hass, source, {}, stream_label="camera")
# use record_worker_sync to grab output segments
with patch.object(hass.config, "is_allowed_path", return_value=True):
await stream.async_record("/example/path")
recorder_output = stream.add_provider(RECORDER_PROVIDER, timeout=30)
await stream.start()
await worker_finished.wait()
complete_segments = list(recorder_output.get_segments())[:-1]
complete_segments = list(await record_worker_sync.get_segments())[:-1]
assert len(complete_segments) >= 1
# check that the Part duration metadata matches the durations in the media
@ -803,12 +825,10 @@ async def test_durations(hass, record_worker_sync):
abs_tol=1e-6,
)
await record_worker_sync.join()
await stream.stop()
async def test_has_keyframe(hass, record_worker_sync, h264_video):
async def test_has_keyframe(hass, h264_video, worker_finished_stream):
"""Test that the has_keyframe metadata matches the media."""
await async_setup_component(
hass,
@ -824,13 +844,17 @@ async def test_has_keyframe(hass, record_worker_sync, h264_video):
},
)
worker_finished, mock_stream = worker_finished_stream
with patch("homeassistant.components.stream.Stream", wraps=mock_stream):
stream = create_stream(hass, h264_video, {}, stream_label="camera")
# use record_worker_sync to grab output segments
with patch.object(hass.config, "is_allowed_path", return_value=True):
await stream.async_record("/example/path")
recorder_output = stream.add_provider(RECORDER_PROVIDER, timeout=30)
await stream.start()
await worker_finished.wait()
complete_segments = list(recorder_output.get_segments())[:-1]
complete_segments = list(await record_worker_sync.get_segments())[:-1]
assert len(complete_segments) >= 1
# check that the Part has_keyframe metadata matches the keyframes in the media
@ -843,12 +867,10 @@ async def test_has_keyframe(hass, record_worker_sync, h264_video):
av_part.close()
assert part.has_keyframe == media_has_keyframe
await record_worker_sync.join()
await stream.stop()
async def test_h265_video_is_hvc1(hass, record_worker_sync):
async def test_h265_video_is_hvc1(hass, worker_finished_stream):
"""Test that a h265 video gets muxed as hvc1."""
await async_setup_component(
hass,
@ -863,13 +885,16 @@ async def test_h265_video_is_hvc1(hass, record_worker_sync):
)
source = generate_h265_video()
worker_finished, mock_stream = worker_finished_stream
with patch("homeassistant.components.stream.Stream", wraps=mock_stream):
stream = create_stream(hass, source, {}, stream_label="camera")
# use record_worker_sync to grab output segments
with patch.object(hass.config, "is_allowed_path", return_value=True):
await stream.async_record("/example/path")
recorder_output = stream.add_provider(RECORDER_PROVIDER, timeout=30)
await stream.start()
await worker_finished.wait()
complete_segments = list(await record_worker_sync.get_segments())[:-1]
complete_segments = list(recorder_output.get_segments())[:-1]
assert len(complete_segments) >= 1
segment = complete_segments[0]
@ -878,8 +903,6 @@ async def test_h265_video_is_hvc1(hass, record_worker_sync):
assert av_part.streams.video[0].codec_tag == "hvc1"
av_part.close()
await record_worker_sync.join()
await stream.stop()
assert stream.get_diagnostics() == {
@ -891,7 +914,7 @@ async def test_h265_video_is_hvc1(hass, record_worker_sync):
}
async def test_get_image(hass, record_worker_sync):
async def test_get_image(hass):
"""Test that the has_keyframe metadata matches the media."""
await async_setup_component(hass, "stream", {"stream": {}})
@ -904,14 +927,11 @@ async def test_get_image(hass, record_worker_sync):
mock_turbo_jpeg_singleton.instance.return_value = mock_turbo_jpeg()
stream = create_stream(hass, source, {})
# use record_worker_sync to grab output segments
with patch.object(hass.config, "is_allowed_path", return_value=True):
await stream.async_record("/example/path")
make_recording = hass.async_create_task(stream.async_record("/example/path"))
await make_recording
assert stream._keyframe_converter._image is None
await record_worker_sync.join()
assert await stream.async_get_image() == EMPTY_8_6_JPEG
await stream.stop()