diff --git a/homeassistant/components/camera/__init__.py b/homeassistant/components/camera/__init__.py index cdd8a844389..e453cdfd1a1 100644 --- a/homeassistant/components/camera/__init__.py +++ b/homeassistant/components/camera/__init__.py @@ -20,7 +20,7 @@ import voluptuous as vol from homeassistant.core import callback from homeassistant.const import ATTR_ENTITY_ID, SERVICE_TURN_OFF, \ - SERVICE_TURN_ON, EVENT_HOMEASSISTANT_START + SERVICE_TURN_ON, EVENT_HOMEASSISTANT_START, CONF_FILENAME from homeassistant.exceptions import HomeAssistantError from homeassistant.loader import bind_hass from homeassistant.helpers.entity import Entity @@ -33,7 +33,8 @@ from homeassistant.components.media_player.const import ( SERVICE_PLAY_MEDIA, DOMAIN as DOMAIN_MP) from homeassistant.components.stream import request_stream from homeassistant.components.stream.const import ( - OUTPUT_FORMATS, FORMAT_CONTENT_TYPE) + OUTPUT_FORMATS, FORMAT_CONTENT_TYPE, CONF_STREAM_SOURCE, CONF_LOOKBACK, + CONF_DURATION, SERVICE_RECORD, DOMAIN as DOMAIN_STREAM) from homeassistant.components import websocket_api import homeassistant.helpers.config_validation as cv @@ -85,6 +86,12 @@ CAMERA_SERVICE_PLAY_STREAM = CAMERA_SERVICE_SCHEMA.extend({ vol.Optional(ATTR_FORMAT, default='hls'): vol.In(OUTPUT_FORMATS), }) +CAMERA_SERVICE_RECORD = CAMERA_SERVICE_SCHEMA.extend({ + vol.Required(CONF_FILENAME): cv.template, + vol.Optional(CONF_DURATION, default=30): int, + vol.Optional(CONF_LOOKBACK, default=0): int, +}) + WS_TYPE_CAMERA_THUMBNAIL = 'camera_thumbnail' SCHEMA_WS_CAMERA_THUMBNAIL = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend({ vol.Required('type'): WS_TYPE_CAMERA_THUMBNAIL, @@ -260,6 +267,10 @@ async def async_setup(hass, config): SERVICE_PLAY_STREAM, CAMERA_SERVICE_PLAY_STREAM, async_handle_play_stream_service ) + component.async_register_entity_service( + SERVICE_RECORD, CAMERA_SERVICE_RECORD, + async_handle_record_service + ) return True @@ -640,3 +651,27 @@ async def async_handle_play_stream_service(camera, service_call): await hass.services.async_call( DOMAIN_MP, SERVICE_PLAY_MEDIA, data, blocking=True, context=service_call.context) + + +async def async_handle_record_service(camera, call): + """Handle stream recording service calls.""" + if not camera.stream_source: + raise HomeAssistantError("{} does not support record service" + .format(camera.entity_id)) + + hass = camera.hass + filename = call.data[CONF_FILENAME] + filename.hass = hass + video_path = filename.async_render( + variables={ATTR_ENTITY_ID: camera}) + + data = { + CONF_STREAM_SOURCE: camera.stream_source, + CONF_FILENAME: video_path, + CONF_DURATION: call.data[CONF_DURATION], + CONF_LOOKBACK: call.data[CONF_LOOKBACK], + } + + await hass.services.async_call( + DOMAIN_STREAM, SERVICE_RECORD, data, + blocking=True, context=call.context) diff --git a/homeassistant/components/camera/services.yaml b/homeassistant/components/camera/services.yaml index 575f1fe76f7..45a0f4cfec0 100644 --- a/homeassistant/components/camera/services.yaml +++ b/homeassistant/components/camera/services.yaml @@ -51,6 +51,22 @@ play_stream: description: (Optional) Stream format supported by media player. example: 'hls' +record: + description: Record live camera feed. + fields: + entity_id: + description: Name of entities to record. + example: 'camera.living_room_camera' + filename: + description: Template of a Filename. Variable is entity_id. Must be mp4. + example: '/tmp/snapshot_{{ entity_id }}.mp4' + duration: + description: (Optional) Target recording length (in seconds). Default: 30 + example: 30 + lookback: + description: (Optional) Target lookback period (in seconds) to include in addition to duration. Only available if there is currently an active HLS stream. + example: 4 + local_file_update_file_path: description: Update the file_path for a local_file camera. fields: diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index a68f1c47dbf..1e8ae5d60e3 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -10,15 +10,19 @@ import threading import voluptuous as vol from homeassistant.auth.util import generate_secret -from homeassistant.const import EVENT_HOMEASSISTANT_STOP +import homeassistant.helpers.config_validation as cv +from homeassistant.const import EVENT_HOMEASSISTANT_STOP, CONF_FILENAME from homeassistant.core import callback from homeassistant.exceptions import HomeAssistantError from homeassistant.loader import bind_hass -from .const import DOMAIN, ATTR_STREAMS, ATTR_ENDPOINTS +from .const import ( + DOMAIN, ATTR_STREAMS, ATTR_ENDPOINTS, CONF_STREAM_SOURCE, + CONF_DURATION, CONF_LOOKBACK, SERVICE_RECORD) from .core import PROVIDERS from .worker import stream_worker from .hls import async_setup_hls +from .recorder import async_setup_recorder REQUIREMENTS = ['av==6.1.2'] @@ -30,6 +34,16 @@ CONFIG_SCHEMA = vol.Schema({ DOMAIN: vol.Schema({}), }, extra=vol.ALLOW_EXTRA) +STREAM_SERVICE_SCHEMA = vol.Schema({ + vol.Required(CONF_STREAM_SOURCE): cv.string, +}) + +SERVICE_RECORD_SCHEMA = STREAM_SERVICE_SCHEMA.extend({ + vol.Required(CONF_FILENAME): cv.string, + vol.Optional(CONF_DURATION, default=30): int, + vol.Optional(CONF_LOOKBACK, default=0): int, +}) + # Set log level to error for libav logging.getLogger('libav').setLevel(logging.ERROR) @@ -82,6 +96,9 @@ async def async_setup(hass, config): hls_endpoint = async_setup_hls(hass) hass.data[DOMAIN][ATTR_ENDPOINTS]['hls'] = hls_endpoint + # Setup Recorder + async_setup_recorder(hass) + @callback def shutdown(event): """Stop all stream workers.""" @@ -92,6 +109,13 @@ async def async_setup(hass, config): hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown) + async def async_record(call): + """Call record stream service handler.""" + await async_handle_record_service(hass, call) + + hass.services.async_register(DOMAIN, SERVICE_RECORD, + async_record, schema=SERVICE_RECORD_SCHEMA) + return True @@ -119,15 +143,15 @@ class Stream: def add_provider(self, fmt): """Add provider output stream.""" - provider = PROVIDERS[fmt](self) - if not self._outputs.get(provider.format): - self._outputs[provider.format] = provider - return self._outputs[provider.format] + if not self._outputs.get(fmt): + provider = PROVIDERS[fmt](self) + self._outputs[fmt] = provider + return self._outputs[fmt] def remove_provider(self, provider): """Remove provider output stream.""" - if provider.format in self._outputs: - del self._outputs[provider.format] + if provider.name in self._outputs: + del self._outputs[provider.name] self.check_idle() if not self._outputs: @@ -165,3 +189,44 @@ class Stream: self._thread.join() self._thread = None _LOGGER.info("Stopped stream: %s", self.source) + + +async def async_handle_record_service(hass, call): + """Handle save video service calls.""" + stream_source = call.data[CONF_STREAM_SOURCE] + video_path = call.data[CONF_FILENAME] + duration = call.data[CONF_DURATION] + lookback = call.data[CONF_LOOKBACK] + + # Check for file access + if not hass.config.is_allowed_path(video_path): + raise HomeAssistantError("Can't write {}, no access to path!" + .format(video_path)) + + # Check for active stream + streams = hass.data[DOMAIN][ATTR_STREAMS] + stream = streams.get(stream_source) + if not stream: + stream = Stream(hass, stream_source) + streams[stream_source] = stream + + # Add recorder + recorder = stream.outputs.get('recorder') + if recorder: + raise HomeAssistantError("Stream already recording to {}!" + .format(recorder.video_path)) + + recorder = stream.add_provider('recorder') + recorder.video_path = video_path + recorder.timeout = duration + + stream.start() + + # Take advantage of lookback + hls = stream.outputs.get('hls') + if lookback > 0 and hls: + num_segments = min(int(lookback // hls.target_duration), + hls.num_segments) + # Wait for latest segment, then add the lookback + await hls.recv() + recorder.prepend(list(hls.get_segment())[-num_segments:]) diff --git a/homeassistant/components/stream/const.py b/homeassistant/components/stream/const.py index a87daaa9d40..9421faaff9a 100644 --- a/homeassistant/components/stream/const.py +++ b/homeassistant/components/stream/const.py @@ -1,10 +1,16 @@ """Constants for Stream component.""" DOMAIN = 'stream' +CONF_STREAM_SOURCE = 'stream_source' +CONF_LOOKBACK = 'lookback' +CONF_DURATION = 'duration' + ATTR_ENDPOINTS = 'endpoints' ATTR_STREAMS = 'streams' ATTR_KEEPALIVE = 'keepalive' +SERVICE_RECORD = 'record' + OUTPUT_FORMATS = ['hls'] FORMAT_CONTENT_TYPE = { diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index 59c0a6b650f..745c334fce0 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -41,15 +41,21 @@ class StreamOutput: num_segments = 3 - def __init__(self, stream) -> None: + def __init__(self, stream, timeout: int = 300) -> None: """Initialize a stream output.""" self.idle = False + self.timeout = timeout self._stream = stream self._cursor = None self._event = asyncio.Event() self._segments = deque(maxlen=self.num_segments) self._unsub = None + @property + def name(self) -> str: + """Return provider name.""" + return None + @property def format(self) -> str: """Return container format.""" @@ -82,7 +88,8 @@ class StreamOutput: # Reset idle timeout if self._unsub is not None: self._unsub() - self._unsub = async_call_later(self._stream.hass, 300, self._timeout) + self._unsub = async_call_later( + self._stream.hass, self.timeout, self._timeout) if not sequence: return self._segments @@ -111,14 +118,14 @@ class StreamOutput: # Start idle timeout when we start recieving data if self._unsub is None: self._unsub = async_call_later( - self._stream.hass, 300, self._timeout) + self._stream.hass, self.timeout, self._timeout) if segment is None: self._event.set() # Cleanup provider if self._unsub is not None: self._unsub() - self._cleanup() + self.cleanup() return self._segments.append(segment) @@ -133,11 +140,11 @@ class StreamOutput: self.idle = True self._stream.check_idle() else: - self._cleanup() + self.cleanup() - def _cleanup(self): - """Remove provider.""" - self._segments = [] + def cleanup(self): + """Handle cleanup.""" + self._segments = deque(maxlen=self.num_segments) self._stream.remove_provider(self) diff --git a/homeassistant/components/stream/hls.py b/homeassistant/components/stream/hls.py index 8f5dd6c1884..aa5ce105764 100644 --- a/homeassistant/components/stream/hls.py +++ b/homeassistant/components/stream/hls.py @@ -110,6 +110,11 @@ class M3U8Renderer: class HlsStreamOutput(StreamOutput): """Represents HLS Output formats.""" + @property + def name(self) -> str: + """Return provider name.""" + return 'hls' + @property def format(self) -> str: """Return container format.""" diff --git a/homeassistant/components/stream/recorder.py b/homeassistant/components/stream/recorder.py new file mode 100644 index 00000000000..15e2108c82a --- /dev/null +++ b/homeassistant/components/stream/recorder.py @@ -0,0 +1,92 @@ +"""Provide functionality to record stream.""" +import threading +from typing import List + +from homeassistant.core import callback + +from .core import Segment, StreamOutput, PROVIDERS + + +@callback +def async_setup_recorder(hass): + """Only here so Provider Registry works.""" + + +def recorder_save_worker(file_out: str, segments: List[Segment]): + """Handle saving stream.""" + import av + + output = av.open(file_out, 'w', options={'movflags': 'frag_keyframe'}) + output_v = None + + for segment in segments: + # Seek to beginning and open segment + segment.segment.seek(0) + source = av.open(segment.segment, 'r', format='mpegts') + source_v = source.streams.video[0] + + # Add output streams + if not output_v: + output_v = output.add_stream(template=source_v) + + # Remux video + for packet in source.demux(source_v): + if packet is not None and packet.dts is not None: + packet.stream = output_v + output.mux(packet) + + output.close() + + +@PROVIDERS.register('recorder') +class RecorderOutput(StreamOutput): + """Represents HLS Output formats.""" + + def __init__(self, stream, timeout: int = 30) -> None: + """Initialize recorder output.""" + super().__init__(stream, timeout) + self.video_path = None + self._segments = [] + + @property + def name(self) -> str: + """Return provider name.""" + return 'recorder' + + @property + def format(self) -> str: + """Return container format.""" + return 'mpegts' + + @property + def audio_codec(self) -> str: + """Return desired audio codec.""" + return 'aac' + + @property + def video_codec(self) -> str: + """Return desired video codec.""" + return 'h264' + + def prepend(self, segments: List[Segment]) -> None: + """Prepend segments to existing list.""" + own_segments = self.segments + segments = [s for s in segments if s.sequence not in own_segments] + self._segments = segments + self._segments + + @callback + def _timeout(self, _now=None): + """Handle recorder timeout.""" + self._unsub = None + self.cleanup() + + def cleanup(self): + """Write recording and clean up.""" + thread = threading.Thread( + name='recorder_save_worker', + target=recorder_save_worker, + args=(self.video_path, self._segments)) + thread.start() + + self._segments = [] + self._stream.remove_provider(self) diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index d0196761968..3ca8ac079e3 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -112,7 +112,7 @@ def stream_worker(hass, stream, quit_event): a_packet, buffer = create_stream_buffer( stream_output, video_stream, audio_frame) audio_packets[buffer.astream] = a_packet - outputs[stream_output.format] = buffer + outputs[stream_output.name] = buffer # First video packet tends to have a weird dts/pts if first_packet: diff --git a/tests/components/camera/test_init.py b/tests/components/camera/test_init.py index 701a3682830..e730f39656e 100644 --- a/tests/components/camera/test_init.py +++ b/tests/components/camera/test_init.py @@ -341,3 +341,38 @@ async def test_preload_stream(hass, mock_stream): hass.bus.async_fire(EVENT_HOMEASSISTANT_START) await hass.async_block_till_done() assert mock_request_stream.called + + +async def test_record_service_invalid_path(hass, mock_camera): + """Test record service with invalid path.""" + data = { + ATTR_ENTITY_ID: 'camera.demo_camera', + camera.CONF_FILENAME: '/my/invalid/path' + } + with patch.object(hass.config, 'is_allowed_path', return_value=False), \ + pytest.raises(HomeAssistantError): + # Call service + await hass.services.async_call( + camera.DOMAIN, camera.SERVICE_RECORD, data, blocking=True) + + +async def test_record_service(hass, mock_camera, mock_stream): + """Test record service.""" + data = { + ATTR_ENTITY_ID: 'camera.demo_camera', + camera.CONF_FILENAME: '/my/path' + } + + with patch('homeassistant.components.demo.camera.DemoCamera.stream_source', + new_callable=PropertyMock) as mock_stream_source, \ + patch( + 'homeassistant.components.stream.async_handle_record_service', + return_value=mock_coro()) as mock_record_service, \ + patch.object(hass.config, 'is_allowed_path', return_value=True): + mock_stream_source.return_value = io.BytesIO() + # Call service + await hass.services.async_call( + camera.DOMAIN, camera.SERVICE_RECORD, data, blocking=True) + # So long as we call stream.record, the rest should be covered + # by those tests. + assert mock_record_service.called diff --git a/tests/components/stream/test_init.py b/tests/components/stream/test_init.py new file mode 100644 index 00000000000..7f68bf1e7bf --- /dev/null +++ b/tests/components/stream/test_init.py @@ -0,0 +1,103 @@ +"""The tests for stream.""" +from unittest.mock import patch, MagicMock + +import pytest + +from homeassistant.const import CONF_FILENAME +from homeassistant.components.stream.const import ( + DOMAIN, SERVICE_RECORD, CONF_STREAM_SOURCE, CONF_LOOKBACK, ATTR_STREAMS) +from homeassistant.exceptions import HomeAssistantError +from homeassistant.setup import async_setup_component + +from tests.common import mock_coro + + +async def test_record_service_invalid_file(hass): + """Test record service call with invalid file.""" + await async_setup_component(hass, 'stream', { + 'stream': {} + }) + data = { + CONF_STREAM_SOURCE: 'rtsp://my.video', + CONF_FILENAME: '/my/invalid/path' + } + with pytest.raises(HomeAssistantError): + await hass.services.async_call( + DOMAIN, SERVICE_RECORD, data, blocking=True) + + +async def test_record_service_init_stream(hass): + """Test record service call with invalid file.""" + await async_setup_component(hass, 'stream', { + 'stream': {} + }) + data = { + CONF_STREAM_SOURCE: 'rtsp://my.video', + CONF_FILENAME: '/my/invalid/path' + } + with patch('homeassistant.components.stream.Stream') as stream_mock, \ + patch.object(hass.config, 'is_allowed_path', return_value=True): + # Setup stubs + stream_mock.return_value.outputs = {} + + # Call Service + await hass.services.async_call( + DOMAIN, SERVICE_RECORD, data, blocking=True) + + # Assert + assert stream_mock.called + + +async def test_record_service_existing_record_session(hass): + """Test record service call with invalid file.""" + await async_setup_component(hass, 'stream', { + 'stream': {} + }) + source = 'rtsp://my.video' + data = { + CONF_STREAM_SOURCE: source, + CONF_FILENAME: '/my/invalid/path' + } + + # Setup stubs + stream_mock = MagicMock() + stream_mock.return_value.outputs = {'recorder': MagicMock()} + hass.data[DOMAIN][ATTR_STREAMS][source] = stream_mock + + with patch.object(hass.config, 'is_allowed_path', return_value=True), \ + pytest.raises(HomeAssistantError): + # Call Service + await hass.services.async_call( + DOMAIN, SERVICE_RECORD, data, blocking=True) + + +async def test_record_service_lookback(hass): + """Test record service call with invalid file.""" + await async_setup_component(hass, 'stream', { + 'stream': {} + }) + data = { + CONF_STREAM_SOURCE: 'rtsp://my.video', + CONF_FILENAME: '/my/invalid/path', + CONF_LOOKBACK: 4 + } + + with patch('homeassistant.components.stream.Stream') as stream_mock, \ + patch.object(hass.config, 'is_allowed_path', return_value=True): + # Setup stubs + hls_mock = MagicMock() + hls_mock.num_segments = 3 + hls_mock.target_duration = 2 + hls_mock.recv.return_value = mock_coro() + stream_mock.return_value.outputs = { + 'hls': hls_mock + } + + # Call Service + await hass.services.async_call( + DOMAIN, SERVICE_RECORD, data, blocking=True) + + assert stream_mock.called + stream_mock.return_value.add_provider.assert_called_once_with( + 'recorder') + assert hls_mock.recv.called diff --git a/tests/components/stream/test_recorder.py b/tests/components/stream/test_recorder.py new file mode 100644 index 00000000000..4e227e463b4 --- /dev/null +++ b/tests/components/stream/test_recorder.py @@ -0,0 +1,83 @@ +"""The tests for hls streams.""" +from datetime import timedelta +from io import BytesIO +from unittest.mock import patch + +from homeassistant.setup import async_setup_component +from homeassistant.components.stream.core import Segment +from homeassistant.components.stream.recorder import recorder_save_worker +import homeassistant.util.dt as dt_util + +from tests.common import async_fire_time_changed +from tests.components.stream.common import ( + generate_h264_video, preload_stream) + + +async def test_record_stream(hass, hass_client): + """ + Test record stream. + + Purposefully not mocking anything here to test full + integration with the stream component. + """ + await async_setup_component(hass, 'stream', { + 'stream': {} + }) + + with patch( + 'homeassistant.components.stream.recorder.recorder_save_worker'): + # Setup demo track + source = generate_h264_video() + stream = preload_stream(hass, source) + recorder = stream.add_provider('recorder') + stream.start() + + segments = 0 + while True: + segment = await recorder.recv() + if not segment: + break + segments += 1 + + stream.stop() + + assert segments == 3 + + +async def test_recorder_timeout(hass, hass_client): + """Test recorder timeout.""" + await async_setup_component(hass, 'stream', { + 'stream': {} + }) + + with patch( + 'homeassistant.components.stream.recorder.RecorderOutput.cleanup' + ) as mock_cleanup: + # Setup demo track + source = generate_h264_video() + stream = preload_stream(hass, source) + recorder = stream.add_provider('recorder') + stream.start() + + await recorder.recv() + + # Wait a minute + future = dt_util.utcnow() + timedelta(minutes=1) + async_fire_time_changed(hass, future) + await hass.async_block_till_done() + + assert mock_cleanup.called + + +async def test_recorder_save(): + """Test recorder save.""" + # Setup + source = generate_h264_video() + output = BytesIO() + output.name = 'test.mp4' + + # Run + recorder_save_worker(output, [Segment(1, source, 4)]) + + # Assert + assert output.getvalue()