From 94db07ca8c2bb12a753aac0f08c6d035ef2a87cd Mon Sep 17 00:00:00 2001 From: Allen Porter Date: Tue, 10 Nov 2020 14:48:02 -0800 Subject: [PATCH] Update nest sdm camera to refresh in background (#42865) Co-authored-by: Paulus Schoutsen --- homeassistant/components/nest/camera_sdm.py | 50 ++++++-- homeassistant/components/nest/manifest.json | 2 +- requirements_all.txt | 2 +- requirements_test_all.txt | 2 +- tests/components/nest/camera_sdm_test.py | 125 +++++++++++++++++--- 5 files changed, 157 insertions(+), 24 deletions(-) diff --git a/homeassistant/components/nest/camera_sdm.py b/homeassistant/components/nest/camera_sdm.py index c7f5fc97f92..ad4293fde5d 100644 --- a/homeassistant/components/nest/camera_sdm.py +++ b/homeassistant/components/nest/camera_sdm.py @@ -1,16 +1,19 @@ """Support for Google Nest SDM Cameras.""" +import datetime import logging from typing import Optional from google_nest_sdm.camera_traits import CameraImageTrait, CameraLiveStreamTrait from google_nest_sdm.device import Device from haffmpeg.tools import IMAGE_JPEG +import requests from homeassistant.components.camera import SUPPORT_STREAM, Camera from homeassistant.components.ffmpeg import async_get_image from homeassistant.config_entries import ConfigEntry from homeassistant.helpers.dispatcher import async_dispatcher_connect +from homeassistant.helpers.event import async_track_point_in_utc_time from homeassistant.helpers.typing import HomeAssistantType from homeassistant.util.dt import utcnow @@ -19,6 +22,9 @@ from .device_info import DeviceInfo _LOGGER = logging.getLogger(__name__) +# Used to schedule an alarm to refresh the stream before expiration +STREAM_EXPIRATION_BUFFER = datetime.timedelta(seconds=30) + async def async_setup_sdm_entry( hass: HomeAssistantType, entry: ConfigEntry, async_add_entities @@ -49,6 +55,7 @@ class NestCamera(Camera): self._device = device self._device_info = DeviceInfo(device) self._stream = None + self._stream_refresh_unsub = None @property def should_poll(self) -> bool: @@ -93,21 +100,50 @@ class NestCamera(Camera): if CameraLiveStreamTrait.NAME not in self._device.traits: return None trait = self._device.traits[CameraLiveStreamTrait.NAME] - now = utcnow() if not self._stream: - logging.debug("Fetching stream url") + _LOGGER.debug("Fetching stream url") self._stream = await trait.generate_rtsp_stream() - elif self._stream.expires_at < now: - logging.debug("Stream expired, extending stream") - new_stream = await self._stream.extend_rtsp_stream() - self._stream = new_stream + self._schedule_stream_refresh() + if self._stream.expires_at < utcnow(): + _LOGGER.warning("Stream already expired") return self._stream.rtsp_stream_url + def _schedule_stream_refresh(self): + """Schedules an alarm to refresh the stream url before expiration.""" + _LOGGER.debug("New stream url expires at %s", self._stream.expires_at) + refresh_time = self._stream.expires_at - STREAM_EXPIRATION_BUFFER + # Schedule an alarm to extend the stream + if self._stream_refresh_unsub is not None: + self._stream_refresh_unsub() + + self._stream_refresh_unsub = async_track_point_in_utc_time( + self.hass, + self._handle_stream_refresh, + refresh_time, + ) + + async def _handle_stream_refresh(self, now): + """Alarm that fires to check if the stream should be refreshed.""" + if not self._stream: + return + _LOGGER.debug("Extending stream url") + self._stream_refresh_unsub = None + try: + self._stream = await self._stream.extend_rtsp_stream() + except requests.HTTPError as err: + _LOGGER.debug("Failed to extend stream: %s", err) + # Next attempt to catch a url will get a new one + self._stream = None + return + self._schedule_stream_refresh() + async def async_will_remove_from_hass(self): """Invalidates the RTSP token when unloaded.""" if self._stream: - logging.debug("Invalidating stream") + _LOGGER.debug("Invalidating stream") await self._stream.stop_rtsp_stream() + if self._stream_refresh_unsub: + self._stream_refresh_unsub() async def async_added_to_hass(self): """Run when entity is added to register update signal handler.""" diff --git a/homeassistant/components/nest/manifest.json b/homeassistant/components/nest/manifest.json index 0e8c0038914..aa925086283 100644 --- a/homeassistant/components/nest/manifest.json +++ b/homeassistant/components/nest/manifest.json @@ -6,7 +6,7 @@ "documentation": "https://www.home-assistant.io/integrations/nest", "requirements": [ "python-nest==4.1.0", - "google-nest-sdm==0.1.12" + "google-nest-sdm==0.1.13" ], "codeowners": [ "@awarecan", diff --git a/requirements_all.txt b/requirements_all.txt index ea174103aa5..e3fb68c8bfe 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -687,7 +687,7 @@ google-cloud-pubsub==2.1.0 google-cloud-texttospeech==0.4.0 # homeassistant.components.nest -google-nest-sdm==0.1.12 +google-nest-sdm==0.1.13 # homeassistant.components.google_travel_time googlemaps==2.5.1 diff --git a/requirements_test_all.txt b/requirements_test_all.txt index 978216d9ca5..3106b623ae6 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -355,7 +355,7 @@ google-api-python-client==1.6.4 google-cloud-pubsub==2.1.0 # homeassistant.components.nest -google-nest-sdm==0.1.12 +google-nest-sdm==0.1.13 # homeassistant.components.gree greeclimate==0.9.5 diff --git a/tests/components/nest/camera_sdm_test.py b/tests/components/nest/camera_sdm_test.py index 0162bf1b2fd..40972d04351 100644 --- a/tests/components/nest/camera_sdm_test.py +++ b/tests/components/nest/camera_sdm_test.py @@ -10,6 +10,7 @@ from typing import List from google_nest_sdm.auth import AbstractAuth from google_nest_sdm.device import Device +from requests import HTTPError from homeassistant.components import camera from homeassistant.components.camera import STATE_IDLE @@ -18,6 +19,7 @@ from homeassistant.util.dt import utcnow from .common import async_setup_sdm_platform from tests.async_mock import patch +from tests.common import async_fire_time_changed PLATFORM = "camera" CAMERA_DEVICE_TYPE = "sdm.devices.types.CAMERA" @@ -42,16 +44,20 @@ DOMAIN = "nest" class FakeResponse: """A fake web response used for returning results of commands.""" - def __init__(self, json): + def __init__(self, json=None, error=None): """Initialize the FakeResponse.""" self._json = json + self._error = error def raise_for_status(self): """Mimics a successful response status.""" + if self._error: + raise self._error pass async def json(self): """Return a dict with the response.""" + assert self._json return self._json @@ -91,6 +97,13 @@ async def async_setup_camera(hass, traits={}, auth=None): return await async_setup_sdm_platform(hass, PLATFORM, devices) +async def fire_alarm(hass, point_in_time): + """Fire an alarm and wait for callbacks to run.""" + with patch("homeassistant.util.dt.utcnow", return_value=point_in_time): + async_fire_time_changed(hass, point_in_time) + await hass.async_block_till_done() + + async def test_no_devices(hass): """Test configuration that returns no devices.""" await async_setup_camera(hass) @@ -169,30 +182,40 @@ async def test_camera_stream(hass, aiohttp_client): async def test_refresh_expired_stream_token(hass, aiohttp_client): """Test a camera stream expiration and refresh.""" now = utcnow() - past = now - datetime.timedelta(seconds=100) - future = now + datetime.timedelta(seconds=100) + stream_1_expiration = now + datetime.timedelta(seconds=90) + stream_2_expiration = now + datetime.timedelta(seconds=180) + stream_3_expiration = now + datetime.timedelta(seconds=360) responses = [ + # Stream URL #1 FakeResponse( { "results": { "streamUrls": { - "rtspUrl": "rtsp://some/url?auth=g.0.streamingToken" + "rtspUrl": "rtsp://some/url?auth=g.1.streamingToken" }, "streamExtensionToken": "g.1.extensionToken", - "streamToken": "g.0.streamingToken", - "expiresAt": past.isoformat(timespec="seconds"), + "streamToken": "g.1.streamingToken", + "expiresAt": stream_1_expiration.isoformat(timespec="seconds"), }, } ), + # Stream URL #2 FakeResponse( { "results": { - "streamUrls": { - "rtspUrl": "rtsp://some/url?auth=g.2.streamingToken" - }, - "streamExtensionToken": "g.3.extensionToken", + "streamExtensionToken": "g.2.extensionToken", "streamToken": "g.2.streamingToken", - "expiresAt": future.isoformat(timespec="seconds"), + "expiresAt": stream_2_expiration.isoformat(timespec="seconds"), + }, + } + ), + # Stream URL #3 + FakeResponse( + { + "results": { + "streamExtensionToken": "g.3.extensionToken", + "streamToken": "g.3.streamingToken", + "expiresAt": stream_3_expiration.isoformat(timespec="seconds"), }, } ), @@ -209,16 +232,32 @@ async def test_refresh_expired_stream_token(hass, aiohttp_client): assert cam.state == STATE_IDLE stream_source = await camera.async_get_stream_source(hass, "camera.my_camera") - assert stream_source == "rtsp://some/url?auth=g.0.streamingToken" + assert stream_source == "rtsp://some/url?auth=g.1.streamingToken" - # On second fetch, notice the stream is expired and fetch again + # Fire alarm before stream_1_expiration. The stream url is not refreshed + next_update = now + datetime.timedelta(seconds=25) + await fire_alarm(hass, next_update) + stream_source = await camera.async_get_stream_source(hass, "camera.my_camera") + assert stream_source == "rtsp://some/url?auth=g.1.streamingToken" + + # Alarm is near stream_1_expiration which causes the stream extension + next_update = now + datetime.timedelta(seconds=65) + await fire_alarm(hass, next_update) stream_source = await camera.async_get_stream_source(hass, "camera.my_camera") assert stream_source == "rtsp://some/url?auth=g.2.streamingToken" - # Stream is not expired; Same url returned + # Next alarm is well before stream_2_expiration, no change + next_update = now + datetime.timedelta(seconds=100) + await fire_alarm(hass, next_update) stream_source = await camera.async_get_stream_source(hass, "camera.my_camera") assert stream_source == "rtsp://some/url?auth=g.2.streamingToken" + # Alarm is near stream_2_expiration, causing it to be extended + next_update = now + datetime.timedelta(seconds=155) + await fire_alarm(hass, next_update) + stream_source = await camera.async_get_stream_source(hass, "camera.my_camera") + assert stream_source == "rtsp://some/url?auth=g.3.streamingToken" + async def test_camera_removed(hass, aiohttp_client): """Test case where entities are removed and stream tokens expired.""" @@ -256,3 +295,61 @@ async def test_camera_removed(hass, aiohttp_client): for config_entry in hass.config_entries.async_entries(DOMAIN): await hass.config_entries.async_remove(config_entry.entry_id) assert len(hass.states.async_all()) == 0 + + +async def test_refresh_expired_stream_failure(hass, aiohttp_client): + """Tests a failure when refreshing the stream.""" + now = utcnow() + stream_1_expiration = now + datetime.timedelta(seconds=90) + stream_2_expiration = now + datetime.timedelta(seconds=180) + responses = [ + FakeResponse( + { + "results": { + "streamUrls": { + "rtspUrl": "rtsp://some/url?auth=g.1.streamingToken" + }, + "streamExtensionToken": "g.1.extensionToken", + "streamToken": "g.1.streamingToken", + "expiresAt": stream_1_expiration.isoformat(timespec="seconds"), + }, + } + ), + # Extending the stream fails + FakeResponse(error=HTTPError(response="Some Error")), + # Next attempt to get a stream fetches a new url + FakeResponse( + { + "results": { + "streamUrls": { + "rtspUrl": "rtsp://some/url?auth=g.2.streamingToken" + }, + "streamExtensionToken": "g.2.extensionToken", + "streamToken": "g.2.streamingToken", + "expiresAt": stream_2_expiration.isoformat(timespec="seconds"), + }, + } + ), + ] + await async_setup_camera( + hass, + DEVICE_TRAITS, + auth=FakeAuth(responses), + ) + + assert len(hass.states.async_all()) == 1 + cam = hass.states.get("camera.my_camera") + assert cam is not None + assert cam.state == STATE_IDLE + + stream_source = await camera.async_get_stream_source(hass, "camera.my_camera") + assert stream_source == "rtsp://some/url?auth=g.1.streamingToken" + + # Fire alarm when stream is nearing expiration, causing it to be extended. + # The stream expires. + next_update = now + datetime.timedelta(seconds=65) + await fire_alarm(hass, next_update) + + # The stream is entirely refreshed + stream_source = await camera.async_get_stream_source(hass, "camera.my_camera") + assert stream_source == "rtsp://some/url?auth=g.2.streamingToken"