* initial commit of streams * refactor stream component * refactor so stream formats are not considered a platform * initial test and minor refactor * fix linting * update requirements * need av in tests as well * fix import in class def vs method * fix travis and docker builds * address code review comments * fix logger, add stream start/stop logs, listen to HASS stop * address additional code review comments * beef up tests * fix tests * fix lint * add stream_source to onvif camera * address pr comments * add keepalive to camera play_stream service * remove keepalive and move import * implement registry and have output provider remove itself from stream after idle, set libav log level to error
162 lines
4.3 KiB
Python
162 lines
4.3 KiB
Python
"""Provides core stream functionality."""
|
|
import asyncio
|
|
from collections import deque
|
|
import io
|
|
from typing import List, Any
|
|
|
|
import attr
|
|
from aiohttp import web
|
|
|
|
from homeassistant.core import callback
|
|
from homeassistant.components.http import HomeAssistantView
|
|
from homeassistant.helpers.event import async_call_later
|
|
from homeassistant.util.decorator import Registry
|
|
|
|
from .const import DOMAIN, ATTR_STREAMS
|
|
|
|
PROVIDERS = Registry()
|
|
|
|
|
|
@attr.s
|
|
class StreamBuffer:
|
|
"""Represent a segment."""
|
|
|
|
segment = attr.ib(type=io.BytesIO)
|
|
output = attr.ib() # type=av.OutputContainer
|
|
vstream = attr.ib() # type=av.VideoStream
|
|
astream = attr.ib(default=None) # type=av.AudioStream
|
|
|
|
|
|
@attr.s
|
|
class Segment:
|
|
"""Represent a segment."""
|
|
|
|
sequence = attr.ib(type=int)
|
|
segment = attr.ib(type=io.BytesIO)
|
|
duration = attr.ib(type=float)
|
|
|
|
|
|
class StreamOutput:
|
|
"""Represents a stream output."""
|
|
|
|
num_segments = 3
|
|
|
|
def __init__(self, stream) -> None:
|
|
"""Initialize a stream output."""
|
|
self._stream = stream
|
|
self._cursor = None
|
|
self._event = asyncio.Event()
|
|
self._segments = deque(maxlen=self.num_segments)
|
|
self._unsub = None
|
|
|
|
@property
|
|
def format(self) -> str:
|
|
"""Return container format."""
|
|
return None
|
|
|
|
@property
|
|
def audio_codec(self) -> str:
|
|
"""Return desired audio codec."""
|
|
return None
|
|
|
|
@property
|
|
def video_codec(self) -> str:
|
|
"""Return desired video codec."""
|
|
return None
|
|
|
|
@property
|
|
def segments(self) -> List[int]:
|
|
"""Return current sequence from segments."""
|
|
return [s.sequence for s in self._segments]
|
|
|
|
@property
|
|
def target_duration(self) -> int:
|
|
"""Return the average duration of the segments in seconds."""
|
|
durations = [s.duration for s in self._segments]
|
|
return round(sum(durations) // len(self._segments)) or 1
|
|
|
|
def get_segment(self, sequence: int = None) -> Any:
|
|
"""Retrieve a specific segment, or the whole list."""
|
|
# Reset idle timeout
|
|
if self._unsub is not None:
|
|
self._unsub()
|
|
self._unsub = async_call_later(self._stream.hass, 300, self._cleanup)
|
|
|
|
if not sequence:
|
|
return self._segments
|
|
|
|
for segment in self._segments:
|
|
if segment.sequence == sequence:
|
|
return segment
|
|
return None
|
|
|
|
async def recv(self) -> Segment:
|
|
"""Wait for and retrieve the latest segment."""
|
|
last_segment = max(self.segments, default=0)
|
|
if self._cursor is None or self._cursor <= last_segment:
|
|
await self._event.wait()
|
|
|
|
if not self._segments:
|
|
return None
|
|
|
|
segment = self.get_segment()[-1]
|
|
self._cursor = segment.sequence
|
|
return segment
|
|
|
|
@callback
|
|
def put(self, segment: Segment) -> None:
|
|
"""Store output."""
|
|
# Start idle timeout when we start recieving data
|
|
if self._unsub is None:
|
|
self._unsub = async_call_later(
|
|
self._stream.hass, 300, self._cleanup)
|
|
|
|
if segment is None:
|
|
self._event.set()
|
|
# Cleanup provider
|
|
if self._unsub is not None:
|
|
self._unsub()
|
|
self._cleanup()
|
|
return
|
|
|
|
self._segments.append(segment)
|
|
self._event.set()
|
|
self._event.clear()
|
|
|
|
@callback
|
|
def _cleanup(self, _now=None):
|
|
"""Remove provider."""
|
|
self._segments = []
|
|
self._stream.remove_provider(self)
|
|
|
|
|
|
class StreamView(HomeAssistantView):
|
|
"""
|
|
Base StreamView.
|
|
|
|
For implementation of a new stream format, define `url` and `name`
|
|
attributes, and implement `handle` method in a child class.
|
|
"""
|
|
|
|
requires_auth = False
|
|
platform = None
|
|
|
|
async def get(self, request, token, sequence=None):
|
|
"""Start a GET request."""
|
|
hass = request.app['hass']
|
|
|
|
stream = next((
|
|
s for s in hass.data[DOMAIN][ATTR_STREAMS].values()
|
|
if s.access_token == token), None)
|
|
|
|
if not stream:
|
|
raise web.HTTPNotFound()
|
|
|
|
# Start worker if not already started
|
|
stream.start()
|
|
|
|
return await self.handle(request, stream, sequence)
|
|
|
|
async def handle(self, request, stream, sequence):
|
|
"""Handle the stream request."""
|
|
raise NotImplementedError()
|