hass-core/tests/components/camera/test_media_source.py
Robert Resch 675ee8e813
Add async webrtc offer support (#127981)
* Add async webrtc offer support

* Create dataclass for messages

* Send session ID over websocket

* Fixes

* Rename

* Implement some review findings

* Add WebRTCError and small renames

* Use dedicated function instead of inspect

* Update go2rtc-client to 0.0.1b1

* Improve checking for sync offer

* Revert change as not needed anymore

* Typo

* Fix tests

* Add missing go2rtc tests

* Move webrtc offer tests to test_webrtc file

* Add ws camera/webrtc/candidate tests

* Add missing tests

* Implement suggestions

* Implement review changes

* rename

* Revert test to use ws endpoints

* Change doc string

* Don't import from submodule

* Get type from class name

* Update homeassistant/components/camera/__init__.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Adopt tests

* Apply suggestions from code review

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Fix tests

---------

Co-authored-by: Bram Kragten <mail@bramkragten.nl>
Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
Co-authored-by: Erik <erik@montnemery.com>
2024-10-28 15:46:15 +01:00

148 lines
5.6 KiB
Python

"""Test camera media source."""
from unittest.mock import PropertyMock, patch
import pytest
from homeassistant.components import media_source
from homeassistant.components.camera.const import StreamType
from homeassistant.components.stream import FORMAT_CONTENT_TYPE
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
@pytest.fixture(autouse=True)
async def setup_media_source(hass: HomeAssistant) -> None:
    """Set up the media_source integration for every test in this module."""
    # autouse=True: tests do not need to request this fixture explicitly.
    is_set_up = await async_setup_component(hass, "media_source", {})
    assert is_set_up
@pytest.mark.usefixtures("mock_camera_with_device", "mock_camera")
async def test_device_with_device(hass: HomeAssistant) -> None:
    """Test the browse title of a camera that has both a device and a name."""
    camera_root = "media-source://camera"
    root = await media_source.async_browse_media(hass, camera_root)
    listed = root.children

    # One camera is listed, two are reported as not shown.
    assert root.not_shown == 2
    assert len(listed) == 1
    # NOTE(review): title presumably is "<device name> <entity name>" - confirm
    # against the mock_camera_with_device fixture.
    assert listed[0].title == "Test Camera Device Demo camera without stream"
@pytest.mark.usefixtures("mock_camera_with_no_name", "mock_camera")
async def test_device_with_no_name(hass: HomeAssistant) -> None:
    """Test the browse title of a camera that has a device and name == None."""
    camera_root = "media-source://camera"
    root = await media_source.async_browse_media(hass, camera_root)
    listed = root.children

    # One camera is listed, two are reported as not shown.
    assert root.not_shown == 2
    assert len(listed) == 1
    # The expected title is the same one asserted for a camera with a name.
    assert listed[0].title == "Test Camera Device Demo camera without stream"
@pytest.mark.usefixtures("mock_camera_hls")
async def test_browsing_hls(hass: HomeAssistant) -> None:
    """Test browsing HLS cameras before and after "stream" is loaded."""
    camera_root = "media-source://camera"

    # Without the stream component nothing is listed and all three are hidden.
    before = await media_source.async_browse_media(hass, camera_root)
    assert before is not None
    assert before.title == "Camera"
    assert len(before.children) == 0
    assert before.not_shown == 3

    # Adding stream enables the HLS cameras.
    hass.config.components.add("stream")

    after = await media_source.async_browse_media(hass, camera_root)
    assert after.not_shown == 0
    assert len(after.children) == 3
    first_child = after.children[0]
    assert first_child.media_content_type == FORMAT_CONTENT_TYPE["hls"]
@pytest.mark.usefixtures("mock_camera")
async def test_browsing_mjpeg(hass: HomeAssistant) -> None:
    """Test browsing the camera media source for an MJPEG camera."""
    camera_root = "media-source://camera"
    root = await media_source.async_browse_media(hass, camera_root)

    assert root is not None
    assert root.title == "Camera"
    # A single camera is listed; the remaining two are counted as not shown.
    assert len(root.children) == 1
    assert root.not_shown == 2

    mjpeg_entry = root.children[0]
    assert mjpeg_entry.media_content_type == "image/jpg"
    assert mjpeg_entry.title == "Demo camera without stream"
@pytest.mark.usefixtures("mock_camera_webrtc")
async def test_browsing_webrtc(hass: HomeAssistant) -> None:
    """Test browsing WebRTC cameras in the camera media source."""
    camera_root = "media-source://camera"

    # 3 cameras:
    # one only supports WebRTC (no stream source)
    # one raises when getting the source
    # one has a stream source, and should be the only browsable one
    # Successive stream_source calls yield "test", then None, then raise.
    stream_source_patch = patch(
        "homeassistant.components.camera.Camera.stream_source",
        side_effect=["test", None, Exception],
    )

    # Both browse calls run under the patch; the second depends on it.
    with stream_source_patch:
        # NOTE(review): presumably stream_source is not consulted until
        # "stream" is loaded, so the side_effect list is untouched here - confirm.
        before = await media_source.async_browse_media(hass, camera_root)
        assert before is not None
        assert before.title == "Camera"
        assert len(before.children) == 0
        assert before.not_shown == 3

        # Adding stream enables HLS camera
        hass.config.components.add("stream")

        after = await media_source.async_browse_media(hass, camera_root)
        assert after.not_shown == 2
        assert len(after.children) == 1
        only_child = after.children[0]
        assert only_child.media_content_type == FORMAT_CONTENT_TYPE["hls"]
@pytest.mark.usefixtures("mock_camera_hls")
async def test_resolving(hass: HomeAssistant) -> None:
    """Test resolving a camera media item to a stream URL and HLS mime type."""
    stream_url = "http://example.com/stream"

    # Adding stream enables HLS camera
    hass.config.components.add("stream")

    # Stub the endpoint URL helper so the resolved URL is predictable.
    endpoint_patch = patch(
        "homeassistant.components.camera.media_source._async_stream_endpoint_url",
        return_value=stream_url,
    )
    with endpoint_patch:
        resolved = await media_source.async_resolve_media(
            hass, "media-source://camera/camera.demo_camera", None
        )

    assert resolved is not None
    assert resolved.url == stream_url
    assert resolved.mime_type == FORMAT_CONTENT_TYPE["hls"]
@pytest.mark.usefixtures("mock_camera_hls")
async def test_resolving_errors(hass: HomeAssistant) -> None:
    """Test the error messages raised when a camera item cannot be resolved."""
    demo_camera = "media-source://camera/camera.demo_camera"

    async def _unresolvable_message(media_content_id: str) -> str:
        """Resolve the item, require Unresolvable, and return its message."""
        with pytest.raises(media_source.Unresolvable) as exc_info:
            await media_source.async_resolve_media(hass, media_content_id, None)
        return str(exc_info.value)

    # The stream component has not been added yet.
    assert await _unresolvable_message(demo_camera) == "Stream integration not loaded"

    hass.config.components.add("stream")

    # Unknown entity id.
    message = await _unresolvable_message("media-source://camera/camera.non_existing")
    assert message == "Could not resolve media item: camera.non_existing"

    # Camera whose frontend stream type is WebRTC.
    # NOTE(review): new_callable receives a PropertyMock *instance*; patch calls
    # it, so the attribute is replaced by StreamType.WEB_RTC itself rather than
    # by a property mock - confirm this is intended.
    with patch(
        "homeassistant.components.camera.Camera.frontend_stream_type",
        new_callable=PropertyMock(return_value=StreamType.WEB_RTC),
    ):
        message = await _unresolvable_message(demo_camera)
    assert message == "Camera does not support MJPEG or HLS streaming."

    # Same camera with the patch removed.
    message = await _unresolvable_message(demo_camera)
    assert message == "camera.demo_camera does not support play stream service"