Change the API boundary between camera and stream with initial improvement for nest expiring stream urls (#45431)
* Change the API boundary between stream and camera Shift more of the stream lifecycle management to the camera. The motivation is to support stream urls that expire giving the camera the ability to change the stream once it is created. * Document stream lifecycle and simplify stream/camera interaction * Reorder create_stream function to reduce diffs * Increase test coverage for camera_sdm.py * Fix ffmpeg typo. * Add a stream identifier for each stream, managed by camera * Remove stream record service * Update homeassistant/components/stream/__init__.py Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io> * Unroll changes to Stream interface back into camera component * Fix preload stream to actually start the background worker * Reduce unncessary diffs for readability * Remove redundant camera stream start code Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io>
This commit is contained in:
parent
889baef456
commit
2bcf87b980
14 changed files with 254 additions and 356 deletions
|
@ -1,28 +1,35 @@
|
|||
"""Provide functionality to stream video source."""
|
||||
"""Provide functionality to stream video source.
|
||||
|
||||
Components use create_stream with a stream source (e.g. an rtsp url) to create
|
||||
a new Stream object. Stream manages:
|
||||
- Background work to fetch and decode a stream
|
||||
- Desired output formats
|
||||
- Home Assistant URLs for viewing a stream
|
||||
- Access tokens for URLs for viewing a stream
|
||||
|
||||
A Stream consists of a background worker, and one or more output formats each
|
||||
with their own idle timeout managed by the stream component. When an output
|
||||
format is no longer in use, the stream component will expire it. When there
|
||||
are no active output formats, the background worker is shut down and access
|
||||
tokens are expired. Alternatively, a Stream can be configured with keepalive
|
||||
to always keep workers active.
|
||||
"""
|
||||
import logging
|
||||
import secrets
|
||||
import threading
|
||||
import time
|
||||
from types import MappingProxyType
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.const import CONF_FILENAME, EVENT_HOMEASSISTANT_STOP
|
||||
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.loader import bind_hass
|
||||
|
||||
from .const import (
|
||||
ATTR_ENDPOINTS,
|
||||
ATTR_STREAMS,
|
||||
CONF_DURATION,
|
||||
CONF_LOOKBACK,
|
||||
CONF_STREAM_SOURCE,
|
||||
DOMAIN,
|
||||
MAX_SEGMENTS,
|
||||
OUTPUT_IDLE_TIMEOUT,
|
||||
SERVICE_RECORD,
|
||||
STREAM_RESTART_INCREMENT,
|
||||
STREAM_RESTART_RESET_TIME,
|
||||
)
|
||||
|
@ -31,20 +38,13 @@ from .hls import async_setup_hls
|
|||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
STREAM_SERVICE_SCHEMA = vol.Schema({vol.Required(CONF_STREAM_SOURCE): cv.string})
|
||||
|
||||
SERVICE_RECORD_SCHEMA = STREAM_SERVICE_SCHEMA.extend(
|
||||
{
|
||||
vol.Required(CONF_FILENAME): cv.string,
|
||||
vol.Optional(CONF_DURATION, default=30): int,
|
||||
vol.Optional(CONF_LOOKBACK, default=0): int,
|
||||
}
|
||||
)
|
||||
def create_stream(hass, stream_source, options=None):
|
||||
"""Create a stream with the specified identfier based on the source url.
|
||||
|
||||
|
||||
@bind_hass
|
||||
def request_stream(hass, stream_source, *, fmt="hls", keepalive=False, options=None):
|
||||
"""Set up stream with token."""
|
||||
The stream_source is typically an rtsp url and options are passed into
|
||||
pyav / ffmpeg as options.
|
||||
"""
|
||||
if DOMAIN not in hass.config.components:
|
||||
raise HomeAssistantError("Stream integration is not set up.")
|
||||
|
||||
|
@ -59,25 +59,9 @@ def request_stream(hass, stream_source, *, fmt="hls", keepalive=False, options=N
|
|||
**options,
|
||||
}
|
||||
|
||||
try:
|
||||
streams = hass.data[DOMAIN][ATTR_STREAMS]
|
||||
stream = streams.get(stream_source)
|
||||
if not stream:
|
||||
stream = Stream(hass, stream_source, options=options, keepalive=keepalive)
|
||||
streams[stream_source] = stream
|
||||
else:
|
||||
# Update keepalive option on existing stream
|
||||
stream.keepalive = keepalive
|
||||
|
||||
# Add provider
|
||||
stream.add_provider(fmt)
|
||||
|
||||
if not stream.access_token:
|
||||
stream.access_token = secrets.token_hex()
|
||||
stream.start()
|
||||
return hass.data[DOMAIN][ATTR_ENDPOINTS][fmt].format(stream.access_token)
|
||||
except Exception as err:
|
||||
raise HomeAssistantError("Unable to get stream") from err
|
||||
stream = Stream(hass, stream_source, options=options)
|
||||
hass.data[DOMAIN][ATTR_STREAMS].append(stream)
|
||||
return stream
|
||||
|
||||
|
||||
async def async_setup(hass, config):
|
||||
|
@ -92,7 +76,7 @@ async def async_setup(hass, config):
|
|||
|
||||
hass.data[DOMAIN] = {}
|
||||
hass.data[DOMAIN][ATTR_ENDPOINTS] = {}
|
||||
hass.data[DOMAIN][ATTR_STREAMS] = {}
|
||||
hass.data[DOMAIN][ATTR_STREAMS] = []
|
||||
|
||||
# Setup HLS
|
||||
hls_endpoint = async_setup_hls(hass)
|
||||
|
@ -104,33 +88,25 @@ async def async_setup(hass, config):
|
|||
@callback
|
||||
def shutdown(event):
|
||||
"""Stop all stream workers."""
|
||||
for stream in hass.data[DOMAIN][ATTR_STREAMS].values():
|
||||
for stream in hass.data[DOMAIN][ATTR_STREAMS]:
|
||||
stream.keepalive = False
|
||||
stream.stop()
|
||||
_LOGGER.info("Stopped stream workers")
|
||||
|
||||
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)
|
||||
|
||||
async def async_record(call):
|
||||
"""Call record stream service handler."""
|
||||
await async_handle_record_service(hass, call)
|
||||
|
||||
hass.services.async_register(
|
||||
DOMAIN, SERVICE_RECORD, async_record, schema=SERVICE_RECORD_SCHEMA
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
class Stream:
|
||||
"""Represents a single stream."""
|
||||
|
||||
def __init__(self, hass, source, options=None, keepalive=False):
|
||||
def __init__(self, hass, source, options=None):
|
||||
"""Initialize a stream."""
|
||||
self.hass = hass
|
||||
self.source = source
|
||||
self.options = options
|
||||
self.keepalive = keepalive
|
||||
self.keepalive = False
|
||||
self.access_token = None
|
||||
self._thread = None
|
||||
self._thread_quit = None
|
||||
|
@ -139,6 +115,14 @@ class Stream:
|
|||
if self.options is None:
|
||||
self.options = {}
|
||||
|
||||
def endpoint_url(self, fmt):
|
||||
"""Start the stream and returns a url for the output format."""
|
||||
if fmt not in self._outputs:
|
||||
raise ValueError(f"Stream is not configured for format '{fmt}'")
|
||||
if not self.access_token:
|
||||
self.access_token = secrets.token_hex()
|
||||
return self.hass.data[DOMAIN][ATTR_ENDPOINTS][fmt].format(self.access_token)
|
||||
|
||||
@property
|
||||
def outputs(self):
|
||||
"""Return a copy of the stream outputs."""
|
||||
|
@ -244,39 +228,28 @@ class Stream:
|
|||
self._thread = None
|
||||
_LOGGER.info("Stopped stream: %s", self.source)
|
||||
|
||||
async def async_record(self, video_path, duration=30, lookback=5):
|
||||
"""Make a .mp4 recording from a provided stream."""
|
||||
|
||||
async def async_handle_record_service(hass, call):
|
||||
"""Handle save video service calls."""
|
||||
stream_source = call.data[CONF_STREAM_SOURCE]
|
||||
video_path = call.data[CONF_FILENAME]
|
||||
duration = call.data[CONF_DURATION]
|
||||
lookback = call.data[CONF_LOOKBACK]
|
||||
# Check for file access
|
||||
if not self.hass.config.is_allowed_path(video_path):
|
||||
raise HomeAssistantError(f"Can't write {video_path}, no access to path!")
|
||||
|
||||
# Check for file access
|
||||
if not hass.config.is_allowed_path(video_path):
|
||||
raise HomeAssistantError(f"Can't write {video_path}, no access to path!")
|
||||
# Add recorder
|
||||
recorder = self.outputs.get("recorder")
|
||||
if recorder:
|
||||
raise HomeAssistantError(
|
||||
f"Stream already recording to {recorder.video_path}!"
|
||||
)
|
||||
recorder = self.add_provider("recorder", timeout=duration)
|
||||
recorder.video_path = video_path
|
||||
|
||||
# Check for active stream
|
||||
streams = hass.data[DOMAIN][ATTR_STREAMS]
|
||||
stream = streams.get(stream_source)
|
||||
if not stream:
|
||||
stream = Stream(hass, stream_source)
|
||||
streams[stream_source] = stream
|
||||
self.start()
|
||||
|
||||
# Add recorder
|
||||
recorder = stream.outputs.get("recorder")
|
||||
if recorder:
|
||||
raise HomeAssistantError(f"Stream already recording to {recorder.video_path}!")
|
||||
|
||||
recorder = stream.add_provider("recorder", timeout=duration)
|
||||
recorder.video_path = video_path
|
||||
|
||||
stream.start()
|
||||
|
||||
# Take advantage of lookback
|
||||
hls = stream.outputs.get("hls")
|
||||
if lookback > 0 and hls:
|
||||
num_segments = min(int(lookback // hls.target_duration), MAX_SEGMENTS)
|
||||
# Wait for latest segment, then add the lookback
|
||||
await hls.recv()
|
||||
recorder.prepend(list(hls.get_segment())[-num_segments:])
|
||||
# Take advantage of lookback
|
||||
hls = self.outputs.get("hls")
|
||||
if lookback > 0 and hls:
|
||||
num_segments = min(int(lookback // hls.target_duration), MAX_SEGMENTS)
|
||||
# Wait for latest segment, then add the lookback
|
||||
await hls.recv()
|
||||
recorder.prepend(list(hls.get_segment())[-num_segments:])
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue