From 2bcf87b980b7f394b842c5af6b592049d2fff09b Mon Sep 17 00:00:00 2001 From: Allen Porter Date: Mon, 8 Feb 2021 19:53:28 -0800 Subject: [PATCH] Change the API boundary between camera and stream with initial improvement for nest expiring stream urls (#45431) * Change the API boundary between stream and camera Shift more of the stream lifecycle management to the camera. The motivation is to support stream urls that expire giving the camera the ability to change the stream once it is created. * Document stream lifecycle and simplify stream/camera interaction * Reorder create_stream function to reduce diffs * Increase test coverage for camera_sdm.py * Fix ffmpeg typo. * Add a stream identifier for each stream, managed by camera * Remove stream record service * Update homeassistant/components/stream/__init__.py Co-authored-by: Paulus Schoutsen * Unroll changes to Stream interface back into camera component * Fix preload stream to actually start the background worker * Reduce unncessary diffs for readability * Remove redundant camera stream start code Co-authored-by: Paulus Schoutsen --- homeassistant/components/camera/__init__.py | 132 ++++++---------- homeassistant/components/camera/const.py | 5 + homeassistant/components/nest/camera_sdm.py | 7 + homeassistant/components/stream/__init__.py | 143 +++++++----------- homeassistant/components/stream/const.py | 7 - homeassistant/components/stream/core.py | 6 +- homeassistant/components/stream/services.yaml | 15 -- tests/components/camera/test_init.py | 56 +++---- tests/components/generic/test_camera.py | 49 ++++-- tests/components/nest/camera_sdm_test.py | 6 + tests/components/stream/common.py | 10 -- tests/components/stream/test_hls.py | 30 ++-- tests/components/stream/test_init.py | 86 ----------- tests/components/stream/test_recorder.py | 58 +++++-- 14 files changed, 254 insertions(+), 356 deletions(-) delete mode 100644 homeassistant/components/stream/services.yaml delete mode 100644 tests/components/stream/test_init.py diff --git a/homeassistant/components/camera/__init__.py b/homeassistant/components/camera/__init__.py index 25505800709..cbc98edf19b 100644 --- a/homeassistant/components/camera/__init__.py +++ b/homeassistant/components/camera/__init__.py @@ -23,16 +23,8 @@ from homeassistant.components.media_player.const import ( DOMAIN as DOMAIN_MP, SERVICE_PLAY_MEDIA, ) -from homeassistant.components.stream import request_stream -from homeassistant.components.stream.const import ( - CONF_DURATION, - CONF_LOOKBACK, - CONF_STREAM_SOURCE, - DOMAIN as DOMAIN_STREAM, - FORMAT_CONTENT_TYPE, - OUTPUT_FORMATS, - SERVICE_RECORD, -) +from homeassistant.components.stream import Stream, create_stream +from homeassistant.components.stream.const import FORMAT_CONTENT_TYPE, OUTPUT_FORMATS from homeassistant.const import ( ATTR_ENTITY_ID, CONF_FILENAME, @@ -53,7 +45,13 @@ from homeassistant.helpers.entity_component import EntityComponent from homeassistant.helpers.network import get_url from homeassistant.loader import bind_hass -from .const import DATA_CAMERA_PREFS, DOMAIN +from .const import ( + CONF_DURATION, + CONF_LOOKBACK, + DATA_CAMERA_PREFS, + DOMAIN, + SERVICE_RECORD, +) from .prefs import CameraPreferences # mypy: allow-untyped-calls, allow-untyped-defs @@ -130,23 +128,7 @@ class Image: async def async_request_stream(hass, entity_id, fmt): """Request a stream for a camera entity.""" camera = _get_camera_from_entity_id(hass, entity_id) - camera_prefs = hass.data[DATA_CAMERA_PREFS].get(entity_id) - - async with async_timeout.timeout(10): - source = await camera.stream_source() - - if not source: - raise HomeAssistantError( - f"{camera.entity_id} does not support play stream service" - ) - - return request_stream( - hass, - source, - fmt=fmt, - keepalive=camera_prefs.preload_stream, - options=camera.stream_options, - ) + return await _async_stream_endpoint_url(hass, camera, fmt) @bind_hass @@ -267,14 +249,11 @@ async def async_setup(hass, config): camera_prefs = prefs.get(camera.entity_id) if not camera_prefs.preload_stream: continue - - async with async_timeout.timeout(10): - source = await camera.stream_source() - - if not source: + stream = await camera.create_stream() + if not stream: continue - - request_stream(hass, source, keepalive=True, options=camera.stream_options) + stream.add_provider("hls") + stream.start() hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, preload_stream) @@ -330,6 +309,7 @@ class Camera(Entity): def __init__(self): """Initialize a camera.""" self.is_streaming = False + self.stream = None self.stream_options = {} self.content_type = DEFAULT_CONTENT_TYPE self.access_tokens: collections.deque = collections.deque([], 2) @@ -375,6 +355,17 @@ class Camera(Entity): """Return the interval between frames of the mjpeg stream.""" return 0.5 + async def create_stream(self) -> Stream: + """Create a Stream for stream_source.""" + # There is at most one stream (a decode worker) per camera + if not self.stream: + async with async_timeout.timeout(10): + source = await self.stream_source() + if not source: + return None + self.stream = create_stream(self.hass, source, options=self.stream_options) + return self.stream + async def stream_source(self): """Return the source of the stream.""" return None @@ -586,24 +577,7 @@ async def ws_camera_stream(hass, connection, msg): try: entity_id = msg["entity_id"] camera = _get_camera_from_entity_id(hass, entity_id) - camera_prefs = hass.data[DATA_CAMERA_PREFS].get(entity_id) - - async with async_timeout.timeout(10): - source = await camera.stream_source() - - if not source: - raise HomeAssistantError( - f"{camera.entity_id} does not support play stream service" - ) - - fmt = msg["format"] - url = request_stream( - hass, - source, - fmt=fmt, - keepalive=camera_prefs.preload_stream, - options=camera.stream_options, - ) + url = await _async_stream_endpoint_url(hass, camera, fmt=msg["format"]) connection.send_result(msg["id"], {"url": url}) except HomeAssistantError as ex: _LOGGER.error("Error requesting stream: %s", ex) @@ -676,32 +650,17 @@ async def async_handle_snapshot_service(camera, service): async def async_handle_play_stream_service(camera, service_call): """Handle play stream services calls.""" - async with async_timeout.timeout(10): - source = await camera.stream_source() - - if not source: - raise HomeAssistantError( - f"{camera.entity_id} does not support play stream service" - ) + fmt = service_call.data[ATTR_FORMAT] + url = await _async_stream_endpoint_url(camera.hass, camera, fmt) hass = camera.hass - camera_prefs = hass.data[DATA_CAMERA_PREFS].get(camera.entity_id) - fmt = service_call.data[ATTR_FORMAT] - entity_ids = service_call.data[ATTR_MEDIA_PLAYER] - - url = request_stream( - hass, - source, - fmt=fmt, - keepalive=camera_prefs.preload_stream, - options=camera.stream_options, - ) data = { ATTR_MEDIA_CONTENT_ID: f"{get_url(hass)}{url}", ATTR_MEDIA_CONTENT_TYPE: FORMAT_CONTENT_TYPE[fmt], } # It is required to send a different payload for cast media players + entity_ids = service_call.data[ATTR_MEDIA_PLAYER] cast_entity_ids = [ entity for entity, source in entity_sources(hass).items() @@ -740,12 +699,28 @@ async def async_handle_play_stream_service(camera, service_call): ) +async def _async_stream_endpoint_url(hass, camera, fmt): + stream = await camera.create_stream() + if not stream: + raise HomeAssistantError( + f"{camera.entity_id} does not support play stream service" + ) + + # Update keepalive setting which manages idle shutdown + camera_prefs = hass.data[DATA_CAMERA_PREFS].get(camera.entity_id) + stream.keepalive = camera_prefs.preload_stream + + stream.add_provider(fmt) + stream.start() + return stream.endpoint_url(fmt) + + async def async_handle_record_service(camera, call): """Handle stream recording service calls.""" async with async_timeout.timeout(10): - source = await camera.stream_source() + stream = await camera.create_stream() - if not source: + if not stream: raise HomeAssistantError(f"{camera.entity_id} does not support record service") hass = camera.hass @@ -753,13 +728,6 @@ async def async_handle_record_service(camera, call): filename.hass = hass video_path = filename.async_render(variables={ATTR_ENTITY_ID: camera}) - data = { - CONF_STREAM_SOURCE: source, - CONF_FILENAME: video_path, - CONF_DURATION: call.data[CONF_DURATION], - CONF_LOOKBACK: call.data[CONF_LOOKBACK], - } - - await hass.services.async_call( - DOMAIN_STREAM, SERVICE_RECORD, data, blocking=True, context=call.context + await stream.async_record( + video_path, duration=call.data[CONF_DURATION], lookback=call.data[CONF_LOOKBACK] ) diff --git a/homeassistant/components/camera/const.py b/homeassistant/components/camera/const.py index 563f0554f0f..615e54b0eca 100644 --- a/homeassistant/components/camera/const.py +++ b/homeassistant/components/camera/const.py @@ -4,3 +4,8 @@ DOMAIN = "camera" DATA_CAMERA_PREFS = "camera_prefs" PREF_PRELOAD_STREAM = "preload_stream" + +SERVICE_RECORD = "record" + +CONF_LOOKBACK = "lookback" +CONF_DURATION = "duration" diff --git a/homeassistant/components/nest/camera_sdm.py b/homeassistant/components/nest/camera_sdm.py index aa8e100059a..8f5ba88fdd4 100644 --- a/homeassistant/components/nest/camera_sdm.py +++ b/homeassistant/components/nest/camera_sdm.py @@ -146,6 +146,13 @@ class NestCamera(Camera): # Next attempt to catch a url will get a new one self._stream = None return + # Stop any existing stream worker since the url is invalid. The next + # request for this stream will restart it with the right url. + # Issue #42793 tracks improvements (e.g. preserve keepalive, smoother + # transitions across streams) + if self.stream: + self.stream.stop() + self.stream = None self._schedule_stream_refresh() async def async_will_remove_from_hass(self): diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index 6980f7ead8f..a8b344a98e9 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -1,28 +1,35 @@ -"""Provide functionality to stream video source.""" +"""Provide functionality to stream video source. + +Components use create_stream with a stream source (e.g. an rtsp url) to create +a new Stream object. Stream manages: + - Background work to fetch and decode a stream + - Desired output formats + - Home Assistant URLs for viewing a stream + - Access tokens for URLs for viewing a stream + +A Stream consists of a background worker, and one or more output formats each +with their own idle timeout managed by the stream component. When an output +format is no longer in use, the stream component will expire it. When there +are no active output formats, the background worker is shut down and access +tokens are expired. Alternatively, a Stream can be configured with keepalive +to always keep workers active. +""" import logging import secrets import threading import time from types import MappingProxyType -import voluptuous as vol - -from homeassistant.const import CONF_FILENAME, EVENT_HOMEASSISTANT_STOP +from homeassistant.const import EVENT_HOMEASSISTANT_STOP from homeassistant.core import callback from homeassistant.exceptions import HomeAssistantError -import homeassistant.helpers.config_validation as cv -from homeassistant.loader import bind_hass from .const import ( ATTR_ENDPOINTS, ATTR_STREAMS, - CONF_DURATION, - CONF_LOOKBACK, - CONF_STREAM_SOURCE, DOMAIN, MAX_SEGMENTS, OUTPUT_IDLE_TIMEOUT, - SERVICE_RECORD, STREAM_RESTART_INCREMENT, STREAM_RESTART_RESET_TIME, ) @@ -31,20 +38,13 @@ from .hls import async_setup_hls _LOGGER = logging.getLogger(__name__) -STREAM_SERVICE_SCHEMA = vol.Schema({vol.Required(CONF_STREAM_SOURCE): cv.string}) -SERVICE_RECORD_SCHEMA = STREAM_SERVICE_SCHEMA.extend( - { - vol.Required(CONF_FILENAME): cv.string, - vol.Optional(CONF_DURATION, default=30): int, - vol.Optional(CONF_LOOKBACK, default=0): int, - } -) +def create_stream(hass, stream_source, options=None): + """Create a stream with the specified identfier based on the source url. - -@bind_hass -def request_stream(hass, stream_source, *, fmt="hls", keepalive=False, options=None): - """Set up stream with token.""" + The stream_source is typically an rtsp url and options are passed into + pyav / ffmpeg as options. + """ if DOMAIN not in hass.config.components: raise HomeAssistantError("Stream integration is not set up.") @@ -59,25 +59,9 @@ def request_stream(hass, stream_source, *, fmt="hls", keepalive=False, options=N **options, } - try: - streams = hass.data[DOMAIN][ATTR_STREAMS] - stream = streams.get(stream_source) - if not stream: - stream = Stream(hass, stream_source, options=options, keepalive=keepalive) - streams[stream_source] = stream - else: - # Update keepalive option on existing stream - stream.keepalive = keepalive - - # Add provider - stream.add_provider(fmt) - - if not stream.access_token: - stream.access_token = secrets.token_hex() - stream.start() - return hass.data[DOMAIN][ATTR_ENDPOINTS][fmt].format(stream.access_token) - except Exception as err: - raise HomeAssistantError("Unable to get stream") from err + stream = Stream(hass, stream_source, options=options) + hass.data[DOMAIN][ATTR_STREAMS].append(stream) + return stream async def async_setup(hass, config): @@ -92,7 +76,7 @@ async def async_setup(hass, config): hass.data[DOMAIN] = {} hass.data[DOMAIN][ATTR_ENDPOINTS] = {} - hass.data[DOMAIN][ATTR_STREAMS] = {} + hass.data[DOMAIN][ATTR_STREAMS] = [] # Setup HLS hls_endpoint = async_setup_hls(hass) @@ -104,33 +88,25 @@ async def async_setup(hass, config): @callback def shutdown(event): """Stop all stream workers.""" - for stream in hass.data[DOMAIN][ATTR_STREAMS].values(): + for stream in hass.data[DOMAIN][ATTR_STREAMS]: stream.keepalive = False stream.stop() _LOGGER.info("Stopped stream workers") hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown) - async def async_record(call): - """Call record stream service handler.""" - await async_handle_record_service(hass, call) - - hass.services.async_register( - DOMAIN, SERVICE_RECORD, async_record, schema=SERVICE_RECORD_SCHEMA - ) - return True class Stream: """Represents a single stream.""" - def __init__(self, hass, source, options=None, keepalive=False): + def __init__(self, hass, source, options=None): """Initialize a stream.""" self.hass = hass self.source = source self.options = options - self.keepalive = keepalive + self.keepalive = False self.access_token = None self._thread = None self._thread_quit = None @@ -139,6 +115,14 @@ class Stream: if self.options is None: self.options = {} + def endpoint_url(self, fmt): + """Start the stream and returns a url for the output format.""" + if fmt not in self._outputs: + raise ValueError(f"Stream is not configured for format '{fmt}'") + if not self.access_token: + self.access_token = secrets.token_hex() + return self.hass.data[DOMAIN][ATTR_ENDPOINTS][fmt].format(self.access_token) + @property def outputs(self): """Return a copy of the stream outputs.""" @@ -244,39 +228,28 @@ class Stream: self._thread = None _LOGGER.info("Stopped stream: %s", self.source) + async def async_record(self, video_path, duration=30, lookback=5): + """Make a .mp4 recording from a provided stream.""" -async def async_handle_record_service(hass, call): - """Handle save video service calls.""" - stream_source = call.data[CONF_STREAM_SOURCE] - video_path = call.data[CONF_FILENAME] - duration = call.data[CONF_DURATION] - lookback = call.data[CONF_LOOKBACK] + # Check for file access + if not self.hass.config.is_allowed_path(video_path): + raise HomeAssistantError(f"Can't write {video_path}, no access to path!") - # Check for file access - if not hass.config.is_allowed_path(video_path): - raise HomeAssistantError(f"Can't write {video_path}, no access to path!") + # Add recorder + recorder = self.outputs.get("recorder") + if recorder: + raise HomeAssistantError( + f"Stream already recording to {recorder.video_path}!" + ) + recorder = self.add_provider("recorder", timeout=duration) + recorder.video_path = video_path - # Check for active stream - streams = hass.data[DOMAIN][ATTR_STREAMS] - stream = streams.get(stream_source) - if not stream: - stream = Stream(hass, stream_source) - streams[stream_source] = stream + self.start() - # Add recorder - recorder = stream.outputs.get("recorder") - if recorder: - raise HomeAssistantError(f"Stream already recording to {recorder.video_path}!") - - recorder = stream.add_provider("recorder", timeout=duration) - recorder.video_path = video_path - - stream.start() - - # Take advantage of lookback - hls = stream.outputs.get("hls") - if lookback > 0 and hls: - num_segments = min(int(lookback // hls.target_duration), MAX_SEGMENTS) - # Wait for latest segment, then add the lookback - await hls.recv() - recorder.prepend(list(hls.get_segment())[-num_segments:]) + # Take advantage of lookback + hls = self.outputs.get("hls") + if lookback > 0 and hls: + num_segments = min(int(lookback // hls.target_duration), MAX_SEGMENTS) + # Wait for latest segment, then add the lookback + await hls.recv() + recorder.prepend(list(hls.get_segment())[-num_segments:]) diff --git a/homeassistant/components/stream/const.py b/homeassistant/components/stream/const.py index 45fa3d9e76a..4ee9f2a9814 100644 --- a/homeassistant/components/stream/const.py +++ b/homeassistant/components/stream/const.py @@ -1,15 +1,8 @@ """Constants for Stream component.""" DOMAIN = "stream" -CONF_STREAM_SOURCE = "stream_source" -CONF_LOOKBACK = "lookback" -CONF_DURATION = "duration" - ATTR_ENDPOINTS = "endpoints" ATTR_STREAMS = "streams" -ATTR_KEEPALIVE = "keepalive" - -SERVICE_RECORD = "record" OUTPUT_FORMATS = ["hls"] diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index 5427172a55c..31c7940b8e1 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -194,11 +194,7 @@ class StreamView(HomeAssistantView): hass = request.app["hass"] stream = next( - ( - s - for s in hass.data[DOMAIN][ATTR_STREAMS].values() - if s.access_token == token - ), + (s for s in hass.data[DOMAIN][ATTR_STREAMS] if s.access_token == token), None, ) diff --git a/homeassistant/components/stream/services.yaml b/homeassistant/components/stream/services.yaml deleted file mode 100644 index a8652335bf1..00000000000 --- a/homeassistant/components/stream/services.yaml +++ /dev/null @@ -1,15 +0,0 @@ -record: - description: Make a .mp4 recording from a provided stream. - fields: - stream_source: - description: The input source for the stream. - example: "rtsp://my.stream.feed:554" - filename: - description: The file name string. - example: "/tmp/my_stream.mp4" - duration: - description: "Target recording length (in seconds). Default: 30" - example: 30 - lookback: - description: "Target lookback period (in seconds) to include in addition to duration. Only available if there is currently an active HLS stream for stream_source. Default: 0" - example: 5 diff --git a/tests/components/camera/test_init.py b/tests/components/camera/test_init.py index 2c2d744deb9..340a4b5d756 100644 --- a/tests/components/camera/test_init.py +++ b/tests/components/camera/test_init.py @@ -155,25 +155,20 @@ async def test_websocket_camera_thumbnail(hass, hass_ws_client, mock_camera): async def test_websocket_stream_no_source( hass, hass_ws_client, mock_camera, mock_stream ): - """Test camera/stream websocket command.""" + """Test camera/stream websocket command with camera with no source.""" await async_setup_component(hass, "camera", {}) - with patch( - "homeassistant.components.camera.request_stream", - return_value="http://home.assistant/playlist.m3u8", - ) as mock_request_stream: - # Request playlist through WebSocket - client = await hass_ws_client(hass) - await client.send_json( - {"id": 6, "type": "camera/stream", "entity_id": "camera.demo_camera"} - ) - msg = await client.receive_json() + # Request playlist through WebSocket + client = await hass_ws_client(hass) + await client.send_json( + {"id": 6, "type": "camera/stream", "entity_id": "camera.demo_camera"} + ) + msg = await client.receive_json() - # Assert WebSocket response - assert not mock_request_stream.called - assert msg["id"] == 6 - assert msg["type"] == TYPE_RESULT - assert not msg["success"] + # Assert WebSocket response + assert msg["id"] == 6 + assert msg["type"] == TYPE_RESULT + assert not msg["success"] async def test_websocket_camera_stream(hass, hass_ws_client, mock_camera, mock_stream): @@ -181,9 +176,9 @@ async def test_websocket_camera_stream(hass, hass_ws_client, mock_camera, mock_s await async_setup_component(hass, "camera", {}) with patch( - "homeassistant.components.camera.request_stream", + "homeassistant.components.camera.Stream.endpoint_url", return_value="http://home.assistant/playlist.m3u8", - ) as mock_request_stream, patch( + ) as mock_stream_view_url, patch( "homeassistant.components.demo.camera.DemoCamera.stream_source", return_value="http://example.com", ): @@ -195,7 +190,7 @@ async def test_websocket_camera_stream(hass, hass_ws_client, mock_camera, mock_s msg = await client.receive_json() # Assert WebSocket response - assert mock_request_stream.called + assert mock_stream_view_url.called assert msg["id"] == 6 assert msg["type"] == TYPE_RESULT assert msg["success"] @@ -248,9 +243,7 @@ async def test_play_stream_service_no_source(hass, mock_camera, mock_stream): ATTR_ENTITY_ID: "camera.demo_camera", camera.ATTR_MEDIA_PLAYER: "media_player.test", } - with patch("homeassistant.components.camera.request_stream"), pytest.raises( - HomeAssistantError - ): + with pytest.raises(HomeAssistantError): # Call service await hass.services.async_call( camera.DOMAIN, camera.SERVICE_PLAY_STREAM, data, blocking=True @@ -265,7 +258,7 @@ async def test_handle_play_stream_service(hass, mock_camera, mock_stream): ) await async_setup_component(hass, "media_player", {}) with patch( - "homeassistant.components.camera.request_stream" + "homeassistant.components.camera.Stream.endpoint_url", ) as mock_request_stream, patch( "homeassistant.components.demo.camera.DemoCamera.stream_source", return_value="http://example.com", @@ -289,7 +282,7 @@ async def test_no_preload_stream(hass, mock_stream): """Test camera preload preference.""" demo_prefs = CameraEntityPreferences({PREF_PRELOAD_STREAM: False}) with patch( - "homeassistant.components.camera.request_stream" + "homeassistant.components.camera.Stream.endpoint_url", ) as mock_request_stream, patch( "homeassistant.components.camera.prefs.CameraPreferences.get", return_value=demo_prefs, @@ -308,8 +301,8 @@ async def test_preload_stream(hass, mock_stream): """Test camera preload preference.""" demo_prefs = CameraEntityPreferences({PREF_PRELOAD_STREAM: True}) with patch( - "homeassistant.components.camera.request_stream" - ) as mock_request_stream, patch( + "homeassistant.components.camera.create_stream" + ) as mock_create_stream, patch( "homeassistant.components.camera.prefs.CameraPreferences.get", return_value=demo_prefs, ), patch( @@ -322,7 +315,7 @@ async def test_preload_stream(hass, mock_stream): await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_START) await hass.async_block_till_done() - assert mock_request_stream.called + assert mock_create_stream.called async def test_record_service_invalid_path(hass, mock_camera): @@ -348,10 +341,9 @@ async def test_record_service(hass, mock_camera, mock_stream): "homeassistant.components.demo.camera.DemoCamera.stream_source", return_value="http://example.com", ), patch( - "homeassistant.components.stream.async_handle_record_service", - ) as mock_record_service, patch.object( - hass.config, "is_allowed_path", return_value=True - ): + "homeassistant.components.stream.Stream.async_record", + autospec=True, + ) as mock_record: # Call service await hass.services.async_call( camera.DOMAIN, @@ -361,4 +353,4 @@ async def test_record_service(hass, mock_camera, mock_stream): ) # So long as we call stream.record, the rest should be covered # by those tests. - assert mock_record_service.called + assert mock_record.called diff --git a/tests/components/generic/test_camera.py b/tests/components/generic/test_camera.py index 9a147995541..65f5306c4d8 100644 --- a/tests/components/generic/test_camera.py +++ b/tests/components/generic/test_camera.py @@ -176,17 +176,18 @@ async def test_stream_source(aioclient_mock, hass, hass_client, hass_ws_client): "still_image_url": "https://example.com", "stream_source": 'http://example.com/{{ states.sensor.temp.state + "a" }}', "limit_refetch_to_url_change": True, - } + }, }, ) + assert await async_setup_component(hass, "stream", {}) await hass.async_block_till_done() hass.states.async_set("sensor.temp", "5") with patch( - "homeassistant.components.camera.request_stream", + "homeassistant.components.camera.Stream.endpoint_url", return_value="http://home.assistant/playlist.m3u8", - ) as mock_request_stream: + ) as mock_stream_url: # Request playlist through WebSocket client = await hass_ws_client(hass) @@ -196,25 +197,47 @@ async def test_stream_source(aioclient_mock, hass, hass_client, hass_ws_client): msg = await client.receive_json() # Assert WebSocket response - assert mock_request_stream.call_count == 1 - assert mock_request_stream.call_args[0][1] == "http://example.com/5a" + assert mock_stream_url.call_count == 1 assert msg["id"] == 1 assert msg["type"] == TYPE_RESULT assert msg["success"] assert msg["result"]["url"][-13:] == "playlist.m3u8" - # Cause a template render error - hass.states.async_remove("sensor.temp") + +async def test_stream_source_error(aioclient_mock, hass, hass_client, hass_ws_client): + """Test that the stream source has an error.""" + assert await async_setup_component( + hass, + "camera", + { + "camera": { + "name": "config_test", + "platform": "generic", + "still_image_url": "https://example.com", + # Does not exist + "stream_source": 'http://example.com/{{ states.sensor.temp.state + "a" }}', + "limit_refetch_to_url_change": True, + }, + }, + ) + assert await async_setup_component(hass, "stream", {}) + await hass.async_block_till_done() + + with patch( + "homeassistant.components.camera.Stream.endpoint_url", + return_value="http://home.assistant/playlist.m3u8", + ) as mock_stream_url: + # Request playlist through WebSocket + client = await hass_ws_client(hass) await client.send_json( - {"id": 2, "type": "camera/stream", "entity_id": "camera.config_test"} + {"id": 1, "type": "camera/stream", "entity_id": "camera.config_test"} ) msg = await client.receive_json() - # Assert that no new call to the stream request should have been made - assert mock_request_stream.call_count == 1 - # Assert the websocket error message - assert msg["id"] == 2 + # Assert WebSocket response + assert mock_stream_url.call_count == 0 + assert msg["id"] == 1 assert msg["type"] == TYPE_RESULT assert msg["success"] is False assert msg["error"] == { @@ -240,7 +263,7 @@ async def test_no_stream_source(aioclient_mock, hass, hass_client, hass_ws_clien await hass.async_block_till_done() with patch( - "homeassistant.components.camera.request_stream", + "homeassistant.components.camera.Stream.endpoint_url", return_value="http://home.assistant/playlist.m3u8", ) as mock_request_stream: # Request playlist through WebSocket diff --git a/tests/components/nest/camera_sdm_test.py b/tests/components/nest/camera_sdm_test.py index 117c8e97884..956d6036aed 100644 --- a/tests/components/nest/camera_sdm_test.py +++ b/tests/components/nest/camera_sdm_test.py @@ -16,6 +16,7 @@ import pytest from homeassistant.components import camera from homeassistant.components.camera import STATE_IDLE from homeassistant.exceptions import HomeAssistantError +from homeassistant.setup import async_setup_component from homeassistant.util.dt import utcnow from .common import async_setup_sdm_platform @@ -245,12 +246,17 @@ async def test_refresh_expired_stream_token(hass, auth): DEVICE_TRAITS, auth=auth, ) + assert await async_setup_component(hass, "stream", {}) assert len(hass.states.async_all()) == 1 cam = hass.states.get("camera.my_camera") assert cam is not None assert cam.state == STATE_IDLE + # Request a stream for the camera entity to exercise nest cam + camera interaction + # and shutdown on url expiration + await camera.async_request_stream(hass, cam.entity_id, "hls") + stream_source = await camera.async_get_stream_source(hass, "camera.my_camera") assert stream_source == "rtsp://some/url?auth=g.1.streamingToken" diff --git a/tests/components/stream/common.py b/tests/components/stream/common.py index c99cdef7984..5ec4f4217ce 100644 --- a/tests/components/stream/common.py +++ b/tests/components/stream/common.py @@ -5,9 +5,6 @@ import io import av import numpy as np -from homeassistant.components.stream import Stream -from homeassistant.components.stream.const import ATTR_STREAMS, DOMAIN - AUDIO_SAMPLE_RATE = 8000 @@ -93,10 +90,3 @@ def generate_h264_video(container_format="mp4", audio_codec=None): output.seek(0) return output - - -def preload_stream(hass, stream_source): - """Preload a stream for use in tests.""" - stream = Stream(hass, stream_source) - hass.data[DOMAIN][ATTR_STREAMS][stream_source] = stream - return stream diff --git a/tests/components/stream/test_hls.py b/tests/components/stream/test_hls.py index ab49a56ca02..b575b3877fa 100644 --- a/tests/components/stream/test_hls.py +++ b/tests/components/stream/test_hls.py @@ -5,13 +5,13 @@ from urllib.parse import urlparse import av -from homeassistant.components.stream import request_stream +from homeassistant.components.stream import create_stream from homeassistant.const import HTTP_NOT_FOUND from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util from tests.common import async_fire_time_changed -from tests.components.stream.common import generate_h264_video, preload_stream +from tests.components.stream.common import generate_h264_video async def test_hls_stream(hass, hass_client, stream_worker_sync): @@ -27,11 +27,12 @@ async def test_hls_stream(hass, hass_client, stream_worker_sync): # Setup demo HLS track source = generate_h264_video() - stream = preload_stream(hass, source) - stream.add_provider("hls") + stream = create_stream(hass, source) # Request stream - url = request_stream(hass, source) + stream.add_provider("hls") + stream.start() + url = stream.endpoint_url("hls") http_client = await hass_client() @@ -72,11 +73,12 @@ async def test_stream_timeout(hass, hass_client, stream_worker_sync): # Setup demo HLS track source = generate_h264_video() - stream = preload_stream(hass, source) - stream.add_provider("hls") + stream = create_stream(hass, source) # Request stream - url = request_stream(hass, source) + stream.add_provider("hls") + stream.start() + url = stream.endpoint_url("hls") http_client = await hass_client() @@ -113,11 +115,13 @@ async def test_stream_ended(hass, stream_worker_sync): # Setup demo HLS track source = generate_h264_video() - stream = preload_stream(hass, source) + stream = create_stream(hass, source) track = stream.add_provider("hls") # Request stream - request_stream(hass, source) + stream.add_provider("hls") + stream.start() + stream.endpoint_url("hls") # Run it dead while True: @@ -142,9 +146,10 @@ async def test_stream_keepalive(hass): # Setup demo HLS track source = "test_stream_keepalive_source" - stream = preload_stream(hass, source) + stream = create_stream(hass, source) track = stream.add_provider("hls") track.num_segments = 2 + stream.start() cur_time = 0 @@ -163,7 +168,8 @@ async def test_stream_keepalive(hass): av_open.side_effect = av.error.InvalidDataError(-2, "error") mock_time.time.side_effect = time_side_effect # Request stream - request_stream(hass, source, keepalive=True) + stream.keepalive = True + stream.start() stream._thread.join() stream._thread = None assert av_open.call_count == 2 diff --git a/tests/components/stream/test_init.py b/tests/components/stream/test_init.py deleted file mode 100644 index 2e13493b641..00000000000 --- a/tests/components/stream/test_init.py +++ /dev/null @@ -1,86 +0,0 @@ -"""The tests for stream.""" -from unittest.mock import AsyncMock, MagicMock, patch - -import pytest - -from homeassistant.components.stream.const import ( - ATTR_STREAMS, - CONF_LOOKBACK, - CONF_STREAM_SOURCE, - DOMAIN, - SERVICE_RECORD, -) -from homeassistant.const import CONF_FILENAME -from homeassistant.exceptions import HomeAssistantError -from homeassistant.setup import async_setup_component - - -async def test_record_service_invalid_file(hass): - """Test record service call with invalid file.""" - await async_setup_component(hass, "stream", {"stream": {}}) - data = {CONF_STREAM_SOURCE: "rtsp://my.video", CONF_FILENAME: "/my/invalid/path"} - with pytest.raises(HomeAssistantError): - await hass.services.async_call(DOMAIN, SERVICE_RECORD, data, blocking=True) - - -async def test_record_service_init_stream(hass): - """Test record service call with invalid file.""" - await async_setup_component(hass, "stream", {"stream": {}}) - data = {CONF_STREAM_SOURCE: "rtsp://my.video", CONF_FILENAME: "/my/invalid/path"} - with patch("homeassistant.components.stream.Stream") as stream_mock, patch.object( - hass.config, "is_allowed_path", return_value=True - ): - # Setup stubs - stream_mock.return_value.outputs = {} - - # Call Service - await hass.services.async_call(DOMAIN, SERVICE_RECORD, data, blocking=True) - - # Assert - assert stream_mock.called - - -async def test_record_service_existing_record_session(hass): - """Test record service call with invalid file.""" - await async_setup_component(hass, "stream", {"stream": {}}) - source = "rtsp://my.video" - data = {CONF_STREAM_SOURCE: source, CONF_FILENAME: "/my/invalid/path"} - - # Setup stubs - stream_mock = MagicMock() - stream_mock.return_value.outputs = {"recorder": MagicMock()} - hass.data[DOMAIN][ATTR_STREAMS][source] = stream_mock - - with patch.object(hass.config, "is_allowed_path", return_value=True), pytest.raises( - HomeAssistantError - ): - # Call Service - await hass.services.async_call(DOMAIN, SERVICE_RECORD, data, blocking=True) - - -async def test_record_service_lookback(hass): - """Test record service call with invalid file.""" - await async_setup_component(hass, "stream", {"stream": {}}) - data = { - CONF_STREAM_SOURCE: "rtsp://my.video", - CONF_FILENAME: "/my/invalid/path", - CONF_LOOKBACK: 4, - } - - with patch("homeassistant.components.stream.Stream") as stream_mock, patch.object( - hass.config, "is_allowed_path", return_value=True - ): - # Setup stubs - hls_mock = MagicMock() - hls_mock.target_duration = 2 - hls_mock.recv = AsyncMock(return_value=None) - stream_mock.return_value.outputs = {"hls": hls_mock} - - # Call Service - await hass.services.async_call(DOMAIN, SERVICE_RECORD, data, blocking=True) - - assert stream_mock.called - stream_mock.return_value.add_provider.assert_called_once_with( - "recorder", timeout=30 - ) - assert hls_mock.recv.called diff --git a/tests/components/stream/test_recorder.py b/tests/components/stream/test_recorder.py index bda53a9cc17..9d418c360b1 100644 --- a/tests/components/stream/test_recorder.py +++ b/tests/components/stream/test_recorder.py @@ -8,13 +8,15 @@ from unittest.mock import patch import av import pytest +from homeassistant.components.stream import create_stream from homeassistant.components.stream.core import Segment from homeassistant.components.stream.recorder import recorder_save_worker +from homeassistant.exceptions import HomeAssistantError from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util from tests.common import async_fire_time_changed -from tests.components.stream.common import generate_h264_video, preload_stream +from tests.components.stream.common import generate_h264_video TEST_TIMEOUT = 10 @@ -75,10 +77,11 @@ async def test_record_stream(hass, hass_client, stream_worker_sync, record_worke # Setup demo track source = generate_h264_video() - stream = preload_stream(hass, source) - recorder = stream.add_provider("recorder") - stream.start() + stream = create_stream(hass, source) + with patch.object(hass.config, "is_allowed_path", return_value=True): + await stream.async_record("/example/path") + recorder = stream.add_provider("recorder") while True: segment = await recorder.recv() if not segment: @@ -95,6 +98,27 @@ async def test_record_stream(hass, hass_client, stream_worker_sync, record_worke record_worker_sync.join() +async def test_record_lookback( + hass, hass_client, stream_worker_sync, record_worker_sync +): + """Exercise record with loopback.""" + await async_setup_component(hass, "stream", {"stream": {}}) + + source = generate_h264_video() + stream = create_stream(hass, source) + + # Start an HLS feed to enable lookback + stream.add_provider("hls") + stream.start() + + with patch.object(hass.config, "is_allowed_path", return_value=True): + await stream.async_record("/example/path", lookback=4) + + # This test does not need recorder cleanup since it is not fully exercised + + stream.stop() + + async def test_recorder_timeout(hass, hass_client, stream_worker_sync): """ Test recorder timeout. @@ -109,9 +133,11 @@ async def test_recorder_timeout(hass, hass_client, stream_worker_sync): with patch("homeassistant.components.stream.IdleTimer.fire") as mock_timeout: # Setup demo track source = generate_h264_video() - stream = preload_stream(hass, source) - recorder = stream.add_provider("recorder", timeout=30) - stream.start() + + stream = create_stream(hass, source) + with patch.object(hass.config, "is_allowed_path", return_value=True): + await stream.async_record("/example/path") + recorder = stream.add_provider("recorder") await recorder.recv() @@ -128,6 +154,19 @@ async def test_recorder_timeout(hass, hass_client, stream_worker_sync): await hass.async_block_till_done() +async def test_record_path_not_allowed(hass, hass_client): + """Test where the output path is not allowed by home assistant configuration.""" + await async_setup_component(hass, "stream", {"stream": {}}) + + # Setup demo track + source = generate_h264_video() + stream = create_stream(hass, source) + with patch.object( + hass.config, "is_allowed_path", return_value=False + ), pytest.raises(HomeAssistantError): + await stream.async_record("/example/path") + + async def test_recorder_save(tmpdir): """Test recorder save.""" # Setup @@ -165,9 +204,10 @@ async def test_record_stream_audio( source = generate_h264_video( container_format="mov", audio_codec=a_codec ) # mov can store PCM - stream = preload_stream(hass, source) + stream = create_stream(hass, source) + with patch.object(hass.config, "is_allowed_path", return_value=True): + await stream.async_record("/example/path") recorder = stream.add_provider("recorder") - stream.start() while True: segment = await recorder.recv()