diff --git a/.travis.yml b/.travis.yml index be00f989290..0461d182232 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,18 @@ sudo: false +dist: xenial addons: apt: + sources: + - sourceline: "ppa:jonathonf/ffmpeg-4" packages: - libudev-dev + - libavformat-dev + - libavcodec-dev + - libavdevice-dev + - libavutil-dev + - libswscale-dev + - libswresample-dev + - libavfilter-dev matrix: fast_finish: true include: @@ -19,15 +29,12 @@ matrix: env: TOXENV=py36 - python: "3.7" env: TOXENV=py37 - dist: xenial - python: "3.8-dev" env: TOXENV=py38 - dist: xenial if: branch = dev AND type = push allow_failures: - python: "3.8-dev" env: TOXENV=py38 - dist: xenial cache: directories: diff --git a/homeassistant/components/camera/__init__.py b/homeassistant/components/camera/__init__.py index 474f9594610..48dd355ebd6 100644 --- a/homeassistant/components/camera/__init__.py +++ b/homeassistant/components/camera/__init__.py @@ -28,6 +28,12 @@ from homeassistant.helpers.entity_component import EntityComponent from homeassistant.helpers.config_validation import ( # noqa PLATFORM_SCHEMA, PLATFORM_SCHEMA_BASE) from homeassistant.components.http import HomeAssistantView, KEY_AUTHENTICATED +from homeassistant.components.media_player.const import ( + ATTR_MEDIA_CONTENT_ID, ATTR_MEDIA_CONTENT_TYPE, + SERVICE_PLAY_MEDIA, DOMAIN as DOMAIN_MP) +from homeassistant.components.stream import request_stream +from homeassistant.components.stream.const import ( + OUTPUT_FORMATS, FORMAT_CONTENT_TYPE) from homeassistant.components import websocket_api import homeassistant.helpers.config_validation as cv @@ -39,11 +45,14 @@ _LOGGER = logging.getLogger(__name__) SERVICE_ENABLE_MOTION = 'enable_motion_detection' SERVICE_DISABLE_MOTION = 'disable_motion_detection' SERVICE_SNAPSHOT = 'snapshot' +SERVICE_PLAY_STREAM = 'play_stream' SCAN_INTERVAL = timedelta(seconds=30) ENTITY_ID_FORMAT = DOMAIN + '.{}' ATTR_FILENAME = 'filename' +ATTR_MEDIA_PLAYER = 'media_player' +ATTR_FORMAT = 'format' STATE_RECORDING = 'recording' STATE_STREAMING = 'streaming' @@ -69,6 +78,11 @@ CAMERA_SERVICE_SNAPSHOT = CAMERA_SERVICE_SCHEMA.extend({ vol.Required(ATTR_FILENAME): cv.template }) +CAMERA_SERVICE_PLAY_STREAM = CAMERA_SERVICE_SCHEMA.extend({ + vol.Required(ATTR_MEDIA_PLAYER): cv.entities_domain(DOMAIN_MP), + vol.Optional(ATTR_FORMAT, default='hls'): vol.In(OUTPUT_FORMATS), +}) + WS_TYPE_CAMERA_THUMBNAIL = 'camera_thumbnail' SCHEMA_WS_CAMERA_THUMBNAIL = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend({ vol.Required('type'): WS_TYPE_CAMERA_THUMBNAIL, @@ -176,6 +190,7 @@ async def async_setup(hass, config): WS_TYPE_CAMERA_THUMBNAIL, websocket_camera_thumbnail, SCHEMA_WS_CAMERA_THUMBNAIL ) + hass.components.websocket_api.async_register_command(ws_camera_stream) await component.async_setup(config) @@ -209,6 +224,10 @@ async def async_setup(hass, config): SERVICE_SNAPSHOT, CAMERA_SERVICE_SNAPSHOT, async_handle_snapshot_service ) + component.async_register_entity_service( + SERVICE_PLAY_STREAM, CAMERA_SERVICE_PLAY_STREAM, + async_handle_play_stream_service + ) return True @@ -273,6 +292,11 @@ class Camera(Entity): """Return the interval between frames of the mjpeg stream.""" return 0.5 + @property + def stream_source(self): + """Return the source of the stream.""" + return None + def camera_image(self): """Return bytes of camera image.""" raise NotImplementedError() @@ -473,6 +497,33 @@ async def websocket_camera_thumbnail(hass, connection, msg): msg['id'], 'image_fetch_failed', 'Unable to fetch image')) +@websocket_api.async_response +@websocket_api.websocket_command({ + vol.Required('type'): 'camera/stream', + vol.Required('entity_id'): cv.entity_id, + vol.Optional('format', default='hls'): vol.In(OUTPUT_FORMATS), +}) +async def ws_camera_stream(hass, connection, msg): + """Handle get camera stream websocket command. + + Async friendly. + """ + try: + camera = _get_camera_from_entity_id(hass, msg['entity_id']) + + if not camera.stream_source: + raise HomeAssistantError("{} does not support play stream service" + .format(camera.entity_id)) + + fmt = msg['format'] + url = request_stream(hass, camera.stream_source, fmt=fmt) + connection.send_result(msg['id'], {'url': url}) + except HomeAssistantError as ex: + _LOGGER.error(ex) + connection.send_error( + msg['id'], 'start_stream_failed', str(ex)) + + async def async_handle_snapshot_service(camera, service): """Handle snapshot services calls.""" hass = camera.hass @@ -500,3 +551,25 @@ async def async_handle_snapshot_service(camera, service): _write_image, snapshot_file, image) except OSError as err: _LOGGER.error("Can't write image to file: %s", err) + + +async def async_handle_play_stream_service(camera, service_call): + """Handle play stream services calls.""" + if not camera.stream_source: + raise HomeAssistantError("{} does not support play stream service" + .format(camera.entity_id)) + + hass = camera.hass + fmt = service_call.data[ATTR_FORMAT] + entity_ids = service_call.data[ATTR_MEDIA_PLAYER] + + url = request_stream(hass, camera.stream_source, fmt=fmt) + data = { + ATTR_ENTITY_ID: entity_ids, + ATTR_MEDIA_CONTENT_ID: url, + ATTR_MEDIA_CONTENT_TYPE: FORMAT_CONTENT_TYPE[fmt] + } + + await hass.services.async_call( + DOMAIN_MP, SERVICE_PLAY_MEDIA, data, + blocking=True, context=service_call.context) diff --git a/homeassistant/components/camera/ffmpeg.py b/homeassistant/components/camera/ffmpeg.py index db9e73f3e1b..83ffdd499e9 100644 --- a/homeassistant/components/camera/ffmpeg.py +++ b/homeassistant/components/camera/ffmpeg.py @@ -76,3 +76,8 @@ class FFmpegCamera(Camera): def name(self): """Return the name of this camera.""" return self._name + + @property + def stream_source(self): + """Return the source of the stream.""" + return self._input diff --git a/homeassistant/components/camera/onvif.py b/homeassistant/components/camera/onvif.py index da0bae7c50b..b0bd029a80c 100644 --- a/homeassistant/components/camera/onvif.py +++ b/homeassistant/components/camera/onvif.py @@ -230,3 +230,8 @@ class ONVIFHassCamera(Camera): def name(self): """Return the name of this camera.""" return self._name + + @property + def stream_source(self): + """Return the source of the stream.""" + return self._input diff --git a/homeassistant/components/camera/services.yaml b/homeassistant/components/camera/services.yaml index 1cae5baf1cf..ec00ce3ef5c 100644 --- a/homeassistant/components/camera/services.yaml +++ b/homeassistant/components/camera/services.yaml @@ -38,6 +38,22 @@ snapshot: description: Template of a Filename. Variable is entity_id. example: '/tmp/snapshot_{{ entity_id }}' +play_stream: + description: Play camera stream on supported media player. + fields: + entity_id: + description: Name(s) of entities to stream from. + example: 'camera.living_room_camera' + media_player: + description: Name(s) of media player to stream to. + example: 'media_player.living_room_tv' + format: + description: (Optional) Stream format supported by media player. + example: 'hls' + keepalive: + description: (Optional) Keep the stream worker alive for fast access. + example: 'true' + local_file_update_file_path: description: Update the file_path for a local_file camera. fields: diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py new file mode 100644 index 00000000000..5a4b1ade96d --- /dev/null +++ b/homeassistant/components/stream/__init__.py @@ -0,0 +1,153 @@ +""" +Provide functionality to stream video source. + +For more details about this component, please refer to the documentation at +https://home-assistant.io/components/stream/ +""" +import logging +import threading + +import voluptuous as vol + +from homeassistant.auth.util import generate_secret +from homeassistant.const import EVENT_HOMEASSISTANT_STOP +from homeassistant.core import callback +from homeassistant.exceptions import HomeAssistantError +from homeassistant.loader import bind_hass + +from .const import DOMAIN, ATTR_STREAMS, ATTR_ENDPOINTS +from .core import PROVIDERS +from .worker import stream_worker +from .hls import async_setup_hls + +REQUIREMENTS = ['av==6.1.2'] + +_LOGGER = logging.getLogger(__name__) + +DEPENDENCIES = ['http'] + +CONFIG_SCHEMA = vol.Schema({ + DOMAIN: vol.Schema({}), +}, extra=vol.ALLOW_EXTRA) + +# Set log level to error for libav +logging.getLogger('libav').setLevel(logging.ERROR) + + +@bind_hass +def request_stream(hass, stream_source, *, fmt='hls', + keepalive=False, options=None): + """Set up stream with token.""" + if DOMAIN not in hass.config.components: + raise HomeAssistantError("Stream component is not set up.") + + if options is None: + options = {} + + try: + streams = hass.data[DOMAIN][ATTR_STREAMS] + stream = streams.get(stream_source) + if not stream: + stream = Stream(hass, stream_source, + options=options, keepalive=keepalive) + streams[stream_source] = stream + + # Add provider + stream.add_provider(fmt) + + if not stream.access_token: + stream.access_token = generate_secret() + stream.start() + return hass.data[DOMAIN][ATTR_ENDPOINTS][fmt].format( + hass.config.api.base_url, stream.access_token) + except Exception: + raise HomeAssistantError('Unable to get stream') + + +async def async_setup(hass, config): + """Set up stream.""" + hass.data[DOMAIN] = {} + hass.data[DOMAIN][ATTR_ENDPOINTS] = {} + hass.data[DOMAIN][ATTR_STREAMS] = {} + + # Setup HLS + hls_endpoint = async_setup_hls(hass) + hass.data[DOMAIN][ATTR_ENDPOINTS]['hls'] = hls_endpoint + + @callback + def shutdown(event): + """Stop all stream workers.""" + for stream in hass.data[DOMAIN][ATTR_STREAMS].values(): + stream.keepalive = False + stream.stop() + _LOGGER.info("Stopped stream workers.") + + hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown) + + return True + + +class Stream: + """Represents a single stream.""" + + def __init__(self, hass, source, options=None, keepalive=False): + """Initialize a stream.""" + self.hass = hass + self.source = source + self.options = options + self.keepalive = keepalive + self.access_token = None + self._thread = None + self._thread_quit = None + self._outputs = {} + + if self.options is None: + self.options = {} + + @property + def outputs(self): + """Return stream outputs.""" + return self._outputs + + def add_provider(self, fmt): + """Add provider output stream.""" + provider = PROVIDERS[fmt](self) + if not self._outputs.get(provider.format): + self._outputs[provider.format] = provider + return self._outputs[provider.format] + + def remove_provider(self, provider): + """Remove provider output stream.""" + if provider.format in self._outputs: + del self._outputs[provider.format] + + if not self._outputs: + self.stop() + + def start(self): + """Start a stream.""" + if self._thread is None or not self._thread.isAlive(): + self._thread_quit = threading.Event() + self._thread = threading.Thread( + name='stream_worker', + target=stream_worker, + args=( + self.hass, self, self._thread_quit)) + self._thread.start() + _LOGGER.info("Started stream: %s", self.source) + + def stop(self): + """Remove outputs and access token.""" + self._outputs = {} + self.access_token = None + + if not self.keepalive: + self._stop() + + def _stop(self): + """Stop worker thread.""" + if self._thread is not None: + self._thread_quit.set() + self._thread.join() + self._thread = None + _LOGGER.info("Stopped stream: %s", self.source) diff --git a/homeassistant/components/stream/const.py b/homeassistant/components/stream/const.py new file mode 100644 index 00000000000..a87daaa9d40 --- /dev/null +++ b/homeassistant/components/stream/const.py @@ -0,0 +1,14 @@ +"""Constants for Stream component.""" +DOMAIN = 'stream' + +ATTR_ENDPOINTS = 'endpoints' +ATTR_STREAMS = 'streams' +ATTR_KEEPALIVE = 'keepalive' + +OUTPUT_FORMATS = ['hls'] + +FORMAT_CONTENT_TYPE = { + 'hls': 'application/vnd.apple.mpegurl' +} + +AUDIO_SAMPLE_RATE = 44100 diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py new file mode 100644 index 00000000000..3d6ffa0e20c --- /dev/null +++ b/homeassistant/components/stream/core.py @@ -0,0 +1,162 @@ +"""Provides core stream functionality.""" +import asyncio +from collections import deque +import io +from typing import List, Any + +import attr +from aiohttp import web + +from homeassistant.core import callback +from homeassistant.components.http import HomeAssistantView +from homeassistant.helpers.event import async_call_later +from homeassistant.util.decorator import Registry + +from .const import DOMAIN, ATTR_STREAMS + +PROVIDERS = Registry() + + +@attr.s +class StreamBuffer: + """Represent a segment.""" + + segment = attr.ib(type=io.BytesIO) + output = attr.ib() # type=av.OutputContainer + vstream = attr.ib() # type=av.VideoStream + astream = attr.ib(default=None) # type=av.AudioStream + + +@attr.s +class Segment: + """Represent a segment.""" + + sequence = attr.ib(type=int) + segment = attr.ib(type=io.BytesIO) + duration = attr.ib(type=float) + + +class StreamOutput: + """Represents a stream output.""" + + num_segments = 3 + + def __init__(self, stream) -> None: + """Initialize a stream output.""" + self._stream = stream + self._cursor = None + self._event = asyncio.Event() + self._segments = deque(maxlen=self.num_segments) + self._unsub = None + + @property + def format(self) -> str: + """Return container format.""" + return None + + @property + def audio_codec(self) -> str: + """Return desired audio codec.""" + return None + + @property + def video_codec(self) -> str: + """Return desired video codec.""" + return None + + @property + def segments(self) -> List[int]: + """Return current sequence from segments.""" + return [s.sequence for s in self._segments] + + @property + def target_duration(self) -> int: + """Return the average duration of the segments in seconds.""" + durations = [s.duration for s in self._segments] + return round(sum(durations) // len(self._segments)) or 1 + + def get_segment(self, sequence: int = None) -> Any: + """Retrieve a specific segment, or the whole list.""" + # Reset idle timeout + if self._unsub is not None: + self._unsub() + self._unsub = async_call_later(self._stream.hass, 300, self._cleanup) + + if not sequence: + return self._segments + + for segment in self._segments: + if segment.sequence == sequence: + return segment + return None + + async def recv(self) -> Segment: + """Wait for and retrieve the latest segment.""" + last_segment = max(self.segments, default=0) + if self._cursor is None or self._cursor <= last_segment: + await self._event.wait() + + if not self._segments: + return None + + segment = self.get_segment()[-1] + self._cursor = segment.sequence + return segment + + @callback + def put(self, segment: Segment) -> None: + """Store output.""" + # Start idle timeout when we start recieving data + if self._unsub is None: + self._unsub = async_call_later( + self._stream.hass, 300, self._cleanup) + + if segment is None: + self._event.set() + # Cleanup provider + if self._unsub is not None: + self._unsub() + self._cleanup() + return + + self._segments.append(segment) + self._event.set() + self._event.clear() + + @callback + def _cleanup(self, _now=None): + """Remove provider.""" + self._segments = [] + self._stream.remove_provider(self) + + +class StreamView(HomeAssistantView): + """ + Base StreamView. + + For implementation of a new stream format, define `url` and `name` + attributes, and implement `handle` method in a child class. + """ + + requires_auth = False + platform = None + + async def get(self, request, token, sequence=None): + """Start a GET request.""" + hass = request.app['hass'] + + stream = next(( + s for s in hass.data[DOMAIN][ATTR_STREAMS].values() + if s.access_token == token), None) + + if not stream: + raise web.HTTPNotFound() + + # Start worker if not already started + stream.start() + + return await self.handle(request, stream, sequence) + + async def handle(self, request, stream, sequence): + """Handle the stream request.""" + raise NotImplementedError() diff --git a/homeassistant/components/stream/hls.py b/homeassistant/components/stream/hls.py new file mode 100644 index 00000000000..285f752c033 --- /dev/null +++ b/homeassistant/components/stream/hls.py @@ -0,0 +1,126 @@ +""" +Provide functionality to stream HLS. + +For more details about this component, please refer to the documentation at +https://home-assistant.io/components/stream/hls +""" +from aiohttp import web + +from homeassistant.core import callback +from homeassistant.util.dt import utcnow + +from .const import FORMAT_CONTENT_TYPE +from .core import StreamView, StreamOutput, PROVIDERS + + +@callback +def async_setup_hls(hass): + """Set up api endpoints.""" + hass.http.register_view(HlsPlaylistView()) + hass.http.register_view(HlsSegmentView()) + return '{}/api/hls/{}/playlist.m3u8' + + +class HlsPlaylistView(StreamView): + """Stream view to serve a M3U8 stream.""" + + url = r'/api/hls/{token:[a-f0-9]+}/playlist.m3u8' + name = 'api:stream:hls:playlist' + cors_allowed = True + + async def handle(self, request, stream, sequence): + """Return m3u8 playlist.""" + renderer = M3U8Renderer(stream) + track = stream.add_provider('hls') + stream.start() + # Wait for a segment to be ready + if not track.segments: + await track.recv() + headers = { + 'Content-Type': FORMAT_CONTENT_TYPE['hls'] + } + return web.Response(body=renderer.render( + track, utcnow()).encode("utf-8"), headers=headers) + + +class HlsSegmentView(StreamView): + """Stream view to serve a MPEG2TS segment.""" + + url = r'/api/hls/{token:[a-f0-9]+}/segment/{sequence:\d+}.ts' + name = 'api:stream:hls:segment' + cors_allowed = True + + async def handle(self, request, stream, sequence): + """Return mpegts segment.""" + track = stream.add_provider('hls') + segment = track.get_segment(int(sequence)) + if not segment: + return web.HTTPNotFound() + headers = { + 'Content-Type': 'video/mp2t' + } + return web.Response(body=segment.segment.getvalue(), headers=headers) + + +class M3U8Renderer: + """M3U8 Render Helper.""" + + def __init__(self, stream): + """Initialize renderer.""" + self.stream = stream + + @staticmethod + def render_preamble(track): + """Render preamble.""" + return [ + "#EXT-X-VERSION:3", + "#EXT-X-TARGETDURATION:{}".format(track.target_duration), + ] + + @staticmethod + def render_playlist(track, start_time): + """Render playlist.""" + segments = track.segments + + if not segments: + return [] + + playlist = ["#EXT-X-MEDIA-SEQUENCE:{}".format(segments[0])] + + for sequence in segments: + segment = track.get_segment(sequence) + playlist.extend([ + "#EXTINF:{:.04},".format(float(segment.duration)), + "./segment/{}.ts".format(segment.sequence), + ]) + + return playlist + + def render(self, track, start_time): + """Render M3U8 file.""" + lines = ( + ["#EXTM3U"] + + self.render_preamble(track) + + self.render_playlist(track, start_time) + ) + return "\n".join(lines) + "\n" + + +@PROVIDERS.register('hls') +class HlsStreamOutput(StreamOutput): + """Represents HLS Output formats.""" + + @property + def format(self) -> str: + """Return container format.""" + return 'mpegts' + + @property + def audio_codec(self) -> str: + """Return desired audio codec.""" + return 'aac' + + @property + def video_codec(self) -> str: + """Return desired video codec.""" + return 'h264' diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py new file mode 100644 index 00000000000..3a3e19d9703 --- /dev/null +++ b/homeassistant/components/stream/worker.py @@ -0,0 +1,142 @@ +"""Provides the worker thread needed for processing streams.""" +from fractions import Fraction +import io +import logging + +from .const import AUDIO_SAMPLE_RATE +from .core import Segment, StreamBuffer + +_LOGGER = logging.getLogger(__name__) + + +def generate_audio_frame(): + """Generate a blank audio frame.""" + from av import AudioFrame + audio_frame = AudioFrame(format='dbl', layout='mono', samples=1024) + # audio_bytes = b''.join(b'\x00\x00\x00\x00\x00\x00\x00\x00' + # for i in range(0, 1024)) + audio_bytes = b'\x00\x00\x00\x00\x00\x00\x00\x00' * 1024 + audio_frame.planes[0].update(audio_bytes) + audio_frame.sample_rate = AUDIO_SAMPLE_RATE + audio_frame.time_base = Fraction(1, AUDIO_SAMPLE_RATE) + return audio_frame + + +def create_stream_buffer(stream_output, video_stream, audio_frame): + """Create a new StreamBuffer.""" + import av + a_packet = None + segment = io.BytesIO() + output = av.open( + segment, mode='w', format=stream_output.format) + vstream = output.add_stream( + stream_output.video_codec, video_stream.rate) + # Fix format + vstream.codec_context.format = \ + video_stream.codec_context.format + # Check if audio is requested + astream = None + if stream_output.audio_codec: + astream = output.add_stream( + stream_output.audio_codec, AUDIO_SAMPLE_RATE) + # Need to do it multiple times for some reason + while not a_packet: + a_packets = astream.encode(audio_frame) + if a_packets: + a_packet = a_packets[0] + return (a_packet, StreamBuffer(segment, output, vstream, astream)) + + +def stream_worker(hass, stream, quit_event): + """Handle consuming streams.""" + import av + container = av.open(stream.source, options=stream.options) + try: + video_stream = container.streams.video[0] + except (KeyError, IndexError): + _LOGGER.error("Stream has no video") + return + + audio_frame = generate_audio_frame() + + outputs = {} + first_packet = True + sequence = 1 + audio_packets = {} + + while not quit_event.is_set(): + try: + packet = next(container.demux(video_stream)) + if packet.dts is None: + # If we get a "flushing" packet, the stream is done + raise StopIteration + except (av.AVError, StopIteration) as ex: + # End of stream, clear listeners and stop thread + for fmt, _ in outputs.items(): + hass.loop.call_soon_threadsafe( + stream.outputs[fmt].put, None) + _LOGGER.error("Error demuxing stream: %s", ex) + break + + # Reset segment on every keyframe + if packet.is_keyframe: + # Save segment to outputs + segment_duration = (packet.pts * packet.time_base) / sequence + for fmt, buffer in outputs.items(): + buffer.output.close() + del audio_packets[buffer.astream] + if stream.outputs.get(fmt): + hass.loop.call_soon_threadsafe( + stream.outputs[fmt].put, Segment( + sequence, buffer.segment, segment_duration + )) + + # Clear outputs and increment sequence + outputs = {} + if not first_packet: + sequence += 1 + + # Initialize outputs + for stream_output in stream.outputs.values(): + if video_stream.name != stream_output.video_codec: + continue + + a_packet, buffer = create_stream_buffer( + stream_output, video_stream, audio_frame) + audio_packets[buffer.astream] = a_packet + outputs[stream_output.format] = buffer + + # First video packet tends to have a weird dts/pts + if first_packet: + packet.dts = 0 + packet.pts = 0 + first_packet = False + + # Store packets on each output + for buffer in outputs.values(): + # Check if the format requires audio + if audio_packets.get(buffer.astream): + a_packet = audio_packets[buffer.astream] + a_time_base = a_packet.time_base + + # Determine video start timestamp and duration + video_start = packet.pts * packet.time_base + video_duration = packet.duration * packet.time_base + + if packet.is_keyframe: + # Set first audio packet in sequence to equal video pts + a_packet.pts = int(video_start / a_time_base) + a_packet.dts = int(video_start / a_time_base) + + # Determine target end timestamp for audio + target_pts = int((video_start + video_duration) / a_time_base) + while a_packet.pts < target_pts: + # Mux audio packet and adjust points until target hit + buffer.output.mux(a_packet) + a_packet.pts += a_packet.duration + a_packet.dts += a_packet.duration + audio_packets[buffer.astream] = a_packet + + # Assign the video packet to the new stream & mux + packet.stream = buffer.vstream + buffer.output.mux(packet) diff --git a/requirements_all.txt b/requirements_all.txt index d34c3483eb1..08b4f1bb5df 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -179,6 +179,9 @@ asterisk_mbox==0.5.0 # homeassistant.components.media_player.dlna_dmr async-upnp-client==0.14.5 +# homeassistant.components.stream +av==6.1.2 + # homeassistant.components.light.avion # avion==0.10 diff --git a/requirements_test_all.txt b/requirements_test_all.txt index 2da5247417d..aa2f7e8fc6f 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -53,6 +53,9 @@ aiounifi==4 # homeassistant.components.notify.apns apns2==0.3.0 +# homeassistant.components.stream +av==6.1.2 + # homeassistant.components.zha bellows-homeassistant==0.7.1 diff --git a/script/gen_requirements_all.py b/script/gen_requirements_all.py index 04c32ff2b26..bb242d1e7ba 100755 --- a/script/gen_requirements_all.py +++ b/script/gen_requirements_all.py @@ -45,6 +45,7 @@ TEST_REQUIREMENTS = ( 'aiohue', 'aiounifi', 'apns2', + 'av', 'caldav', 'coinmarketcap', 'defusedxml', diff --git a/tests/components/camera/test_init.py b/tests/components/camera/test_init.py index 6b98f378ef0..840e30161f3 100644 --- a/tests/components/camera/test_init.py +++ b/tests/components/camera/test_init.py @@ -1,12 +1,12 @@ """The tests for the camera component.""" import asyncio import base64 -from unittest.mock import patch, mock_open +from unittest.mock import patch, mock_open, PropertyMock import pytest from homeassistant.setup import setup_component, async_setup_component -from homeassistant.const import ATTR_ENTITY_PICTURE +from homeassistant.const import (ATTR_ENTITY_ID, ATTR_ENTITY_PICTURE) from homeassistant.components import camera, http from homeassistant.components.websocket_api.const import TYPE_RESULT from homeassistant.exceptions import HomeAssistantError @@ -16,6 +16,7 @@ from tests.common import ( get_test_home_assistant, get_test_instance_port, assert_setup_component, mock_coro) from tests.components.camera import common +from tests.components.stream.common import generate_h264_video @pytest.fixture @@ -32,6 +33,14 @@ def mock_camera(hass): yield +@pytest.fixture +def mock_stream(hass): + """Initialize a demo camera platform with streaming.""" + assert hass.loop.run_until_complete(async_setup_component(hass, 'stream', { + 'stream': {} + })) + + class TestSetupCamera: """Test class for setup camera.""" @@ -156,3 +165,88 @@ async def test_webocket_camera_thumbnail(hass, hass_ws_client, mock_camera): assert msg['result']['content_type'] == 'image/jpeg' assert msg['result']['content'] == \ base64.b64encode(b'Test').decode('utf-8') + + +async def test_webocket_stream_no_source(hass, hass_ws_client, + mock_camera, mock_stream): + """Test camera/stream websocket command.""" + await async_setup_component(hass, 'camera') + + with patch('homeassistant.components.camera.request_stream', + return_value='http://home.assistant/playlist.m3u8') \ + as mock_request_stream: + # Request playlist through WebSocket + client = await hass_ws_client(hass) + await client.send_json({ + 'id': 6, + 'type': 'camera/stream', + 'entity_id': 'camera.demo_camera', + }) + msg = await client.receive_json() + + # Assert WebSocket response + assert not mock_request_stream.called + assert msg['id'] == 6 + assert msg['type'] == TYPE_RESULT + assert not msg['success'] + + +async def test_webocket_camera_stream(hass, hass_ws_client, hass_client, + mock_camera, mock_stream): + """Test camera/stream websocket command.""" + await async_setup_component(hass, 'camera') + + with patch('homeassistant.components.camera.request_stream', + return_value='http://home.assistant/playlist.m3u8' + ) as mock_request_stream, \ + patch('homeassistant.components.camera.demo.DemoCamera.stream_source', + new_callable=PropertyMock) as mock_stream_source: + mock_stream_source.return_value = generate_h264_video() + # Request playlist through WebSocket + client = await hass_ws_client(hass) + await client.send_json({ + 'id': 6, + 'type': 'camera/stream', + 'entity_id': 'camera.demo_camera', + }) + msg = await client.receive_json() + + # Assert WebSocket response + assert mock_request_stream.called + assert msg['id'] == 6 + assert msg['type'] == TYPE_RESULT + assert msg['success'] + assert msg['result']['url'][-13:] == 'playlist.m3u8' + + +async def test_play_stream_service_no_source(hass, mock_camera, mock_stream): + """Test camera play_stream service.""" + data = { + ATTR_ENTITY_ID: 'camera.demo_camera', + camera.ATTR_MEDIA_PLAYER: 'media_player.test' + } + with patch('homeassistant.components.camera.request_stream'), \ + pytest.raises(HomeAssistantError): + # Call service + await hass.services.async_call( + camera.DOMAIN, camera.SERVICE_PLAY_STREAM, data, blocking=True) + + +async def test_handle_play_stream_service(hass, mock_camera, mock_stream): + """Test camera play_stream service.""" + await async_setup_component(hass, 'media_player') + data = { + ATTR_ENTITY_ID: 'camera.demo_camera', + camera.ATTR_MEDIA_PLAYER: 'media_player.test' + } + with patch('homeassistant.components.camera.request_stream' + ) as mock_request_stream, \ + patch('homeassistant.components.camera.demo.DemoCamera.stream_source', + new_callable=PropertyMock) as mock_stream_source: + mock_stream_source.return_value = generate_h264_video() + # Call service + await hass.services.async_call( + camera.DOMAIN, camera.SERVICE_PLAY_STREAM, data, blocking=True) + # So long as we request the stream, the rest should be covered + # by the play_media service tests. + assert mock_request_stream.called diff --git a/tests/components/stream/__init__.py b/tests/components/stream/__init__.py new file mode 100644 index 00000000000..96247f0ee16 --- /dev/null +++ b/tests/components/stream/__init__.py @@ -0,0 +1 @@ +"""The tests for stream platforms.""" diff --git a/tests/components/stream/common.py b/tests/components/stream/common.py new file mode 100644 index 00000000000..7e8016cd43c --- /dev/null +++ b/tests/components/stream/common.py @@ -0,0 +1,63 @@ +"""Collection of test helpers.""" +import io + +from homeassistant.components.stream import Stream +from homeassistant.components.stream.const import ( + DOMAIN, ATTR_STREAMS) + + +def generate_h264_video(): + """ + Generate a test video. + + See: http://docs.mikeboers.com/pyav/develop/cookbook/numpy.html + """ + import numpy as np + import av + + duration = 5 + fps = 24 + total_frames = duration * fps + + output = io.BytesIO() + output.name = 'test.ts' + container = av.open(output, mode='w') + + stream = container.add_stream('libx264', rate=fps) + stream.width = 480 + stream.height = 320 + stream.pix_fmt = 'yuv420p' + + for frame_i in range(total_frames): + + img = np.empty((480, 320, 3)) + img[:, :, 0] = 0.5 + 0.5 * np.sin( + 2 * np.pi * (0 / 3 + frame_i / total_frames)) + img[:, :, 1] = 0.5 + 0.5 * np.sin( + 2 * np.pi * (1 / 3 + frame_i / total_frames)) + img[:, :, 2] = 0.5 + 0.5 * np.sin( + 2 * np.pi * (2 / 3 + frame_i / total_frames)) + + img = np.round(255 * img).astype(np.uint8) + img = np.clip(img, 0, 255) + + frame = av.VideoFrame.from_ndarray(img, format='rgb24') + for packet in stream.encode(frame): + container.mux(packet) + + # Flush stream + for packet in stream.encode(): + container.mux(packet) + + # Close the file + container.close() + output.seek(0) + + return output + + +def preload_stream(hass, stream_source): + """Preload a stream for use in tests.""" + stream = Stream(hass, stream_source) + hass.data[DOMAIN][ATTR_STREAMS][stream_source] = stream + return stream diff --git a/tests/components/stream/test_hls.py b/tests/components/stream/test_hls.py new file mode 100644 index 00000000000..a2c962ffb45 --- /dev/null +++ b/tests/components/stream/test_hls.py @@ -0,0 +1,117 @@ +"""The tests for hls streams.""" +from datetime import timedelta +from urllib.parse import urlparse + +from homeassistant.setup import async_setup_component +from homeassistant.components.stream import request_stream +import homeassistant.util.dt as dt_util + +from tests.common import async_fire_time_changed +from tests.components.stream.common import ( + generate_h264_video, preload_stream) + + +async def test_hls_stream(hass, hass_client): + """ + Test hls stream. + + Purposefully not mocking anything here to test full + integration with the stream component. + """ + await async_setup_component(hass, 'stream', { + 'stream': {} + }) + + # Setup demo HLS track + source = generate_h264_video() + stream = preload_stream(hass, source) + stream.add_provider('hls') + + # Request stream + url = request_stream(hass, source) + + http_client = await hass_client() + + # Fetch playlist + parsed_url = urlparse(url) + playlist_response = await http_client.get(parsed_url.path) + assert playlist_response.status == 200 + + # Fetch segment + playlist = await playlist_response.text() + playlist_url = '/'.join(parsed_url.path.split('/')[:-1]) + segment_url = playlist_url + playlist.splitlines()[-1][1:] + segment_response = await http_client.get(segment_url) + assert segment_response.status == 200 + + # Stop stream, if it hasn't quit already + stream.stop() + + # Ensure playlist not accessable after stream ends + fail_response = await http_client.get(parsed_url.path) + assert fail_response.status == 404 + + +async def test_stream_timeout(hass, hass_client): + """Test hls stream timeout.""" + await async_setup_component(hass, 'stream', { + 'stream': {} + }) + + # Setup demo HLS track + source = generate_h264_video() + stream = preload_stream(hass, source) + stream.add_provider('hls') + + # Request stream + url = request_stream(hass, source) + + http_client = await hass_client() + + # Fetch playlist + parsed_url = urlparse(url) + playlist_response = await http_client.get(parsed_url.path) + assert playlist_response.status == 200 + + # Wait a minute + future = dt_util.utcnow() + timedelta(minutes=1) + async_fire_time_changed(hass, future) + + # Fetch again to reset timer + playlist_response = await http_client.get(parsed_url.path) + assert playlist_response.status == 200 + + # Wait 5 minutes + future = dt_util.utcnow() + timedelta(minutes=5) + async_fire_time_changed(hass, future) + + # Ensure playlist not accessable + fail_response = await http_client.get(parsed_url.path) + assert fail_response.status == 404 + + +async def test_stream_ended(hass): + """Test hls stream packets ended.""" + await async_setup_component(hass, 'stream', { + 'stream': {} + }) + + # Setup demo HLS track + source = generate_h264_video() + stream = preload_stream(hass, source) + track = stream.add_provider('hls') + track.num_segments = 2 + + # Request stream + request_stream(hass, source) + + # Run it dead + segments = 0 + while await track.recv() is not None: + segments += 1 + + assert segments == 3 + assert not track.get_segment() + + # Stop stream, if it hasn't quit already + stream.stop() diff --git a/virtualization/Docker/setup_docker_prereqs b/virtualization/Docker/setup_docker_prereqs index 713bbfffba4..bbc513502fa 100755 --- a/virtualization/Docker/setup_docker_prereqs +++ b/virtualization/Docker/setup_docker_prereqs @@ -28,6 +28,9 @@ PACKAGES=( libmpc-dev libmpfr-dev libgmp-dev # homeassistant.components.ffmpeg ffmpeg + # homeassistant.components.stream + libavformat-dev libavcodec-dev libavdevice-dev + libavutil-dev libswscale-dev libswresample-dev libavfilter-dev # homeassistant.components.sensor.iperf3 iperf3 )