Add UniFi Protect views (#74190)

Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Christopher Bailey 2022-06-29 19:10:38 -04:00 committed by GitHub
parent 0028dc46e6
commit 1555f40bad
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 647 additions and 0 deletions

View file

@ -40,6 +40,7 @@ from .discovery import async_start_discovery
from .migrate import async_migrate_data
from .services import async_cleanup_services, async_setup_services
from .utils import _async_unifi_mac_from_hass, async_get_devices
from .views import ThumbnailProxyView, VideoProxyView
_LOGGER = logging.getLogger(__name__)
@ -92,6 +93,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = data_service
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
async_setup_services(hass)
hass.http.register_view(ThumbnailProxyView(hass))
hass.http.register_view(VideoProxyView(hass))
entry.async_on_unload(entry.add_update_listener(_async_options_updated))
entry.async_on_unload(

View file

@ -0,0 +1,211 @@
"""UniFi Protect Integration views."""
from __future__ import annotations
from datetime import datetime
from http import HTTPStatus
import logging
from typing import Any
from urllib.parse import urlencode
from aiohttp import web
from pyunifiprotect.data import Event
from pyunifiprotect.exceptions import ClientError
from homeassistant.components.http import HomeAssistantView
from homeassistant.core import HomeAssistant, callback
from .const import DOMAIN
from .data import ProtectData
_LOGGER = logging.getLogger(__name__)
@callback
def async_generate_thumbnail_url(
event_id: str,
nvr_id: str,
width: int | None = None,
height: int | None = None,
) -> str:
"""Generate URL for event thumbnail."""
url_format = ThumbnailProxyView.url or "{nvr_id}/{event_id}"
url = url_format.format(nvr_id=nvr_id, event_id=event_id)
params = {}
if width is not None:
params["width"] = str(width)
if height is not None:
params["height"] = str(height)
return f"{url}?{urlencode(params)}"
@callback
def async_generate_event_video_url(event: Event) -> str:
"""Generate URL for event video."""
_validate_event(event)
if event.start is None or event.end is None:
raise ValueError("Event is ongoing")
url_format = VideoProxyView.url or "{nvr_id}/{camera_id}/{start}/{end}"
url = url_format.format(
nvr_id=event.api.bootstrap.nvr.id,
camera_id=event.camera_id,
start=event.start.isoformat(),
end=event.end.isoformat(),
)
return url
@callback
def _client_error(message: Any, code: HTTPStatus) -> web.Response:
_LOGGER.warning("Client error (%s): %s", code.value, message)
if code == HTTPStatus.BAD_REQUEST:
return web.Response(body=message, status=code)
return web.Response(status=code)
@callback
def _400(message: Any) -> web.Response:
return _client_error(message, HTTPStatus.BAD_REQUEST)
@callback
def _403(message: Any) -> web.Response:
return _client_error(message, HTTPStatus.FORBIDDEN)
@callback
def _404(message: Any) -> web.Response:
return _client_error(message, HTTPStatus.NOT_FOUND)
@callback
def _validate_event(event: Event) -> None:
if event.camera is None:
raise ValueError("Event does not have a camera")
if not event.camera.can_read_media(event.api.bootstrap.auth_user):
raise PermissionError(f"User cannot read media from camera: {event.camera.id}")
class ProtectProxyView(HomeAssistantView):
"""Base class to proxy request to UniFi Protect console."""
requires_auth = True
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize a thumbnail proxy view."""
self.hass = hass
self.data = hass.data[DOMAIN]
def _get_data_or_404(self, nvr_id: str) -> ProtectData | web.Response:
all_data: list[ProtectData] = []
for data in self.data.values():
if isinstance(data, ProtectData):
if data.api.bootstrap.nvr.id == nvr_id:
return data
all_data.append(data)
return _404("Invalid NVR ID")
class ThumbnailProxyView(ProtectProxyView):
"""View to proxy event thumbnails from UniFi Protect."""
url = "/api/unifiprotect/thumbnail/{nvr_id}/{event_id}"
name = "api:unifiprotect_thumbnail"
async def get(
self, request: web.Request, nvr_id: str, event_id: str
) -> web.Response:
"""Get Event Thumbnail."""
data = self._get_data_or_404(nvr_id)
if isinstance(data, web.Response):
return data
width: int | str | None = request.query.get("width")
height: int | str | None = request.query.get("height")
if width is not None:
try:
width = int(width)
except ValueError:
return _400("Invalid width param")
if height is not None:
try:
height = int(height)
except ValueError:
return _400("Invalid height param")
try:
thumbnail = await data.api.get_event_thumbnail(
event_id, width=width, height=height
)
except ClientError as err:
return _404(err)
if thumbnail is None:
return _404("Event thumbnail not found")
return web.Response(body=thumbnail, content_type="image/jpeg")
class VideoProxyView(ProtectProxyView):
"""View to proxy video clips from UniFi Protect."""
url = "/api/unifiprotect/video/{nvr_id}/{camera_id}/{start}/{end}"
name = "api:unifiprotect_thumbnail"
async def get(
self, request: web.Request, nvr_id: str, camera_id: str, start: str, end: str
) -> web.StreamResponse:
"""Get Camera Video clip."""
data = self._get_data_or_404(nvr_id)
if isinstance(data, web.Response):
return data
camera = data.api.bootstrap.cameras.get(camera_id)
if camera is None:
return _404(f"Invalid camera ID: {camera_id}")
if not camera.can_read_media(data.api.bootstrap.auth_user):
return _403(f"User cannot read media from camera: {camera.id}")
try:
start_dt = datetime.fromisoformat(start)
except ValueError:
return _400("Invalid start")
try:
end_dt = datetime.fromisoformat(end)
except ValueError:
return _400("Invalid end")
response = web.StreamResponse(
status=200,
reason="OK",
headers={
"Content-Type": "video/mp4",
},
)
async def iterator(total: int, chunk: bytes | None) -> None:
if not response.prepared:
response.content_length = total
await response.prepare(request)
if chunk is not None:
await response.write(chunk)
try:
await camera.get_video(start_dt, end_dt, iterator_callback=iterator)
except ClientError as err:
return _404(err)
if response.prepared:
await response.write_eof()
return response

View file

@ -4,6 +4,7 @@ from __future__ import annotations
from collections.abc import Callable
from datetime import datetime, timedelta
from functools import partial
from ipaddress import IPv4Address
import json
from typing import Any
@ -102,6 +103,11 @@ def mock_ufp_client(bootstrap: Bootstrap):
"""Mock ProtectApiClient for testing."""
client = Mock()
client.bootstrap = bootstrap
client._bootstrap = bootstrap
client.api_path = "/api"
# functionality from API client tests actually need
client._stream_response = partial(ProtectApiClient._stream_response, client)
client.get_camera_video = partial(ProtectApiClient.get_camera_video, client)
nvr = client.bootstrap.nvr
nvr._api = client

View file

@ -0,0 +1,427 @@
"""Test UniFi Protect views."""
from datetime import datetime, timedelta
from typing import Any, cast
from unittest.mock import AsyncMock, Mock
from aiohttp import ClientResponse
import pytest
from pyunifiprotect.data import Camera, Event, EventType
from pyunifiprotect.exceptions import ClientError
from homeassistant.components.unifiprotect.views import (
async_generate_event_video_url,
async_generate_thumbnail_url,
)
from homeassistant.core import HomeAssistant
from .utils import MockUFPFixture, init_entry
from tests.test_util.aiohttp import mock_aiohttp_client
async def test_thumbnail_bad_nvr_id(
hass: HomeAssistant,
hass_client: mock_aiohttp_client,
ufp: MockUFPFixture,
camera: Camera,
) -> None:
"""Test invalid NVR ID in URL."""
ufp.api.get_event_thumbnail = AsyncMock()
await init_entry(hass, ufp, [camera])
url = async_generate_thumbnail_url("test_id", "bad_id")
http_client = await hass_client()
response = cast(ClientResponse, await http_client.get(url))
assert response.status == 404
ufp.api.get_event_thumbnail.assert_not_called
@pytest.mark.parametrize("width,height", [("test", None), (None, "test")])
async def test_thumbnail_bad_params(
hass: HomeAssistant,
hass_client: mock_aiohttp_client,
ufp: MockUFPFixture,
camera: Camera,
width: Any,
height: Any,
) -> None:
"""Test invalid bad query parameters."""
ufp.api.get_event_thumbnail = AsyncMock()
await init_entry(hass, ufp, [camera])
url = async_generate_thumbnail_url(
"test_id", ufp.api.bootstrap.nvr.id, width=width, height=height
)
http_client = await hass_client()
response = cast(ClientResponse, await http_client.get(url))
assert response.status == 400
ufp.api.get_event_thumbnail.assert_not_called
async def test_thumbnail_bad_event(
hass: HomeAssistant,
hass_client: mock_aiohttp_client,
ufp: MockUFPFixture,
camera: Camera,
) -> None:
"""Test invalid with error raised."""
ufp.api.get_event_thumbnail = AsyncMock(side_effect=ClientError())
await init_entry(hass, ufp, [camera])
url = async_generate_thumbnail_url("test_id", ufp.api.bootstrap.nvr.id)
http_client = await hass_client()
response = cast(ClientResponse, await http_client.get(url))
assert response.status == 404
ufp.api.get_event_thumbnail.assert_called_with("test_id", width=None, height=None)
async def test_thumbnail_no_data(
hass: HomeAssistant,
hass_client: mock_aiohttp_client,
ufp: MockUFPFixture,
camera: Camera,
) -> None:
"""Test invalid no thumbnail returned."""
ufp.api.get_event_thumbnail = AsyncMock(return_value=None)
await init_entry(hass, ufp, [camera])
url = async_generate_thumbnail_url("test_id", ufp.api.bootstrap.nvr.id)
http_client = await hass_client()
response = cast(ClientResponse, await http_client.get(url))
assert response.status == 404
ufp.api.get_event_thumbnail.assert_called_with("test_id", width=None, height=None)
async def test_thumbnail(
hass: HomeAssistant,
hass_client: mock_aiohttp_client,
ufp: MockUFPFixture,
camera: Camera,
) -> None:
"""Test invalid NVR ID in URL."""
ufp.api.get_event_thumbnail = AsyncMock(return_value=b"testtest")
await init_entry(hass, ufp, [camera])
url = async_generate_thumbnail_url("test_id", ufp.api.bootstrap.nvr.id)
http_client = await hass_client()
response = cast(ClientResponse, await http_client.get(url))
assert response.status == 200
assert response.content_type == "image/jpeg"
assert await response.content.read() == b"testtest"
ufp.api.get_event_thumbnail.assert_called_with("test_id", width=None, height=None)
async def test_video_bad_event(
hass: HomeAssistant,
ufp: MockUFPFixture,
camera: Camera,
fixed_now: datetime,
) -> None:
"""Test generating event with bad camera ID."""
await init_entry(hass, ufp, [camera])
event = Event(
api=ufp.api,
camera_id="test_id",
start=fixed_now - timedelta(seconds=30),
end=fixed_now,
id="test_id",
type=EventType.MOTION,
score=100,
smart_detect_types=[],
smart_detect_event_ids=[],
)
with pytest.raises(ValueError):
async_generate_event_video_url(event)
async def test_video_bad_event_ongoing(
hass: HomeAssistant,
ufp: MockUFPFixture,
camera: Camera,
fixed_now: datetime,
) -> None:
"""Test generating event with bad camera ID."""
await init_entry(hass, ufp, [camera])
event = Event(
api=ufp.api,
camera_id=camera.id,
start=fixed_now - timedelta(seconds=30),
end=None,
id="test_id",
type=EventType.MOTION,
score=100,
smart_detect_types=[],
smart_detect_event_ids=[],
)
with pytest.raises(ValueError):
async_generate_event_video_url(event)
async def test_video_bad_perms(
hass: HomeAssistant,
ufp: MockUFPFixture,
camera: Camera,
fixed_now: datetime,
) -> None:
"""Test generating event with bad user permissions."""
ufp.api.bootstrap.auth_user.all_permissions = []
await init_entry(hass, ufp, [camera])
event = Event(
api=ufp.api,
camera_id=camera.id,
start=fixed_now - timedelta(seconds=30),
end=fixed_now,
id="test_id",
type=EventType.MOTION,
score=100,
smart_detect_types=[],
smart_detect_event_ids=[],
)
with pytest.raises(PermissionError):
async_generate_event_video_url(event)
async def test_video_bad_nvr_id(
hass: HomeAssistant,
hass_client: mock_aiohttp_client,
ufp: MockUFPFixture,
camera: Camera,
fixed_now: datetime,
) -> None:
"""Test video URL with bad NVR id."""
ufp.api.request = AsyncMock()
await init_entry(hass, ufp, [camera])
event = Event(
api=ufp.api,
camera_id=camera.id,
start=fixed_now - timedelta(seconds=30),
end=fixed_now,
id="test_id",
type=EventType.MOTION,
score=100,
smart_detect_types=[],
smart_detect_event_ids=[],
)
url = async_generate_event_video_url(event)
url = url.replace(ufp.api.bootstrap.nvr.id, "bad_id")
http_client = await hass_client()
response = cast(ClientResponse, await http_client.get(url))
assert response.status == 404
ufp.api.request.assert_not_called
async def test_video_bad_camera_id(
hass: HomeAssistant,
hass_client: mock_aiohttp_client,
ufp: MockUFPFixture,
camera: Camera,
fixed_now: datetime,
) -> None:
"""Test video URL with bad camera id."""
ufp.api.request = AsyncMock()
await init_entry(hass, ufp, [camera])
event = Event(
api=ufp.api,
camera_id=camera.id,
start=fixed_now - timedelta(seconds=30),
end=fixed_now,
id="test_id",
type=EventType.MOTION,
score=100,
smart_detect_types=[],
smart_detect_event_ids=[],
)
url = async_generate_event_video_url(event)
url = url.replace(camera.id, "bad_id")
http_client = await hass_client()
response = cast(ClientResponse, await http_client.get(url))
assert response.status == 404
ufp.api.request.assert_not_called
async def test_video_bad_camera_perms(
hass: HomeAssistant,
hass_client: mock_aiohttp_client,
ufp: MockUFPFixture,
camera: Camera,
fixed_now: datetime,
) -> None:
"""Test video URL with bad camera perms."""
ufp.api.request = AsyncMock()
await init_entry(hass, ufp, [camera])
event = Event(
api=ufp.api,
camera_id=camera.id,
start=fixed_now - timedelta(seconds=30),
end=fixed_now,
id="test_id",
type=EventType.MOTION,
score=100,
smart_detect_types=[],
smart_detect_event_ids=[],
)
url = async_generate_event_video_url(event)
ufp.api.bootstrap.auth_user.all_permissions = []
ufp.api.bootstrap.auth_user._perm_cache = {}
http_client = await hass_client()
response = cast(ClientResponse, await http_client.get(url))
assert response.status == 403
ufp.api.request.assert_not_called
@pytest.mark.parametrize("start,end", [("test", None), (None, "test")])
async def test_video_bad_params(
hass: HomeAssistant,
hass_client: mock_aiohttp_client,
ufp: MockUFPFixture,
camera: Camera,
fixed_now: datetime,
start: Any,
end: Any,
) -> None:
"""Test video URL with bad start/end params."""
ufp.api.request = AsyncMock()
await init_entry(hass, ufp, [camera])
event_start = fixed_now - timedelta(seconds=30)
event = Event(
api=ufp.api,
camera_id=camera.id,
start=event_start,
end=fixed_now,
id="test_id",
type=EventType.MOTION,
score=100,
smart_detect_types=[],
smart_detect_event_ids=[],
)
url = async_generate_event_video_url(event)
from_value = event_start if start is not None else fixed_now
to_value = start if start is not None else end
url = url.replace(from_value.isoformat(), to_value)
http_client = await hass_client()
response = cast(ClientResponse, await http_client.get(url))
assert response.status == 400
ufp.api.request.assert_not_called
async def test_video_bad_video(
hass: HomeAssistant,
hass_client: mock_aiohttp_client,
ufp: MockUFPFixture,
camera: Camera,
fixed_now: datetime,
) -> None:
"""Test video URL with no video."""
ufp.api.request = AsyncMock(side_effect=ClientError)
await init_entry(hass, ufp, [camera])
event_start = fixed_now - timedelta(seconds=30)
event = Event(
api=ufp.api,
camera_id=camera.id,
start=event_start,
end=fixed_now,
id="test_id",
type=EventType.MOTION,
score=100,
smart_detect_types=[],
smart_detect_event_ids=[],
)
url = async_generate_event_video_url(event)
http_client = await hass_client()
response = cast(ClientResponse, await http_client.get(url))
assert response.status == 404
ufp.api.request.assert_called_once
async def test_video(
hass: HomeAssistant,
hass_client: mock_aiohttp_client,
ufp: MockUFPFixture,
camera: Camera,
fixed_now: datetime,
) -> None:
"""Test video URL with no video."""
content = Mock()
content.__anext__ = AsyncMock(side_effect=[b"test", b"test", StopAsyncIteration()])
content.__aiter__ = Mock(return_value=content)
mock_response = Mock()
mock_response.content_length = 8
mock_response.content.iter_chunked = Mock(return_value=content)
ufp.api.request = AsyncMock(return_value=mock_response)
await init_entry(hass, ufp, [camera])
event_start = fixed_now - timedelta(seconds=30)
event = Event(
api=ufp.api,
camera_id=camera.id,
start=event_start,
end=fixed_now,
id="test_id",
type=EventType.MOTION,
score=100,
smart_detect_types=[],
smart_detect_event_ids=[],
)
url = async_generate_event_video_url(event)
http_client = await hass_client()
response = cast(ClientResponse, await http_client.get(url))
assert await response.content.read() == b"testtest"
assert response.status == 200
ufp.api.request.assert_called_once