diff --git a/homeassistant/components/camera/__init__.py b/homeassistant/components/camera/__init__.py index 25505800709..cbc98edf19b 100644 --- a/homeassistant/components/camera/__init__.py +++ b/homeassistant/components/camera/__init__.py @@ -23,16 +23,8 @@ from homeassistant.components.media_player.const import ( DOMAIN as DOMAIN_MP, SERVICE_PLAY_MEDIA, ) -from homeassistant.components.stream import request_stream -from homeassistant.components.stream.const import ( - CONF_DURATION, - CONF_LOOKBACK, - CONF_STREAM_SOURCE, - DOMAIN as DOMAIN_STREAM, - FORMAT_CONTENT_TYPE, - OUTPUT_FORMATS, - SERVICE_RECORD, -) +from homeassistant.components.stream import Stream, create_stream +from homeassistant.components.stream.const import FORMAT_CONTENT_TYPE, OUTPUT_FORMATS from homeassistant.const import ( ATTR_ENTITY_ID, CONF_FILENAME, @@ -53,7 +45,13 @@ from homeassistant.helpers.entity_component import EntityComponent from homeassistant.helpers.network import get_url from homeassistant.loader import bind_hass -from .const import DATA_CAMERA_PREFS, DOMAIN +from .const import ( + CONF_DURATION, + CONF_LOOKBACK, + DATA_CAMERA_PREFS, + DOMAIN, + SERVICE_RECORD, +) from .prefs import CameraPreferences # mypy: allow-untyped-calls, allow-untyped-defs @@ -130,23 +128,7 @@ class Image: async def async_request_stream(hass, entity_id, fmt): """Request a stream for a camera entity.""" camera = _get_camera_from_entity_id(hass, entity_id) - camera_prefs = hass.data[DATA_CAMERA_PREFS].get(entity_id) - - async with async_timeout.timeout(10): - source = await camera.stream_source() - - if not source: - raise HomeAssistantError( - f"{camera.entity_id} does not support play stream service" - ) - - return request_stream( - hass, - source, - fmt=fmt, - keepalive=camera_prefs.preload_stream, - options=camera.stream_options, - ) + return await _async_stream_endpoint_url(hass, camera, fmt) @bind_hass @@ -267,14 +249,11 @@ async def async_setup(hass, config): camera_prefs = prefs.get(camera.entity_id) if not camera_prefs.preload_stream: continue - - async with async_timeout.timeout(10): - source = await camera.stream_source() - - if not source: + stream = await camera.create_stream() + if not stream: continue - - request_stream(hass, source, keepalive=True, options=camera.stream_options) + stream.add_provider("hls") + stream.start() hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, preload_stream) @@ -330,6 +309,7 @@ class Camera(Entity): def __init__(self): """Initialize a camera.""" self.is_streaming = False + self.stream = None self.stream_options = {} self.content_type = DEFAULT_CONTENT_TYPE self.access_tokens: collections.deque = collections.deque([], 2) @@ -375,6 +355,17 @@ class Camera(Entity): """Return the interval between frames of the mjpeg stream.""" return 0.5 + async def create_stream(self) -> Stream: + """Create a Stream for stream_source.""" + # There is at most one stream (a decode worker) per camera + if not self.stream: + async with async_timeout.timeout(10): + source = await self.stream_source() + if not source: + return None + self.stream = create_stream(self.hass, source, options=self.stream_options) + return self.stream + async def stream_source(self): """Return the source of the stream.""" return None @@ -586,24 +577,7 @@ async def ws_camera_stream(hass, connection, msg): try: entity_id = msg["entity_id"] camera = _get_camera_from_entity_id(hass, entity_id) - camera_prefs = hass.data[DATA_CAMERA_PREFS].get(entity_id) - - async with async_timeout.timeout(10): - source = await camera.stream_source() - - if not source: - raise HomeAssistantError( - f"{camera.entity_id} does not support play stream service" - ) - - fmt = msg["format"] - url = request_stream( - hass, - source, - fmt=fmt, - keepalive=camera_prefs.preload_stream, - options=camera.stream_options, - ) + url = await _async_stream_endpoint_url(hass, camera, fmt=msg["format"]) connection.send_result(msg["id"], {"url": url}) except HomeAssistantError as ex: _LOGGER.error("Error requesting stream: %s", ex) @@ -676,32 +650,17 @@ async def async_handle_snapshot_service(camera, service): async def async_handle_play_stream_service(camera, service_call): """Handle play stream services calls.""" - async with async_timeout.timeout(10): - source = await camera.stream_source() - - if not source: - raise HomeAssistantError( - f"{camera.entity_id} does not support play stream service" - ) + fmt = service_call.data[ATTR_FORMAT] + url = await _async_stream_endpoint_url(camera.hass, camera, fmt) hass = camera.hass - camera_prefs = hass.data[DATA_CAMERA_PREFS].get(camera.entity_id) - fmt = service_call.data[ATTR_FORMAT] - entity_ids = service_call.data[ATTR_MEDIA_PLAYER] - - url = request_stream( - hass, - source, - fmt=fmt, - keepalive=camera_prefs.preload_stream, - options=camera.stream_options, - ) data = { ATTR_MEDIA_CONTENT_ID: f"{get_url(hass)}{url}", ATTR_MEDIA_CONTENT_TYPE: FORMAT_CONTENT_TYPE[fmt], } # It is required to send a different payload for cast media players + entity_ids = service_call.data[ATTR_MEDIA_PLAYER] cast_entity_ids = [ entity for entity, source in entity_sources(hass).items() @@ -740,12 +699,28 @@ async def async_handle_play_stream_service(camera, service_call): ) +async def _async_stream_endpoint_url(hass, camera, fmt): + stream = await camera.create_stream() + if not stream: + raise HomeAssistantError( + f"{camera.entity_id} does not support play stream service" + ) + + # Update keepalive setting which manages idle shutdown + camera_prefs = hass.data[DATA_CAMERA_PREFS].get(camera.entity_id) + stream.keepalive = camera_prefs.preload_stream + + stream.add_provider(fmt) + stream.start() + return stream.endpoint_url(fmt) + + async def async_handle_record_service(camera, call): """Handle stream recording service calls.""" async with async_timeout.timeout(10): - source = await camera.stream_source() + stream = await camera.create_stream() - if not source: + if not stream: raise HomeAssistantError(f"{camera.entity_id} does not support record service") hass = camera.hass @@ -753,13 +728,6 @@ async def async_handle_record_service(camera, call): filename.hass = hass video_path = filename.async_render(variables={ATTR_ENTITY_ID: camera}) - data = { - CONF_STREAM_SOURCE: source, - CONF_FILENAME: video_path, - CONF_DURATION: call.data[CONF_DURATION], - CONF_LOOKBACK: call.data[CONF_LOOKBACK], - } - - await hass.services.async_call( - DOMAIN_STREAM, SERVICE_RECORD, data, blocking=True, context=call.context + await stream.async_record( + video_path, duration=call.data[CONF_DURATION], lookback=call.data[CONF_LOOKBACK] ) diff --git a/homeassistant/components/camera/const.py b/homeassistant/components/camera/const.py index 563f0554f0f..615e54b0eca 100644 --- a/homeassistant/components/camera/const.py +++ b/homeassistant/components/camera/const.py @@ -4,3 +4,8 @@ DOMAIN = "camera" DATA_CAMERA_PREFS = "camera_prefs" PREF_PRELOAD_STREAM = "preload_stream" + +SERVICE_RECORD = "record" + +CONF_LOOKBACK = "lookback" +CONF_DURATION = "duration" diff --git a/homeassistant/components/nest/camera_sdm.py b/homeassistant/components/nest/camera_sdm.py index aa8e100059a..8f5ba88fdd4 100644 --- a/homeassistant/components/nest/camera_sdm.py +++ b/homeassistant/components/nest/camera_sdm.py @@ -146,6 +146,13 @@ class NestCamera(Camera): # Next attempt to catch a url will get a new one self._stream = None return + # Stop any existing stream worker since the url is invalid. The next + # request for this stream will restart it with the right url. + # Issue #42793 tracks improvements (e.g. preserve keepalive, smoother + # transitions across streams) + if self.stream: + self.stream.stop() + self.stream = None self._schedule_stream_refresh() async def async_will_remove_from_hass(self): diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index 6980f7ead8f..a8b344a98e9 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -1,28 +1,35 @@ -"""Provide functionality to stream video source.""" +"""Provide functionality to stream video source. + +Components use create_stream with a stream source (e.g. an rtsp url) to create +a new Stream object. Stream manages: + - Background work to fetch and decode a stream + - Desired output formats + - Home Assistant URLs for viewing a stream + - Access tokens for URLs for viewing a stream + +A Stream consists of a background worker, and one or more output formats each +with their own idle timeout managed by the stream component. When an output +format is no longer in use, the stream component will expire it. When there +are no active output formats, the background worker is shut down and access +tokens are expired. Alternatively, a Stream can be configured with keepalive +to always keep workers active. +""" import logging import secrets import threading import time from types import MappingProxyType -import voluptuous as vol - -from homeassistant.const import CONF_FILENAME, EVENT_HOMEASSISTANT_STOP +from homeassistant.const import EVENT_HOMEASSISTANT_STOP from homeassistant.core import callback from homeassistant.exceptions import HomeAssistantError -import homeassistant.helpers.config_validation as cv -from homeassistant.loader import bind_hass from .const import ( ATTR_ENDPOINTS, ATTR_STREAMS, - CONF_DURATION, - CONF_LOOKBACK, - CONF_STREAM_SOURCE, DOMAIN, MAX_SEGMENTS, OUTPUT_IDLE_TIMEOUT, - SERVICE_RECORD, STREAM_RESTART_INCREMENT, STREAM_RESTART_RESET_TIME, ) @@ -31,20 +38,13 @@ from .hls import async_setup_hls _LOGGER = logging.getLogger(__name__) -STREAM_SERVICE_SCHEMA = vol.Schema({vol.Required(CONF_STREAM_SOURCE): cv.string}) -SERVICE_RECORD_SCHEMA = STREAM_SERVICE_SCHEMA.extend( - { - vol.Required(CONF_FILENAME): cv.string, - vol.Optional(CONF_DURATION, default=30): int, - vol.Optional(CONF_LOOKBACK, default=0): int, - } -) +def create_stream(hass, stream_source, options=None): + """Create a stream with the specified identfier based on the source url. - -@bind_hass -def request_stream(hass, stream_source, *, fmt="hls", keepalive=False, options=None): - """Set up stream with token.""" + The stream_source is typically an rtsp url and options are passed into + pyav / ffmpeg as options. + """ if DOMAIN not in hass.config.components: raise HomeAssistantError("Stream integration is not set up.") @@ -59,25 +59,9 @@ def request_stream(hass, stream_source, *, fmt="hls", keepalive=False, options=N **options, } - try: - streams = hass.data[DOMAIN][ATTR_STREAMS] - stream = streams.get(stream_source) - if not stream: - stream = Stream(hass, stream_source, options=options, keepalive=keepalive) - streams[stream_source] = stream - else: - # Update keepalive option on existing stream - stream.keepalive = keepalive - - # Add provider - stream.add_provider(fmt) - - if not stream.access_token: - stream.access_token = secrets.token_hex() - stream.start() - return hass.data[DOMAIN][ATTR_ENDPOINTS][fmt].format(stream.access_token) - except Exception as err: - raise HomeAssistantError("Unable to get stream") from err + stream = Stream(hass, stream_source, options=options) + hass.data[DOMAIN][ATTR_STREAMS].append(stream) + return stream async def async_setup(hass, config): @@ -92,7 +76,7 @@ async def async_setup(hass, config): hass.data[DOMAIN] = {} hass.data[DOMAIN][ATTR_ENDPOINTS] = {} - hass.data[DOMAIN][ATTR_STREAMS] = {} + hass.data[DOMAIN][ATTR_STREAMS] = [] # Setup HLS hls_endpoint = async_setup_hls(hass) @@ -104,33 +88,25 @@ async def async_setup(hass, config): @callback def shutdown(event): """Stop all stream workers.""" - for stream in hass.data[DOMAIN][ATTR_STREAMS].values(): + for stream in hass.data[DOMAIN][ATTR_STREAMS]: stream.keepalive = False stream.stop() _LOGGER.info("Stopped stream workers") hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown) - async def async_record(call): - """Call record stream service handler.""" - await async_handle_record_service(hass, call) - - hass.services.async_register( - DOMAIN, SERVICE_RECORD, async_record, schema=SERVICE_RECORD_SCHEMA - ) - return True class Stream: """Represents a single stream.""" - def __init__(self, hass, source, options=None, keepalive=False): + def __init__(self, hass, source, options=None): """Initialize a stream.""" self.hass = hass self.source = source self.options = options - self.keepalive = keepalive + self.keepalive = False self.access_token = None self._thread = None self._thread_quit = None @@ -139,6 +115,14 @@ class Stream: if self.options is None: self.options = {} + def endpoint_url(self, fmt): + """Start the stream and returns a url for the output format.""" + if fmt not in self._outputs: + raise ValueError(f"Stream is not configured for format '{fmt}'") + if not self.access_token: + self.access_token = secrets.token_hex() + return self.hass.data[DOMAIN][ATTR_ENDPOINTS][fmt].format(self.access_token) + @property def outputs(self): """Return a copy of the stream outputs.""" @@ -244,39 +228,28 @@ class Stream: self._thread = None _LOGGER.info("Stopped stream: %s", self.source) + async def async_record(self, video_path, duration=30, lookback=5): + """Make a .mp4 recording from a provided stream.""" -async def async_handle_record_service(hass, call): - """Handle save video service calls.""" - stream_source = call.data[CONF_STREAM_SOURCE] - video_path = call.data[CONF_FILENAME] - duration = call.data[CONF_DURATION] - lookback = call.data[CONF_LOOKBACK] + # Check for file access + if not self.hass.config.is_allowed_path(video_path): + raise HomeAssistantError(f"Can't write {video_path}, no access to path!") - # Check for file access - if not hass.config.is_allowed_path(video_path): - raise HomeAssistantError(f"Can't write {video_path}, no access to path!") + # Add recorder + recorder = self.outputs.get("recorder") + if recorder: + raise HomeAssistantError( + f"Stream already recording to {recorder.video_path}!" + ) + recorder = self.add_provider("recorder", timeout=duration) + recorder.video_path = video_path - # Check for active stream - streams = hass.data[DOMAIN][ATTR_STREAMS] - stream = streams.get(stream_source) - if not stream: - stream = Stream(hass, stream_source) - streams[stream_source] = stream + self.start() - # Add recorder - recorder = stream.outputs.get("recorder") - if recorder: - raise HomeAssistantError(f"Stream already recording to {recorder.video_path}!") - - recorder = stream.add_provider("recorder", timeout=duration) - recorder.video_path = video_path - - stream.start() - - # Take advantage of lookback - hls = stream.outputs.get("hls") - if lookback > 0 and hls: - num_segments = min(int(lookback // hls.target_duration), MAX_SEGMENTS) - # Wait for latest segment, then add the lookback - await hls.recv() - recorder.prepend(list(hls.get_segment())[-num_segments:]) + # Take advantage of lookback + hls = self.outputs.get("hls") + if lookback > 0 and hls: + num_segments = min(int(lookback // hls.target_duration), MAX_SEGMENTS) + # Wait for latest segment, then add the lookback + await hls.recv() + recorder.prepend(list(hls.get_segment())[-num_segments:]) diff --git a/homeassistant/components/stream/const.py b/homeassistant/components/stream/const.py index 45fa3d9e76a..4ee9f2a9814 100644 --- a/homeassistant/components/stream/const.py +++ b/homeassistant/components/stream/const.py @@ -1,15 +1,8 @@ """Constants for Stream component.""" DOMAIN = "stream" -CONF_STREAM_SOURCE = "stream_source" -CONF_LOOKBACK = "lookback" -CONF_DURATION = "duration" - ATTR_ENDPOINTS = "endpoints" ATTR_STREAMS = "streams" -ATTR_KEEPALIVE = "keepalive" - -SERVICE_RECORD = "record" OUTPUT_FORMATS = ["hls"] diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index 5427172a55c..31c7940b8e1 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -194,11 +194,7 @@ class StreamView(HomeAssistantView): hass = request.app["hass"] stream = next( - ( - s - for s in hass.data[DOMAIN][ATTR_STREAMS].values() - if s.access_token == token - ), + (s for s in hass.data[DOMAIN][ATTR_STREAMS] if s.access_token == token), None, ) diff --git a/homeassistant/components/stream/services.yaml b/homeassistant/components/stream/services.yaml deleted file mode 100644 index a8652335bf1..00000000000 --- a/homeassistant/components/stream/services.yaml +++ /dev/null @@ -1,15 +0,0 @@ -record: - description: Make a .mp4 recording from a provided stream. - fields: - stream_source: - description: The input source for the stream. - example: "rtsp://my.stream.feed:554" - filename: - description: The file name string. - example: "/tmp/my_stream.mp4" - duration: - description: "Target recording length (in seconds). Default: 30" - example: 30 - lookback: - description: "Target lookback period (in seconds) to include in addition to duration. Only available if there is currently an active HLS stream for stream_source. Default: 0" - example: 5 diff --git a/tests/components/camera/test_init.py b/tests/components/camera/test_init.py index 2c2d744deb9..340a4b5d756 100644 --- a/tests/components/camera/test_init.py +++ b/tests/components/camera/test_init.py @@ -155,25 +155,20 @@ async def test_websocket_camera_thumbnail(hass, hass_ws_client, mock_camera): async def test_websocket_stream_no_source( hass, hass_ws_client, mock_camera, mock_stream ): - """Test camera/stream websocket command.""" + """Test camera/stream websocket command with camera with no source.""" await async_setup_component(hass, "camera", {}) - with patch( - "homeassistant.components.camera.request_stream", - return_value="http://home.assistant/playlist.m3u8", - ) as mock_request_stream: - # Request playlist through WebSocket - client = await hass_ws_client(hass) - await client.send_json( - {"id": 6, "type": "camera/stream", "entity_id": "camera.demo_camera"} - ) - msg = await client.receive_json() + # Request playlist through WebSocket + client = await hass_ws_client(hass) + await client.send_json( + {"id": 6, "type": "camera/stream", "entity_id": "camera.demo_camera"} + ) + msg = await client.receive_json() - # Assert WebSocket response - assert not mock_request_stream.called - assert msg["id"] == 6 - assert msg["type"] == TYPE_RESULT - assert not msg["success"] + # Assert WebSocket response + assert msg["id"] == 6 + assert msg["type"] == TYPE_RESULT + assert not msg["success"] async def test_websocket_camera_stream(hass, hass_ws_client, mock_camera, mock_stream): @@ -181,9 +176,9 @@ async def test_websocket_camera_stream(hass, hass_ws_client, mock_camera, mock_s await async_setup_component(hass, "camera", {}) with patch( - "homeassistant.components.camera.request_stream", + "homeassistant.components.camera.Stream.endpoint_url", return_value="http://home.assistant/playlist.m3u8", - ) as mock_request_stream, patch( + ) as mock_stream_view_url, patch( "homeassistant.components.demo.camera.DemoCamera.stream_source", return_value="http://example.com", ): @@ -195,7 +190,7 @@ async def test_websocket_camera_stream(hass, hass_ws_client, mock_camera, mock_s msg = await client.receive_json() # Assert WebSocket response - assert mock_request_stream.called + assert mock_stream_view_url.called assert msg["id"] == 6 assert msg["type"] == TYPE_RESULT assert msg["success"] @@ -248,9 +243,7 @@ async def test_play_stream_service_no_source(hass, mock_camera, mock_stream): ATTR_ENTITY_ID: "camera.demo_camera", camera.ATTR_MEDIA_PLAYER: "media_player.test", } - with patch("homeassistant.components.camera.request_stream"), pytest.raises( - HomeAssistantError - ): + with pytest.raises(HomeAssistantError): # Call service await hass.services.async_call( camera.DOMAIN, camera.SERVICE_PLAY_STREAM, data, blocking=True @@ -265,7 +258,7 @@ async def test_handle_play_stream_service(hass, mock_camera, mock_stream): ) await async_setup_component(hass, "media_player", {}) with patch( - "homeassistant.components.camera.request_stream" + "homeassistant.components.camera.Stream.endpoint_url", ) as mock_request_stream, patch( "homeassistant.components.demo.camera.DemoCamera.stream_source", return_value="http://example.com", @@ -289,7 +282,7 @@ async def test_no_preload_stream(hass, mock_stream): """Test camera preload preference.""" demo_prefs = CameraEntityPreferences({PREF_PRELOAD_STREAM: False}) with patch( - "homeassistant.components.camera.request_stream" + "homeassistant.components.camera.Stream.endpoint_url", ) as mock_request_stream, patch( "homeassistant.components.camera.prefs.CameraPreferences.get", return_value=demo_prefs, @@ -308,8 +301,8 @@ async def test_preload_stream(hass, mock_stream): """Test camera preload preference.""" demo_prefs = CameraEntityPreferences({PREF_PRELOAD_STREAM: True}) with patch( - "homeassistant.components.camera.request_stream" - ) as mock_request_stream, patch( + "homeassistant.components.camera.create_stream" + ) as mock_create_stream, patch( "homeassistant.components.camera.prefs.CameraPreferences.get", return_value=demo_prefs, ), patch( @@ -322,7 +315,7 @@ async def test_preload_stream(hass, mock_stream): await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_START) await hass.async_block_till_done() - assert mock_request_stream.called + assert mock_create_stream.called async def test_record_service_invalid_path(hass, mock_camera): @@ -348,10 +341,9 @@ async def test_record_service(hass, mock_camera, mock_stream): "homeassistant.components.demo.camera.DemoCamera.stream_source", return_value="http://example.com", ), patch( - "homeassistant.components.stream.async_handle_record_service", - ) as mock_record_service, patch.object( - hass.config, "is_allowed_path", return_value=True - ): + "homeassistant.components.stream.Stream.async_record", + autospec=True, + ) as mock_record: # Call service await hass.services.async_call( camera.DOMAIN, @@ -361,4 +353,4 @@ async def test_record_service(hass, mock_camera, mock_stream): ) # So long as we call stream.record, the rest should be covered # by those tests. - assert mock_record_service.called + assert mock_record.called diff --git a/tests/components/generic/test_camera.py b/tests/components/generic/test_camera.py index 9a147995541..65f5306c4d8 100644 --- a/tests/components/generic/test_camera.py +++ b/tests/components/generic/test_camera.py @@ -176,17 +176,18 @@ async def test_stream_source(aioclient_mock, hass, hass_client, hass_ws_client): "still_image_url": "https://example.com", "stream_source": 'http://example.com/{{ states.sensor.temp.state + "a" }}', "limit_refetch_to_url_change": True, - } + }, }, ) + assert await async_setup_component(hass, "stream", {}) await hass.async_block_till_done() hass.states.async_set("sensor.temp", "5") with patch( - "homeassistant.components.camera.request_stream", + "homeassistant.components.camera.Stream.endpoint_url", return_value="http://home.assistant/playlist.m3u8", - ) as mock_request_stream: + ) as mock_stream_url: # Request playlist through WebSocket client = await hass_ws_client(hass) @@ -196,25 +197,47 @@ async def test_stream_source(aioclient_mock, hass, hass_client, hass_ws_client): msg = await client.receive_json() # Assert WebSocket response - assert mock_request_stream.call_count == 1 - assert mock_request_stream.call_args[0][1] == "http://example.com/5a" + assert mock_stream_url.call_count == 1 assert msg["id"] == 1 assert msg["type"] == TYPE_RESULT assert msg["success"] assert msg["result"]["url"][-13:] == "playlist.m3u8" - # Cause a template render error - hass.states.async_remove("sensor.temp") + +async def test_stream_source_error(aioclient_mock, hass, hass_client, hass_ws_client): + """Test that the stream source has an error.""" + assert await async_setup_component( + hass, + "camera", + { + "camera": { + "name": "config_test", + "platform": "generic", + "still_image_url": "https://example.com", + # Does not exist + "stream_source": 'http://example.com/{{ states.sensor.temp.state + "a" }}', + "limit_refetch_to_url_change": True, + }, + }, + ) + assert await async_setup_component(hass, "stream", {}) + await hass.async_block_till_done() + + with patch( + "homeassistant.components.camera.Stream.endpoint_url", + return_value="http://home.assistant/playlist.m3u8", + ) as mock_stream_url: + # Request playlist through WebSocket + client = await hass_ws_client(hass) await client.send_json( - {"id": 2, "type": "camera/stream", "entity_id": "camera.config_test"} + {"id": 1, "type": "camera/stream", "entity_id": "camera.config_test"} ) msg = await client.receive_json() - # Assert that no new call to the stream request should have been made - assert mock_request_stream.call_count == 1 - # Assert the websocket error message - assert msg["id"] == 2 + # Assert WebSocket response + assert mock_stream_url.call_count == 0 + assert msg["id"] == 1 assert msg["type"] == TYPE_RESULT assert msg["success"] is False assert msg["error"] == { @@ -240,7 +263,7 @@ async def test_no_stream_source(aioclient_mock, hass, hass_client, hass_ws_clien await hass.async_block_till_done() with patch( - "homeassistant.components.camera.request_stream", + "homeassistant.components.camera.Stream.endpoint_url", return_value="http://home.assistant/playlist.m3u8", ) as mock_request_stream: # Request playlist through WebSocket diff --git a/tests/components/nest/camera_sdm_test.py b/tests/components/nest/camera_sdm_test.py index 117c8e97884..956d6036aed 100644 --- a/tests/components/nest/camera_sdm_test.py +++ b/tests/components/nest/camera_sdm_test.py @@ -16,6 +16,7 @@ import pytest from homeassistant.components import camera from homeassistant.components.camera import STATE_IDLE from homeassistant.exceptions import HomeAssistantError +from homeassistant.setup import async_setup_component from homeassistant.util.dt import utcnow from .common import async_setup_sdm_platform @@ -245,12 +246,17 @@ async def test_refresh_expired_stream_token(hass, auth): DEVICE_TRAITS, auth=auth, ) + assert await async_setup_component(hass, "stream", {}) assert len(hass.states.async_all()) == 1 cam = hass.states.get("camera.my_camera") assert cam is not None assert cam.state == STATE_IDLE + # Request a stream for the camera entity to exercise nest cam + camera interaction + # and shutdown on url expiration + await camera.async_request_stream(hass, cam.entity_id, "hls") + stream_source = await camera.async_get_stream_source(hass, "camera.my_camera") assert stream_source == "rtsp://some/url?auth=g.1.streamingToken" diff --git a/tests/components/stream/common.py b/tests/components/stream/common.py index c99cdef7984..5ec4f4217ce 100644 --- a/tests/components/stream/common.py +++ b/tests/components/stream/common.py @@ -5,9 +5,6 @@ import io import av import numpy as np -from homeassistant.components.stream import Stream -from homeassistant.components.stream.const import ATTR_STREAMS, DOMAIN - AUDIO_SAMPLE_RATE = 8000 @@ -93,10 +90,3 @@ def generate_h264_video(container_format="mp4", audio_codec=None): output.seek(0) return output - - -def preload_stream(hass, stream_source): - """Preload a stream for use in tests.""" - stream = Stream(hass, stream_source) - hass.data[DOMAIN][ATTR_STREAMS][stream_source] = stream - return stream diff --git a/tests/components/stream/test_hls.py b/tests/components/stream/test_hls.py index ab49a56ca02..b575b3877fa 100644 --- a/tests/components/stream/test_hls.py +++ b/tests/components/stream/test_hls.py @@ -5,13 +5,13 @@ from urllib.parse import urlparse import av -from homeassistant.components.stream import request_stream +from homeassistant.components.stream import create_stream from homeassistant.const import HTTP_NOT_FOUND from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util from tests.common import async_fire_time_changed -from tests.components.stream.common import generate_h264_video, preload_stream +from tests.components.stream.common import generate_h264_video async def test_hls_stream(hass, hass_client, stream_worker_sync): @@ -27,11 +27,12 @@ async def test_hls_stream(hass, hass_client, stream_worker_sync): # Setup demo HLS track source = generate_h264_video() - stream = preload_stream(hass, source) - stream.add_provider("hls") + stream = create_stream(hass, source) # Request stream - url = request_stream(hass, source) + stream.add_provider("hls") + stream.start() + url = stream.endpoint_url("hls") http_client = await hass_client() @@ -72,11 +73,12 @@ async def test_stream_timeout(hass, hass_client, stream_worker_sync): # Setup demo HLS track source = generate_h264_video() - stream = preload_stream(hass, source) - stream.add_provider("hls") + stream = create_stream(hass, source) # Request stream - url = request_stream(hass, source) + stream.add_provider("hls") + stream.start() + url = stream.endpoint_url("hls") http_client = await hass_client() @@ -113,11 +115,13 @@ async def test_stream_ended(hass, stream_worker_sync): # Setup demo HLS track source = generate_h264_video() - stream = preload_stream(hass, source) + stream = create_stream(hass, source) track = stream.add_provider("hls") # Request stream - request_stream(hass, source) + stream.add_provider("hls") + stream.start() + stream.endpoint_url("hls") # Run it dead while True: @@ -142,9 +146,10 @@ async def test_stream_keepalive(hass): # Setup demo HLS track source = "test_stream_keepalive_source" - stream = preload_stream(hass, source) + stream = create_stream(hass, source) track = stream.add_provider("hls") track.num_segments = 2 + stream.start() cur_time = 0 @@ -163,7 +168,8 @@ async def test_stream_keepalive(hass): av_open.side_effect = av.error.InvalidDataError(-2, "error") mock_time.time.side_effect = time_side_effect # Request stream - request_stream(hass, source, keepalive=True) + stream.keepalive = True + stream.start() stream._thread.join() stream._thread = None assert av_open.call_count == 2 diff --git a/tests/components/stream/test_init.py b/tests/components/stream/test_init.py deleted file mode 100644 index 2e13493b641..00000000000 --- a/tests/components/stream/test_init.py +++ /dev/null @@ -1,86 +0,0 @@ -"""The tests for stream.""" -from unittest.mock import AsyncMock, MagicMock, patch - -import pytest - -from homeassistant.components.stream.const import ( - ATTR_STREAMS, - CONF_LOOKBACK, - CONF_STREAM_SOURCE, - DOMAIN, - SERVICE_RECORD, -) -from homeassistant.const import CONF_FILENAME -from homeassistant.exceptions import HomeAssistantError -from homeassistant.setup import async_setup_component - - -async def test_record_service_invalid_file(hass): - """Test record service call with invalid file.""" - await async_setup_component(hass, "stream", {"stream": {}}) - data = {CONF_STREAM_SOURCE: "rtsp://my.video", CONF_FILENAME: "/my/invalid/path"} - with pytest.raises(HomeAssistantError): - await hass.services.async_call(DOMAIN, SERVICE_RECORD, data, blocking=True) - - -async def test_record_service_init_stream(hass): - """Test record service call with invalid file.""" - await async_setup_component(hass, "stream", {"stream": {}}) - data = {CONF_STREAM_SOURCE: "rtsp://my.video", CONF_FILENAME: "/my/invalid/path"} - with patch("homeassistant.components.stream.Stream") as stream_mock, patch.object( - hass.config, "is_allowed_path", return_value=True - ): - # Setup stubs - stream_mock.return_value.outputs = {} - - # Call Service - await hass.services.async_call(DOMAIN, SERVICE_RECORD, data, blocking=True) - - # Assert - assert stream_mock.called - - -async def test_record_service_existing_record_session(hass): - """Test record service call with invalid file.""" - await async_setup_component(hass, "stream", {"stream": {}}) - source = "rtsp://my.video" - data = {CONF_STREAM_SOURCE: source, CONF_FILENAME: "/my/invalid/path"} - - # Setup stubs - stream_mock = MagicMock() - stream_mock.return_value.outputs = {"recorder": MagicMock()} - hass.data[DOMAIN][ATTR_STREAMS][source] = stream_mock - - with patch.object(hass.config, "is_allowed_path", return_value=True), pytest.raises( - HomeAssistantError - ): - # Call Service - await hass.services.async_call(DOMAIN, SERVICE_RECORD, data, blocking=True) - - -async def test_record_service_lookback(hass): - """Test record service call with invalid file.""" - await async_setup_component(hass, "stream", {"stream": {}}) - data = { - CONF_STREAM_SOURCE: "rtsp://my.video", - CONF_FILENAME: "/my/invalid/path", - CONF_LOOKBACK: 4, - } - - with patch("homeassistant.components.stream.Stream") as stream_mock, patch.object( - hass.config, "is_allowed_path", return_value=True - ): - # Setup stubs - hls_mock = MagicMock() - hls_mock.target_duration = 2 - hls_mock.recv = AsyncMock(return_value=None) - stream_mock.return_value.outputs = {"hls": hls_mock} - - # Call Service - await hass.services.async_call(DOMAIN, SERVICE_RECORD, data, blocking=True) - - assert stream_mock.called - stream_mock.return_value.add_provider.assert_called_once_with( - "recorder", timeout=30 - ) - assert hls_mock.recv.called diff --git a/tests/components/stream/test_recorder.py b/tests/components/stream/test_recorder.py index bda53a9cc17..9d418c360b1 100644 --- a/tests/components/stream/test_recorder.py +++ b/tests/components/stream/test_recorder.py @@ -8,13 +8,15 @@ from unittest.mock import patch import av import pytest +from homeassistant.components.stream import create_stream from homeassistant.components.stream.core import Segment from homeassistant.components.stream.recorder import recorder_save_worker +from homeassistant.exceptions import HomeAssistantError from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util from tests.common import async_fire_time_changed -from tests.components.stream.common import generate_h264_video, preload_stream +from tests.components.stream.common import generate_h264_video TEST_TIMEOUT = 10 @@ -75,10 +77,11 @@ async def test_record_stream(hass, hass_client, stream_worker_sync, record_worke # Setup demo track source = generate_h264_video() - stream = preload_stream(hass, source) - recorder = stream.add_provider("recorder") - stream.start() + stream = create_stream(hass, source) + with patch.object(hass.config, "is_allowed_path", return_value=True): + await stream.async_record("/example/path") + recorder = stream.add_provider("recorder") while True: segment = await recorder.recv() if not segment: @@ -95,6 +98,27 @@ async def test_record_stream(hass, hass_client, stream_worker_sync, record_worke record_worker_sync.join() +async def test_record_lookback( + hass, hass_client, stream_worker_sync, record_worker_sync +): + """Exercise record with loopback.""" + await async_setup_component(hass, "stream", {"stream": {}}) + + source = generate_h264_video() + stream = create_stream(hass, source) + + # Start an HLS feed to enable lookback + stream.add_provider("hls") + stream.start() + + with patch.object(hass.config, "is_allowed_path", return_value=True): + await stream.async_record("/example/path", lookback=4) + + # This test does not need recorder cleanup since it is not fully exercised + + stream.stop() + + async def test_recorder_timeout(hass, hass_client, stream_worker_sync): """ Test recorder timeout. @@ -109,9 +133,11 @@ async def test_recorder_timeout(hass, hass_client, stream_worker_sync): with patch("homeassistant.components.stream.IdleTimer.fire") as mock_timeout: # Setup demo track source = generate_h264_video() - stream = preload_stream(hass, source) - recorder = stream.add_provider("recorder", timeout=30) - stream.start() + + stream = create_stream(hass, source) + with patch.object(hass.config, "is_allowed_path", return_value=True): + await stream.async_record("/example/path") + recorder = stream.add_provider("recorder") await recorder.recv() @@ -128,6 +154,19 @@ async def test_recorder_timeout(hass, hass_client, stream_worker_sync): await hass.async_block_till_done() +async def test_record_path_not_allowed(hass, hass_client): + """Test where the output path is not allowed by home assistant configuration.""" + await async_setup_component(hass, "stream", {"stream": {}}) + + # Setup demo track + source = generate_h264_video() + stream = create_stream(hass, source) + with patch.object( + hass.config, "is_allowed_path", return_value=False + ), pytest.raises(HomeAssistantError): + await stream.async_record("/example/path") + + async def test_recorder_save(tmpdir): """Test recorder save.""" # Setup @@ -165,9 +204,10 @@ async def test_record_stream_audio( source = generate_h264_video( container_format="mov", audio_codec=a_codec ) # mov can store PCM - stream = preload_stream(hass, source) + stream = create_stream(hass, source) + with patch.object(hass.config, "is_allowed_path", return_value=True): + await stream.async_record("/example/path") recorder = stream.add_provider("recorder") - stream.start() while True: segment = await recorder.recv()