Add cast platform for extending Google Cast media_player (#65149)

* Add cast platform for extending Google Cast media_player

* Update tests

* Refactor according to review comments

* Add test for playing using a cast platform

* Apply suggestions from code review

Co-authored-by: jjlawren <jjlawren@users.noreply.github.com>

* Pass cast type instead of a filter function when browsing

* Raise on invalid cast platform

* Test media browsing

Co-authored-by: jjlawren <jjlawren@users.noreply.github.com>
This commit is contained in:
Erik Montnemery 2022-01-31 10:50:05 +01:00 committed by GitHub
parent 6fdaec0847
commit b0c36d7729
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 353 additions and 100 deletions

View file

@ -1,12 +1,21 @@
"""Component to embed Google Cast."""
import logging
from __future__ import annotations
import logging
from typing import Protocol
from pychromecast import Chromecast
import voluptuous as vol
from homeassistant.components.media_player import BrowseMedia
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.integration_platform import (
async_process_integration_platforms,
)
from homeassistant.helpers.typing import ConfigType
from . import home_assistant_cast
@ -49,9 +58,58 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Cast from a config entry."""
await home_assistant_cast.async_setup_ha_cast(hass, entry)
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
hass.data[DOMAIN] = {}
await async_process_integration_platforms(hass, DOMAIN, _register_cast_platform)
return True
class CastProtocol(Protocol):
"""Define the format of cast platforms."""
async def async_get_media_browser_root_object(
self, cast_type: str
) -> list[BrowseMedia]:
"""Create a list of root objects for media browsing."""
async def async_browse_media(
self,
hass: HomeAssistant,
media_content_type: str,
media_content_id: str,
cast_type: str,
) -> BrowseMedia | None:
"""Browse media.
Return a BrowseMedia object or None if the media does not belong to this platform.
"""
async def async_play_media(
self,
hass: HomeAssistant,
cast_entity_id: str,
chromecast: Chromecast,
media_type: str,
media_id: str,
) -> bool:
"""Play media.
Return True if the media is played by the platform, False if not.
"""
async def _register_cast_platform(
hass: HomeAssistant, integration_domain: str, platform: CastProtocol
):
"""Register a cast platform."""
if (
not hasattr(platform, "async_get_media_browser_root_object")
or not hasattr(platform, "async_browse_media")
or not hasattr(platform, "async_play_media")
):
raise HomeAssistantError(f"Invalid cast platform {platform}")
hass.data[DOMAIN][integration_domain] = platform
async def async_remove_entry(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Remove Home Assistant Cast user."""
await home_assistant_cast.async_remove_user(hass, entry)

View file

@ -11,7 +11,6 @@ from urllib.parse import quote
import pychromecast
from pychromecast.controllers.homeassistant import HomeAssistantController
from pychromecast.controllers.multizone import MultizoneManager
from pychromecast.controllers.plex import PlexController
from pychromecast.controllers.receiver import VOLUME_CONTROL_TYPE_FIXED
from pychromecast.quick_play import quick_play
from pychromecast.socket_client import (
@ -20,7 +19,7 @@ from pychromecast.socket_client import (
)
import voluptuous as vol
from homeassistant.components import media_source, plex, zeroconf
from homeassistant.components import media_source, zeroconf
from homeassistant.components.http.auth import async_sign_path
from homeassistant.components.media_player import (
BrowseError,
@ -29,7 +28,6 @@ from homeassistant.components.media_player import (
)
from homeassistant.components.media_player.const import (
ATTR_MEDIA_EXTRA,
MEDIA_CLASS_APP,
MEDIA_CLASS_DIRECTORY,
MEDIA_TYPE_MOVIE,
MEDIA_TYPE_MUSIC,
@ -47,8 +45,6 @@ from homeassistant.components.media_player.const import (
SUPPORT_VOLUME_MUTE,
SUPPORT_VOLUME_SET,
)
from homeassistant.components.plex.const import PLEX_URI_SCHEME
from homeassistant.components.plex.services import lookup_plex_media
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CAST_APP_ID_HOMEASSISTANT_LOVELACE,
@ -463,21 +459,15 @@ class CastDevice(MediaPlayerEntity):
async def _async_root_payload(self, content_filter):
"""Generate root node."""
children = []
# Add external sources
if "plex" in self.hass.config.components:
children.append(
BrowseMedia(
title="Plex",
media_class=MEDIA_CLASS_APP,
media_content_id="",
media_content_type="plex",
thumbnail="https://brands.home-assistant.io/_/plex/logo.png",
can_play=False,
can_expand=True,
# Add media browsers
for platform in self.hass.data[CAST_DOMAIN].values():
children.extend(
await platform.async_get_media_browser_root_object(
self._chromecast.cast_type
)
)
# Add local media source
# Add media sources
try:
result = await media_source.async_browse_media(
self.hass, None, content_filter=content_filter
@ -519,14 +509,15 @@ class CastDevice(MediaPlayerEntity):
if media_content_id is None:
return await self._async_root_payload(content_filter)
if plex.is_plex_media_id(media_content_id):
return await plex.async_browse_media(
self.hass, media_content_type, media_content_id, platform=CAST_DOMAIN
)
if media_content_type == "plex":
return await plex.async_browse_media(
self.hass, None, None, platform=CAST_DOMAIN
for platform in self.hass.data[CAST_DOMAIN].values():
browse_media = await platform.async_browse_media(
self.hass,
media_content_type,
media_content_id,
self._chromecast.cast_type,
)
if browse_media:
return browse_media
return await media_source.async_browse_media(
self.hass, media_content_id, content_filter=content_filter
@ -556,7 +547,7 @@ class CastDevice(MediaPlayerEntity):
extra = kwargs.get(ATTR_MEDIA_EXTRA, {})
metadata = extra.get("metadata")
# We do not want this to be forwarded to a group
# Handle media supported by a known cast app
if media_type == CAST_DOMAIN:
try:
app_data = json.loads(media_id)
@ -588,23 +579,21 @@ class CastDevice(MediaPlayerEntity):
)
except NotImplementedError:
_LOGGER.error("App %s not supported", app_name)
return
# Handle plex
elif media_id and media_id.startswith(PLEX_URI_SCHEME):
media_id = media_id[len(PLEX_URI_SCHEME) :]
media = await self.hass.async_add_executor_job(
lookup_plex_media, self.hass, media_type, media_id
# Try the cast platforms
for platform in self.hass.data[CAST_DOMAIN].values():
result = await platform.async_play_media(
self.hass, self.entity_id, self._chromecast, media_type, media_id
)
if media is None:
if result:
return
controller = PlexController()
self._chromecast.register_handler(controller)
await self.hass.async_add_executor_job(controller.play_media, media)
else:
app_data = {"media_id": media_id, "media_type": media_type, **extra}
await self.hass.async_add_executor_job(
quick_play, self._chromecast, "default_media_receiver", app_data
)
# Default to play with the default media receiver
app_data = {"media_id": media_id, "media_type": media_type, **extra}
await self.hass.async_add_executor_job(
quick_play, self._chromecast, "default_media_receiver", app_data
)
def _media_status(self):
"""

View file

@ -0,0 +1,75 @@
"""Google Cast support for the Plex component."""
from __future__ import annotations
from pychromecast import Chromecast
from pychromecast.controllers.plex import PlexController
from homeassistant.components.cast.const import DOMAIN as CAST_DOMAIN
from homeassistant.components.media_player import BrowseMedia
from homeassistant.components.media_player.const import MEDIA_CLASS_APP
from homeassistant.core import HomeAssistant
from . import async_browse_media as async_browse_plex_media, is_plex_media_id
from .const import PLEX_URI_SCHEME
from .services import lookup_plex_media
async def async_get_media_browser_root_object(cast_type: str) -> list[BrowseMedia]:
"""Create a root object for media browsing."""
return [
BrowseMedia(
title="Plex",
media_class=MEDIA_CLASS_APP,
media_content_id="",
media_content_type="plex",
thumbnail="https://brands.home-assistant.io/_/plex/logo.png",
can_play=False,
can_expand=True,
)
]
async def async_browse_media(
hass: HomeAssistant,
media_content_type: str,
media_content_id: str,
cast_type: str,
) -> BrowseMedia | None:
"""Browse media."""
if is_plex_media_id(media_content_id):
return await async_browse_plex_media(
hass, media_content_type, media_content_id, platform=CAST_DOMAIN
)
if media_content_type == "plex":
return await async_browse_plex_media(hass, None, None, platform=CAST_DOMAIN)
return None
def _play_media(
hass: HomeAssistant, chromecast: Chromecast, media_type: str, media_id: str
) -> None:
"""Play media."""
media_id = media_id[len(PLEX_URI_SCHEME) :]
media = lookup_plex_media(hass, media_type, media_id)
if media is None:
return
controller = PlexController()
chromecast.register_handler(controller)
controller.play_media(media)
async def async_play_media(
hass: HomeAssistant,
cast_entity_id: str,
chromecast: Chromecast,
media_type: str,
media_id: str,
) -> bool:
"""Play media."""
if media_id and media_id.startswith(PLEX_URI_SCHEME):
await hass.async_add_executor_job(
_play_media, hass, chromecast, media_type, media_id
)
return True
return False

View file

@ -26,12 +26,6 @@ def mz_mock():
return MagicMock(spec_set=pychromecast.controllers.multizone.MultizoneManager)
@pytest.fixture()
def plex_mock():
"""Mock pychromecast PlexController."""
return MagicMock(spec_set=pychromecast.controllers.plex.PlexController)
@pytest.fixture()
def quick_play_mock():
"""Mock pychromecast quick_play."""
@ -51,7 +45,6 @@ def cast_mock(
castbrowser_mock,
get_chromecast_mock,
get_multizone_status_mock,
plex_mock,
):
"""Mock pychromecast."""
ignore_cec_orig = list(pychromecast.IGNORE_CEC)
@ -65,9 +58,6 @@ def cast_mock(
), patch(
"homeassistant.components.cast.media_player.MultizoneManager",
return_value=mz_mock,
), patch(
"homeassistant.components.cast.media_player.PlexController",
return_value=plex_mock,
), patch(
"homeassistant.components.cast.media_player.zeroconf.async_get_instance",
AsyncMock(),

View file

@ -3,7 +3,7 @@
from __future__ import annotations
import json
from unittest.mock import ANY, MagicMock, patch
from unittest.mock import ANY, AsyncMock, MagicMock, Mock, patch
from uuid import UUID
import attr
@ -14,7 +14,10 @@ import pytest
from homeassistant.components import media_player, tts
from homeassistant.components.cast import media_player as cast
from homeassistant.components.cast.media_player import ChromecastInfo
from homeassistant.components.media_player import BrowseMedia
from homeassistant.components.media_player.const import (
MEDIA_CLASS_APP,
MEDIA_CLASS_PLAYLIST,
SUPPORT_NEXT_TRACK,
SUPPORT_PAUSE,
SUPPORT_PLAY,
@ -38,7 +41,7 @@ from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.setup import async_setup_component
from tests.common import MockConfigEntry, assert_setup_component
from tests.common import MockConfigEntry, assert_setup_component, mock_platform
from tests.components.media_player import common
# pylint: disable=invalid-name
@ -844,54 +847,6 @@ async def test_entity_play_media_cast(hass: HomeAssistant, quick_play_mock):
)
async def test_entity_play_media_plex(hass: HomeAssistant, plex_mock):
"""Test playing media."""
entity_id = "media_player.speaker"
info = get_fake_chromecast_info()
chromecast, _ = await async_setup_media_player_cast(hass, info)
_, conn_status_cb, _ = get_status_callbacks(chromecast)
connection_status = MagicMock()
connection_status.status = "CONNECTED"
conn_status_cb(connection_status)
await hass.async_block_till_done()
with patch(
"homeassistant.components.cast.media_player.lookup_plex_media",
return_value=None,
):
await hass.services.async_call(
media_player.DOMAIN,
media_player.SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: entity_id,
media_player.ATTR_MEDIA_CONTENT_TYPE: "music",
media_player.ATTR_MEDIA_CONTENT_ID: 'plex://{"library_name": "Music", "artist.title": "Not an Artist"}',
},
blocking=True,
)
assert not plex_mock.play_media.called
mock_plex_media = MagicMock()
with patch(
"homeassistant.components.cast.media_player.lookup_plex_media",
return_value=mock_plex_media,
):
await hass.services.async_call(
media_player.DOMAIN,
media_player.SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: entity_id,
media_player.ATTR_MEDIA_CONTENT_TYPE: "music",
media_player.ATTR_MEDIA_CONTENT_ID: 'plex://{"library_name": "Music", "artist.title": "Artist"}',
},
blocking=True,
)
plex_mock.play_media.assert_called_once_with(mock_plex_media)
async def test_entity_play_media_cast_invalid(hass, caplog, quick_play_mock):
"""Test playing media."""
entity_id = "media_player.speaker"
@ -1578,3 +1533,189 @@ async def test_entry_setup_list_config(hass: HomeAssistant):
assert set(config_entry.data["uuid"]) == {"bla", "blu"}
assert set(config_entry.data["ignore_cec"]) == {"cast1", "cast2", "cast3"}
assert set(pychromecast.IGNORE_CEC) == {"cast1", "cast2", "cast3"}
async def test_invalid_cast_platform(hass: HomeAssistant, caplog):
"""Test we can play media through a cast platform."""
cast_platform_mock = Mock()
del cast_platform_mock.async_get_media_browser_root_object
del cast_platform_mock.async_browse_media
del cast_platform_mock.async_play_media
mock_platform(hass, "test.cast", cast_platform_mock)
await async_setup_component(hass, "test", {"test": {}})
await hass.async_block_till_done()
info = get_fake_chromecast_info()
await async_setup_media_player_cast(hass, info)
assert "Invalid cast platform <Mock id" in caplog.text
async def test_cast_platform_play_media(hass: HomeAssistant, quick_play_mock, caplog):
"""Test we can play media through a cast platform."""
entity_id = "media_player.speaker"
_can_play = True
def can_play(*args):
return _can_play
cast_platform_mock = Mock(
async_get_media_browser_root_object=AsyncMock(return_value=[]),
async_browse_media=AsyncMock(return_value=None),
async_play_media=AsyncMock(side_effect=can_play),
)
mock_platform(hass, "test.cast", cast_platform_mock)
await async_setup_component(hass, "test", {"test": {}})
await hass.async_block_till_done()
info = get_fake_chromecast_info()
chromecast, _ = await async_setup_media_player_cast(hass, info)
assert "Invalid cast platform <Mock id" not in caplog.text
_, conn_status_cb, _ = get_status_callbacks(chromecast)
connection_status = MagicMock()
connection_status.status = "CONNECTED"
conn_status_cb(connection_status)
await hass.async_block_till_done()
# This will play using the cast platform
await hass.services.async_call(
media_player.DOMAIN,
media_player.SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: entity_id,
media_player.ATTR_MEDIA_CONTENT_TYPE: "audio",
media_player.ATTR_MEDIA_CONTENT_ID: "best.mp3",
media_player.ATTR_MEDIA_EXTRA: {"metadata": {"metadatatype": 3}},
},
blocking=True,
)
# Assert the media player attempt to play media through the cast platform
cast_platform_mock.async_play_media.assert_called_once_with(
hass, entity_id, chromecast, "audio", "best.mp3"
)
# Assert pychromecast is not used to play media
chromecast.media_controller.play_media.assert_not_called()
quick_play_mock.assert_not_called()
# This will not play using the cast platform
_can_play = False
cast_platform_mock.async_play_media.reset_mock()
await hass.services.async_call(
media_player.DOMAIN,
media_player.SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: entity_id,
media_player.ATTR_MEDIA_CONTENT_TYPE: "audio",
media_player.ATTR_MEDIA_CONTENT_ID: "best.mp3",
media_player.ATTR_MEDIA_EXTRA: {"metadata": {"metadatatype": 3}},
},
blocking=True,
)
# Assert the media player attempt to play media through the cast platform
cast_platform_mock.async_play_media.assert_called_once_with(
hass, entity_id, chromecast, "audio", "best.mp3"
)
# Assert pychromecast is used to play media
chromecast.media_controller.play_media.assert_not_called()
quick_play_mock.assert_called()
async def test_cast_platform_browse_media(hass: HomeAssistant, hass_ws_client):
"""Test we can play media through a cast platform."""
cast_platform_mock = Mock(
async_get_media_browser_root_object=AsyncMock(
return_value=[
BrowseMedia(
title="Spotify",
media_class=MEDIA_CLASS_APP,
media_content_id="",
media_content_type="spotify",
thumbnail="https://brands.home-assistant.io/_/spotify/logo.png",
can_play=False,
can_expand=True,
)
]
),
async_browse_media=AsyncMock(
return_value=BrowseMedia(
title="Spotify Favourites",
media_class=MEDIA_CLASS_PLAYLIST,
media_content_id="",
media_content_type="spotify",
can_play=True,
can_expand=False,
)
),
async_play_media=AsyncMock(return_value=False),
)
mock_platform(hass, "test.cast", cast_platform_mock)
await async_setup_component(hass, "test", {"test": {}})
await async_setup_component(hass, "media_source", {"media_source": {}})
await hass.async_block_till_done()
info = get_fake_chromecast_info()
chromecast, _ = await async_setup_media_player_cast(hass, info)
_, conn_status_cb, _ = get_status_callbacks(chromecast)
connection_status = MagicMock()
connection_status.status = "CONNECTED"
conn_status_cb(connection_status)
await hass.async_block_till_done()
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": "media_player.speaker",
}
)
response = await client.receive_json()
assert response["success"]
expected_child = {
"title": "Spotify",
"media_class": "app",
"media_content_type": "spotify",
"media_content_id": "",
"can_play": False,
"can_expand": True,
"children_media_class": None,
"thumbnail": "https://brands.home-assistant.io/_/spotify/logo.png",
}
assert expected_child in response["result"]["children"]
client = await hass_ws_client()
await client.send_json(
{
"id": 2,
"type": "media_player/browse_media",
"entity_id": "media_player.speaker",
"media_content_id": "",
"media_content_type": "spotify",
}
)
response = await client.receive_json()
assert response["success"]
expected_response = {
"title": "Spotify Favourites",
"media_class": "playlist",
"media_content_type": "spotify",
"media_content_id": "",
"can_play": True,
"can_expand": False,
"children_media_class": None,
"thumbnail": None,
"children": [],
}
assert response["result"] == expected_response