Persist nest media events to disk backed storage (#61641)

* Persist nest media events to disk backed storage

Persist nest events in the media player to disk, targeting about ~500mb
per camera device as a cap. Events are stored in config/nest/event_media/.

Add a NestEventMediaStore is used for persistence. It has three main jobs:
- Read/write the key/value data that holds event data (event type, time, device, etc)
- Read/write media contents to disk
- Pick the filename for the media event based on device and event deatils

The nest event media manager library handles cache management and eviction, and by
default uses an in memory cache. Home Assistant nest integration now provides the
disk backed implementation, which is invoked by the nest library.

The store reads the event metadata key/value dict on startup, and then writes it
back with a short delay of 5 seconds to avoid unnecessary writes.

Future work planned includes:
- Possibly a small memory buffer for media objects themselves. This could make sense
  when adding thumbnails to the media player grid to avoid unnecessary fetches
- Transcoding mp4 clips to animated image previews

* Address style errors

* Cleanup from CI test/pylint/etc.

* Put media for each device into its own directory

* Update comments for media store

* Decrease # of events to lower disk requirements

Target more like 1k events, to reduce disk needs.

* Address PR feedback

* Update homeassistant/components/nest/media_source.py

Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io>

* Ignore incorrect mypy in nest library

* Fix pylint errors

Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io>
This commit is contained in:
Allen Porter 2021-12-18 23:53:40 -08:00 committed by GitHub
parent e834382b9a
commit a63fa53275
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 540 additions and 24 deletions

View file

@ -6,6 +6,10 @@ as media in the media source.
import datetime
from http import HTTPStatus
import shutil
from typing import Generator
from unittest.mock import Mock, patch
import uuid
import aiohttp
from google_nest_sdm.device import Device
@ -19,9 +23,15 @@ from homeassistant.components.media_source.error import Unresolvable
from homeassistant.config_entries import ConfigEntryState
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.template import DATE_STR_FORMAT
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from .common import async_setup_sdm_platform
from .common import (
CONFIG,
FakeSubscriber,
async_setup_sdm_platform,
create_config_entry,
)
DOMAIN = "nest"
DEVICE_ID = "example/api/device/id"
@ -49,6 +59,7 @@ BATTERY_CAMERA_TRAITS = {
"sdm.devices.traits.CameraPerson": {},
"sdm.devices.traits.CameraMotion": {},
}
PERSON_EVENT = "sdm.devices.events.CameraPerson.Person"
MOTION_EVENT = "sdm.devices.events.CameraMotion.Motion"
@ -63,6 +74,17 @@ IMAGE_BYTES_FROM_EVENT = b"test url image bytes"
IMAGE_AUTHORIZATION_HEADERS = {"Authorization": "Basic g.0.eventToken"}
@pytest.fixture(autouse=True)
def cleanup_media_storage(hass):
"""Test cleanup, remove any media storage persisted during the test."""
tmp_path = str(uuid.uuid4())
m = Mock(spec=float)
m.return_value = tmp_path
with patch("homeassistant.components.nest.media_source.MEDIA_PATH", new_callable=m):
yield
shutil.rmtree(hass.config.path(tmp_path), ignore_errors=True)
async def async_setup_devices(hass, auth, device_type, traits={}, events=[]):
"""Set up the platform and prerequisites."""
devices = {
@ -115,6 +137,22 @@ def create_event_message(event_data, timestamp, device_id=None):
)
def create_battery_event_data(
event_type, event_session_id=EVENT_SESSION_ID, event_id="n:2"
):
"""Return event payload data for a battery camera event."""
return {
event_type: {
"eventSessionId": event_session_id,
"eventId": event_id,
},
"sdm.devices.events.CameraClipPreview.ClipPreview": {
"eventSessionId": event_session_id,
"previewUrl": "https://127.0.0.1/example",
},
}
async def test_no_eligible_devices(hass, auth):
"""Test a media source with no eligible camera devices."""
await async_setup_devices(
@ -335,7 +373,7 @@ async def test_event_order(hass, auth):
event_timestamp_string = event_timestamp2.strftime(DATE_STR_FORMAT)
assert browse.children[0].title == f"Motion @ {event_timestamp_string}"
assert not browse.children[0].can_expand
assert not browse.can_play
assert not browse.children[0].can_play
# Person event is next
assert browse.children[1].domain == DOMAIN
@ -344,7 +382,7 @@ async def test_event_order(hass, auth):
event_timestamp_string = event_timestamp1.strftime(DATE_STR_FORMAT)
assert browse.children[1].title == f"Person @ {event_timestamp_string}"
assert not browse.children[1].can_expand
assert not browse.can_play
assert not browse.children[1].can_play
async def test_browse_invalid_device_id(hass, auth):
@ -436,16 +474,6 @@ async def test_resolve_invalid_event_id(hass, auth):
async def test_camera_event_clip_preview(hass, auth, hass_client):
"""Test an event for a battery camera video clip."""
event_timestamp = dt_util.now()
event_data = {
"sdm.devices.events.CameraMotion.Motion": {
"eventSessionId": EVENT_SESSION_ID,
"eventId": "n:2",
},
"sdm.devices.events.CameraClipPreview.ClipPreview": {
"eventSessionId": EVENT_SESSION_ID,
"previewUrl": "https://127.0.0.1/example",
},
}
await async_setup_devices(
hass,
auth,
@ -453,7 +481,7 @@ async def test_camera_event_clip_preview(hass, auth, hass_client):
BATTERY_CAMERA_TRAITS,
events=[
create_event_message(
event_data,
create_battery_event_data(MOTION_EVENT),
timestamp=event_timestamp,
),
],
@ -692,3 +720,314 @@ async def test_multiple_devices(hass, auth, hass_client):
hass, f"{const.URI_SCHEME}{DOMAIN}/{device2.id}"
)
assert len(browse.children) == 3
@pytest.fixture
def event_store() -> Generator[None, None, None]:
"""Persist changes to event store immediately."""
m = Mock(spec=float)
m.return_value = 0
with patch(
"homeassistant.components.nest.media_source.STORAGE_SAVE_DELAY_SECONDS",
new_callable=m,
):
yield
async def test_media_store_persistence(hass, auth, hass_client, event_store):
"""Test the disk backed media store persistence."""
nest_device = Device.MakeDevice(
{
"name": DEVICE_ID,
"type": CAMERA_DEVICE_TYPE,
"traits": BATTERY_CAMERA_TRAITS,
},
auth=auth,
)
subscriber = FakeSubscriber()
device_manager = await subscriber.async_get_device_manager()
device_manager.add_device(nest_device)
# Fetch media for events when published
subscriber.cache_policy.fetch = True
config_entry = create_config_entry(hass)
with patch(
"homeassistant.helpers.config_entry_oauth2_flow.async_get_config_entry_implementation"
), patch("homeassistant.components.nest.PLATFORMS", [PLATFORM]), patch(
"homeassistant.components.nest.api.GoogleNestSubscriber",
return_value=subscriber,
):
assert await async_setup_component(hass, DOMAIN, CONFIG)
await hass.async_block_till_done()
device_registry = dr.async_get(hass)
device = device_registry.async_get_device({(DOMAIN, DEVICE_ID)})
assert device
assert device.name == DEVICE_NAME
auth.responses = [
aiohttp.web.Response(body=IMAGE_BYTES_FROM_EVENT),
]
event_timestamp = dt_util.now()
await subscriber.async_receive_event(
create_event_message(
create_battery_event_data(MOTION_EVENT), timestamp=event_timestamp
)
)
# Browse to event
browse = await media_source.async_browse_media(
hass, f"{const.URI_SCHEME}{DOMAIN}/{device.id}"
)
assert len(browse.children) == 1
assert browse.children[0].domain == DOMAIN
assert browse.children[0].identifier == f"{device.id}/{EVENT_SESSION_ID}"
event_timestamp_string = event_timestamp.strftime(DATE_STR_FORMAT)
assert browse.children[0].title == f"Motion @ {event_timestamp_string}"
assert not browse.children[0].can_expand
assert browse.children[0].can_play
media = await media_source.async_resolve_media(
hass, f"{const.URI_SCHEME}{DOMAIN}/{device.id}/{EVENT_SESSION_ID}"
)
assert media.url == f"/api/nest/event_media/{device.id}/{EVENT_SESSION_ID}"
assert media.mime_type == "video/mp4"
# Fetch event media
client = await hass_client()
response = await client.get(media.url)
assert response.status == HTTPStatus.OK, "Response not matched: %s" % response
contents = await response.read()
assert contents == IMAGE_BYTES_FROM_EVENT
# Ensure event media store persists to disk
await hass.async_block_till_done()
# Unload the integration.
assert config_entry.state == ConfigEntryState.LOADED
assert await hass.config_entries.async_unload(config_entry.entry_id)
await hass.async_block_till_done()
assert config_entry.state == ConfigEntryState.NOT_LOADED
# Now rebuild the entire integration and verify that all persisted storage
# can be re-loaded from disk.
subscriber = FakeSubscriber()
device_manager = await subscriber.async_get_device_manager()
device_manager.add_device(nest_device)
# Fetch media for events when published
subscriber.cache_policy.fetch = True
with patch(
"homeassistant.helpers.config_entry_oauth2_flow.async_get_config_entry_implementation"
), patch("homeassistant.components.nest.PLATFORMS", [PLATFORM]), patch(
"homeassistant.components.nest.api.GoogleNestSubscriber",
return_value=subscriber,
):
await hass.config_entries.async_reload(config_entry.entry_id)
await hass.async_block_till_done()
device_registry = dr.async_get(hass)
device = device_registry.async_get_device({(DOMAIN, DEVICE_ID)})
assert device
assert device.name == DEVICE_NAME
# Verify event metadata exists
browse = await media_source.async_browse_media(
hass, f"{const.URI_SCHEME}{DOMAIN}/{device.id}"
)
assert len(browse.children) == 1
assert browse.children[0].domain == DOMAIN
assert browse.children[0].identifier == f"{device.id}/{EVENT_SESSION_ID}"
event_timestamp_string = event_timestamp.strftime(DATE_STR_FORMAT)
assert browse.children[0].title == f"Motion @ {event_timestamp_string}"
assert not browse.children[0].can_expand
assert browse.children[0].can_play
media = await media_source.async_resolve_media(
hass, f"{const.URI_SCHEME}{DOMAIN}/{device.id}/{EVENT_SESSION_ID}"
)
assert media.url == f"/api/nest/event_media/{device.id}/{EVENT_SESSION_ID}"
assert media.mime_type == "video/mp4"
# Verify media exists
response = await client.get(media.url)
assert response.status == HTTPStatus.OK, "Response not matched: %s" % response
contents = await response.read()
assert contents == IMAGE_BYTES_FROM_EVENT
async def test_media_store_filesystem_error(hass, auth, hass_client):
"""Test a filesystem error read/writing event media."""
event_timestamp = dt_util.now()
await async_setup_devices(
hass,
auth,
CAMERA_DEVICE_TYPE,
BATTERY_CAMERA_TRAITS,
events=[
create_event_message(
create_battery_event_data(MOTION_EVENT),
timestamp=event_timestamp,
),
],
)
assert len(hass.states.async_all()) == 1
camera = hass.states.get("camera.front")
assert camera is not None
device_registry = dr.async_get(hass)
device = device_registry.async_get_device({(DOMAIN, DEVICE_ID)})
assert device
assert device.name == DEVICE_NAME
auth.responses = [
aiohttp.web.Response(body=IMAGE_BYTES_FROM_EVENT),
]
# The client fetches the media from the server, but has a failure when
# persisting the media to disk.
client = await hass_client()
with patch("homeassistant.components.nest.media_source.open", side_effect=OSError):
response = await client.get(
f"/api/nest/event_media/{device.id}/{EVENT_SESSION_ID}"
)
assert response.status == HTTPStatus.OK, "Response not matched: %s" % response
contents = await response.read()
assert contents == IMAGE_BYTES_FROM_EVENT
await hass.async_block_till_done()
# Fetch the media again, and since the object does not exist in the cache it
# needs to be fetched again. The server returns an error to prove that it was
# not a cache read. A second attempt succeeds.
auth.responses = [
aiohttp.web.Response(status=HTTPStatus.INTERNAL_SERVER_ERROR),
aiohttp.web.Response(body=IMAGE_BYTES_FROM_EVENT),
]
# First attempt, server fails when fetching
response = await client.get(f"/api/nest/event_media/{device.id}/{EVENT_SESSION_ID}")
assert response.status == HTTPStatus.INTERNAL_SERVER_ERROR, (
"Response not matched: %s" % response
)
# Second attempt, server responds success
response = await client.get(f"/api/nest/event_media/{device.id}/{EVENT_SESSION_ID}")
assert response.status == HTTPStatus.OK, "Response not matched: %s" % response
contents = await response.read()
assert contents == IMAGE_BYTES_FROM_EVENT
# Third attempt reads from the disk cache with no server fetch
response = await client.get(f"/api/nest/event_media/{device.id}/{EVENT_SESSION_ID}")
assert response.status == HTTPStatus.OK, "Response not matched: %s" % response
contents = await response.read()
assert contents == IMAGE_BYTES_FROM_EVENT
auth.responses = [
aiohttp.web.Response(body=IMAGE_BYTES_FROM_EVENT),
]
# Exercise a failure reading from the disk cache. Re-populate from server and write to disk ok
with patch("homeassistant.components.nest.media_source.open", side_effect=OSError):
response = await client.get(
f"/api/nest/event_media/{device.id}/{EVENT_SESSION_ID}"
)
assert response.status == HTTPStatus.OK, "Response not matched: %s" % response
contents = await response.read()
assert contents == IMAGE_BYTES_FROM_EVENT
await hass.async_block_till_done()
response = await client.get(f"/api/nest/event_media/{device.id}/{EVENT_SESSION_ID}")
assert response.status == HTTPStatus.OK, "Response not matched: %s" % response
contents = await response.read()
assert contents == IMAGE_BYTES_FROM_EVENT
async def test_camera_event_media_eviction(hass, auth, hass_client):
"""Test media files getting evicted from the cache."""
# Set small cache size for testing eviction
m = Mock(spec=float)
m.return_value = 5
with patch("homeassistant.components.nest.EVENT_MEDIA_CACHE_SIZE", new_callable=m):
subscriber = await async_setup_devices(
hass,
auth,
CAMERA_DEVICE_TYPE,
BATTERY_CAMERA_TRAITS,
)
# Media fetched as soon as it is published
subscriber.cache_policy.fetch = True
device_registry = dr.async_get(hass)
device = device_registry.async_get_device({(DOMAIN, DEVICE_ID)})
assert device
assert device.name == DEVICE_NAME
# Browse to the device
browse = await media_source.async_browse_media(
hass, f"{const.URI_SCHEME}{DOMAIN}/{device.id}"
)
assert browse.domain == DOMAIN
assert browse.identifier == device.id
assert browse.title == "Front: Recent Events"
assert browse.can_expand
# No events published yet
assert len(browse.children) == 0
event_timestamp = dt_util.now()
for i in range(0, 7):
auth.responses = [aiohttp.web.Response(body=f"image-bytes-{i}".encode())]
ts = event_timestamp + datetime.timedelta(seconds=i)
await subscriber.async_receive_event(
create_event_message(
create_battery_event_data(
MOTION_EVENT, event_session_id=f"event-session-{i}"
),
timestamp=ts,
)
)
await hass.async_block_till_done()
# Cache is limited to 5 events removing media as the cache is filled
browse = await media_source.async_browse_media(
hass, f"{const.URI_SCHEME}{DOMAIN}/{device.id}"
)
assert len(browse.children) == 5
auth.responses = [
aiohttp.web.Response(body=b"image-bytes-7"),
]
ts = event_timestamp + datetime.timedelta(seconds=8)
# Simulate a failure case removing the media on cache eviction
with patch(
"homeassistant.components.nest.media_source.os.remove", side_effect=OSError
) as mock_remove:
await subscriber.async_receive_event(
create_event_message(
create_battery_event_data(
MOTION_EVENT, event_session_id="event-session-7"
),
timestamp=ts,
)
)
await hass.async_block_till_done()
assert mock_remove.called
browse = await media_source.async_browse_media(
hass, f"{const.URI_SCHEME}{DOMAIN}/{device.id}"
)
assert len(browse.children) == 5
# Verify all other content is still persisted correctly
client = await hass_client()
for i in range(3, 8):
response = await client.get(
f"/api/nest/event_media/{device.id}/event-session-{i}"
)
assert response.status == HTTPStatus.OK, "Response not matched: %s" % response
contents = await response.read()
assert contents == f"image-bytes-{i}".encode()
await hass.async_block_till_done()