diff --git a/homeassistant/components/unifiprotect/views.py b/homeassistant/components/unifiprotect/views.py index a8a767c8879..e05dcde1751 100644 --- a/homeassistant/components/unifiprotect/views.py +++ b/homeassistant/components/unifiprotect/views.py @@ -8,11 +8,12 @@ from typing import Any from urllib.parse import urlencode from aiohttp import web -from pyunifiprotect.data import Event +from pyunifiprotect.data import Camera, Event from pyunifiprotect.exceptions import ClientError from homeassistant.components.http import HomeAssistantView from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers import device_registry as dr, entity_registry as er from .const import DOMAIN from .data import ProtectData @@ -104,8 +105,10 @@ class ProtectProxyView(HomeAssistantView): def _get_data_or_404(self, nvr_id: str) -> ProtectData | web.Response: all_data: list[ProtectData] = [] - for data in self.data.values(): + for entry_id, data in self.data.items(): if isinstance(data, ProtectData): + if nvr_id == entry_id: + return data if data.api.bootstrap.nvr.id == nvr_id: return data all_data.append(data) @@ -160,6 +163,27 @@ class VideoProxyView(ProtectProxyView): url = "/api/unifiprotect/video/{nvr_id}/{camera_id}/{start}/{end}" name = "api:unifiprotect_thumbnail" + @callback + def _async_get_camera(self, data: ProtectData, camera_id: str) -> Camera | None: + if (camera := data.api.bootstrap.cameras.get(camera_id)) is not None: + return camera + + entity_registry = er.async_get(self.hass) + device_registry = dr.async_get(self.hass) + + if (entity := entity_registry.async_get(camera_id)) is None or ( + device := device_registry.async_get(entity.device_id or "") + ) is None: + return None + + macs = [c[1] for c in device.connections if c[0] == dr.CONNECTION_NETWORK_MAC] + for mac in macs: + if (ufp_device := data.api.bootstrap.get_device_from_mac(mac)) is not None: + if isinstance(ufp_device, Camera): + camera = ufp_device + break + return camera + async def get( self, request: web.Request, nvr_id: str, camera_id: str, start: str, end: str ) -> web.StreamResponse: @@ -169,7 +193,7 @@ class VideoProxyView(ProtectProxyView): if isinstance(data, web.Response): return data - camera = data.api.bootstrap.cameras.get(camera_id) + camera = self._async_get_camera(data, camera_id) if camera is None: return _404(f"Invalid camera ID: {camera_id}") if not camera.can_read_media(data.api.bootstrap.auth_user): diff --git a/tests/components/unifiprotect/test_views.py b/tests/components/unifiprotect/test_views.py index 0768252f6c9..1c774a9b921 100644 --- a/tests/components/unifiprotect/test_views.py +++ b/tests/components/unifiprotect/test_views.py @@ -111,7 +111,7 @@ async def test_thumbnail( ufp: MockUFPFixture, camera: Camera, ) -> None: - """Test invalid NVR ID in URL.""" + """Test NVR ID in URL.""" ufp.api.get_event_thumbnail = AsyncMock(return_value=b"testtest") @@ -127,6 +127,28 @@ async def test_thumbnail( ufp.api.get_event_thumbnail.assert_called_with("test_id", width=None, height=None) +async def test_thumbnail_entry_id( + hass: HomeAssistant, + hass_client: mock_aiohttp_client, + ufp: MockUFPFixture, + camera: Camera, +) -> None: + """Test config entry ID in URL.""" + + ufp.api.get_event_thumbnail = AsyncMock(return_value=b"testtest") + + await init_entry(hass, ufp, [camera]) + url = async_generate_thumbnail_url("test_id", ufp.entry.entry_id) + + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + + assert response.status == 200 + assert response.content_type == "image/jpeg" + assert await response.content.read() == b"testtest" + ufp.api.get_event_thumbnail.assert_called_with("test_id", width=None, height=None) + + async def test_video_bad_event( hass: HomeAssistant, ufp: MockUFPFixture, @@ -425,3 +447,47 @@ async def test_video( assert response.status == 200 ufp.api.request.assert_called_once + + +async def test_video_entity_id( + hass: HomeAssistant, + hass_client: mock_aiohttp_client, + ufp: MockUFPFixture, + camera: Camera, + fixed_now: datetime, +) -> None: + """Test video URL with no video.""" + + content = Mock() + content.__anext__ = AsyncMock(side_effect=[b"test", b"test", StopAsyncIteration()]) + content.__aiter__ = Mock(return_value=content) + + mock_response = Mock() + mock_response.content_length = 8 + mock_response.content.iter_chunked = Mock(return_value=content) + + ufp.api.request = AsyncMock(return_value=mock_response) + await init_entry(hass, ufp, [camera]) + + event_start = fixed_now - timedelta(seconds=30) + event = Event( + api=ufp.api, + camera_id=camera.id, + start=event_start, + end=fixed_now, + id="test_id", + type=EventType.MOTION, + score=100, + smart_detect_types=[], + smart_detect_event_ids=[], + ) + + url = async_generate_event_video_url(event) + url = url.replace(camera.id, "camera.test_camera_high") + + http_client = await hass_client() + response = cast(ClientResponse, await http_client.get(url)) + assert await response.content.read() == b"testtest" + + assert response.status == 200 + ufp.api.request.assert_called_once