Add browse media and play media support in Bravia TV (#85288)

* Add media browsing and play media support in Bravia TV

* Add fix invalid Bravia Content-Type header for icons

* Avoid duplicates in source_list

* Small cleanup

* Edit comment

* Revert en.json
This commit is contained in:
Artem Draft 2023-01-24 16:31:09 +03:00 committed by GitHub
parent e717f56113
commit 02e973026d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 224 additions and 217 deletions

View file

@ -11,7 +11,7 @@ from homeassistant.const import CONF_HOST, CONF_MAC, Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_create_clientsession
from .const import CONF_IGNORED_SOURCES, DOMAIN
from .const import DOMAIN
from .coordinator import BraviaTVCoordinator
PLATFORMS: Final[list[Platform]] = [
@ -25,7 +25,6 @@ async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> b
"""Set up a config entry."""
host = config_entry.data[CONF_HOST]
mac = config_entry.data[CONF_MAC]
ignored_sources = config_entry.options.get(CONF_IGNORED_SOURCES, [])
session = async_create_clientsession(
hass, cookie_jar=CookieJar(unsafe=True, quote_cookie=False)
@ -35,7 +34,6 @@ async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> b
hass=hass,
client=client,
config=config_entry.data,
ignored_sources=ignored_sources,
)
config_entry.async_on_unload(config_entry.add_update_listener(update_listener))

View file

@ -13,20 +13,16 @@ from homeassistant import config_entries
from homeassistant.components import ssdp
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_HOST, CONF_MAC, CONF_NAME, CONF_PIN
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers import instance_id
from homeassistant.helpers.aiohttp_client import async_create_clientsession
import homeassistant.helpers.config_validation as cv
from homeassistant.util.network import is_host_valid
from . import BraviaTVCoordinator
from .const import (
ATTR_CID,
ATTR_MAC,
ATTR_MODEL,
CONF_CLIENT_ID,
CONF_IGNORED_SOURCES,
CONF_NICKNAME,
CONF_USE_PSK,
DOMAIN,
@ -45,12 +41,6 @@ class BraviaTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
self.device_config: dict[str, Any] = {}
self.entry: ConfigEntry | None = None
@staticmethod
@callback
def async_get_options_flow(config_entry: ConfigEntry) -> BraviaTVOptionsFlowHandler:
"""Bravia TV options callback."""
return BraviaTVOptionsFlowHandler(config_entry)
def create_client(self) -> None:
"""Create Bravia TV client from config."""
host = self.device_config[CONF_HOST]
@ -257,51 +247,3 @@ class BraviaTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
self.entry = self.hass.config_entries.async_get_entry(self.context["entry_id"])
self.device_config = {**entry_data}
return await self.async_step_authorize()
class BraviaTVOptionsFlowHandler(config_entries.OptionsFlowWithConfigEntry):
"""Config flow options for Bravia TV."""
data_schema: vol.Schema
async def async_step_init(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Manage the options."""
coordinator: BraviaTVCoordinator
coordinator = self.hass.data[DOMAIN][self.config_entry.entry_id]
try:
await coordinator.async_update_sources()
except BraviaError:
return self.async_abort(reason="failed_update")
sources = coordinator.source_map.values()
source_list = [item["title"] for item in sources]
ignored_sources = self.options.get(CONF_IGNORED_SOURCES, [])
for item in ignored_sources:
if item not in source_list:
source_list.append(item)
self.data_schema = vol.Schema(
{
vol.Optional(CONF_IGNORED_SOURCES): cv.multi_select(source_list),
}
)
return await self.async_step_user()
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle a flow initialized by the user."""
if user_input is not None:
return self.async_create_entry(title="", data=user_input)
return self.async_show_form(
step_id="user",
data_schema=self.add_suggested_values_to_schema(
self.data_schema, self.options
),
)

View file

@ -3,16 +3,25 @@ from __future__ import annotations
from typing import Final
from homeassistant.backports.enum import StrEnum
ATTR_CID: Final = "cid"
ATTR_MAC: Final = "macAddr"
ATTR_MANUFACTURER: Final = "Sony"
ATTR_MODEL: Final = "model"
CONF_CLIENT_ID: Final = "client_id"
CONF_IGNORED_SOURCES: Final = "ignored_sources"
CONF_NICKNAME: Final = "nickname"
CONF_USE_PSK: Final = "use_psk"
DOMAIN: Final = "braviatv"
LEGACY_CLIENT_ID: Final = "HomeAssistant"
NICKNAME_PREFIX: Final = "Home Assistant"
class SourceType(StrEnum):
"""Source type for Sony TV Integration."""
APP = "app"
CHANNEL = "channel"
INPUT = "input"

View file

@ -32,6 +32,7 @@ from .const import (
DOMAIN,
LEGACY_CLIENT_ID,
NICKNAME_PREFIX,
SourceType,
)
_BraviaTVCoordinatorT = TypeVar("_BraviaTVCoordinatorT", bound="BraviaTVCoordinator")
@ -44,7 +45,7 @@ SCAN_INTERVAL: Final = timedelta(seconds=10)
def catch_braviatv_errors(
func: Callable[Concatenate[_BraviaTVCoordinatorT, _P], Awaitable[None]]
) -> Callable[Concatenate[_BraviaTVCoordinatorT, _P], Coroutine[Any, Any, None]]:
"""Catch BraviaClient errors."""
"""Catch Bravia errors."""
@wraps(func)
async def wrapper(
@ -52,7 +53,7 @@ def catch_braviatv_errors(
*args: _P.args,
**kwargs: _P.kwargs,
) -> None:
"""Catch BraviaClient errors and log message."""
"""Catch Bravia errors and log message."""
try:
await func(self, *args, **kwargs)
except BraviaError as err:
@ -70,7 +71,6 @@ class BraviaTVCoordinator(DataUpdateCoordinator[None]):
hass: HomeAssistant,
client: BraviaClient,
config: MappingProxyType[str, Any],
ignored_sources: list[str],
) -> None:
"""Initialize Bravia TV Client."""
@ -79,11 +79,11 @@ class BraviaTVCoordinator(DataUpdateCoordinator[None]):
self.use_psk = config.get(CONF_USE_PSK, False)
self.client_id = config.get(CONF_CLIENT_ID, LEGACY_CLIENT_ID)
self.nickname = config.get(CONF_NICKNAME, NICKNAME_PREFIX)
self.ignored_sources = ignored_sources
self.source: str | None = None
self.source_list: list[str] = []
self.source_map: dict[str, dict] = {}
self.media_title: str | None = None
self.media_channel: str | None = None
self.media_content_id: str | None = None
self.media_content_type: MediaType | None = None
self.media_uri: str | None = None
@ -92,7 +92,6 @@ class BraviaTVCoordinator(DataUpdateCoordinator[None]):
self.volume_target: str | None = None
self.volume_muted = False
self.is_on = False
self.is_channel = False
self.connected = False
self.skipped_updates = 0
@ -106,16 +105,23 @@ class BraviaTVCoordinator(DataUpdateCoordinator[None]):
),
)
def _sources_extend(self, sources: list[dict], source_type: str) -> None:
def _sources_extend(
self,
sources: list[dict],
source_type: SourceType,
add_to_list: bool = False,
sort_by: str | None = None,
) -> None:
"""Extend source map and source list."""
if sort_by:
sources = sorted(sources, key=lambda d: d.get(sort_by, ""))
for item in sources:
item["type"] = source_type
title = item.get("title")
uri = item.get("uri")
if not title or not uri:
continue
self.source_map[uri] = item
if title not in self.ignored_sources:
self.source_map[uri] = {**item, "type": source_type}
if add_to_list and title not in self.source_list:
self.source_list.append(title)
async def _async_update_data(self) -> None:
@ -162,25 +168,10 @@ class BraviaTVCoordinator(DataUpdateCoordinator[None]):
self.connected = False
raise UpdateFailed("Error communicating with device") from err
async def async_update_sources(self) -> None:
"""Update sources."""
self.source_list = []
self.source_map = {}
externals = await self.client.get_external_status()
self._sources_extend(externals, "input")
apps = await self.client.get_app_list()
self._sources_extend(apps, "app")
channels = await self.client.get_content_list_all("tv")
self._sources_extend(channels, "channel")
async def async_update_volume(self) -> None:
"""Update volume information."""
volume_info = await self.client.get_volume_info()
volume_level = volume_info.get("volume")
if volume_level is not None:
if volume_level := volume_info.get("volume"):
self.volume_level = volume_level / 100
self.volume_muted = volume_info.get("mute", False)
self.volume_target = volume_info.get("target")
@ -191,27 +182,68 @@ class BraviaTVCoordinator(DataUpdateCoordinator[None]):
self.media_title = playing_info.get("title")
self.media_uri = playing_info.get("uri")
self.media_duration = playing_info.get("durationSec")
if program_title := playing_info.get("programTitle"):
self.media_title = f"{self.media_title}: {program_title}"
self.media_channel = None
self.media_content_id = None
self.media_content_type = None
self.source = None
if self.media_uri:
source = self.source_map.get(self.media_uri, {})
self.source = source.get("title")
self.is_channel = self.media_uri[:2] == "tv"
if self.is_channel:
self.media_content_id = self.media_uri
if self.media_uri[:8] == "extInput":
self.source = playing_info.get("title")
if self.media_uri[:2] == "tv":
self.media_title = playing_info.get("programTitle")
self.media_channel = playing_info.get("title")
self.media_content_id = playing_info.get("dispNum")
self.media_content_type = MediaType.CHANNEL
else:
self.media_content_id = self.media_uri
self.media_content_type = None
else:
self.source = None
self.is_channel = False
self.media_content_id = None
self.media_content_type = None
if not playing_info:
self.media_title = "Smart TV"
self.media_content_type = MediaType.APP
async def async_update_sources(self) -> None:
"""Update all sources."""
self.source_list = []
self.source_map = {}
inputs = await self.client.get_external_status()
self._sources_extend(inputs, SourceType.INPUT, add_to_list=True)
apps = await self.client.get_app_list()
self._sources_extend(apps, SourceType.APP, sort_by="title")
channels = await self.client.get_content_list_all("tv")
self._sources_extend(channels, SourceType.CHANNEL)
async def async_source_start(self, uri: str, source_type: SourceType | str) -> None:
"""Select source by uri."""
if source_type == SourceType.APP:
await self.client.set_active_app(uri)
else:
await self.client.set_play_content(uri)
async def async_source_find(
self, query: str, source_type: SourceType | str
) -> None:
"""Find and select source by query."""
if query.startswith(("extInput:", "tv:", "com.sony.dtv.")):
return await self.async_source_start(query, source_type)
coarse_uri = None
is_numeric_search = source_type == SourceType.CHANNEL and query.isnumeric()
for uri, item in self.source_map.items():
if item["type"] == source_type:
if is_numeric_search:
num = item.get("dispNum")
if num and int(query) == int(num):
return await self.async_source_start(uri, source_type)
else:
title: str = item["title"]
if query.lower() == title.lower():
return await self.async_source_start(uri, source_type)
if query.lower() in title.lower():
coarse_uri = uri
if coarse_uri:
return await self.async_source_start(coarse_uri, source_type)
raise ValueError(f"Not found {source_type}: {query}")
@catch_braviatv_errors
async def async_turn_on(self) -> None:
"""Turn the device on."""
@ -260,7 +292,7 @@ class BraviaTVCoordinator(DataUpdateCoordinator[None]):
@catch_braviatv_errors
async def async_media_next_track(self) -> None:
"""Send next track command."""
if self.is_channel:
if self.media_content_type == MediaType.CHANNEL:
await self.client.channel_up()
else:
await self.client.next_track()
@ -268,21 +300,24 @@ class BraviaTVCoordinator(DataUpdateCoordinator[None]):
@catch_braviatv_errors
async def async_media_previous_track(self) -> None:
"""Send previous track command."""
if self.is_channel:
if self.media_content_type == MediaType.CHANNEL:
await self.client.channel_down()
else:
await self.client.previous_track()
@catch_braviatv_errors
async def async_play_media(
self, media_type: MediaType | str, media_id: str, **kwargs: Any
) -> None:
"""Play a piece of media."""
if media_type not in (MediaType.APP, MediaType.CHANNEL):
raise ValueError(f"Invalid media type: {media_type}")
await self.async_source_find(media_id, media_type)
@catch_braviatv_errors
async def async_select_source(self, source: str) -> None:
"""Set the input source."""
for uri, item in self.source_map.items():
if item.get("title") == source:
if item.get("type") == "app":
await self.client.set_active_app(uri)
else:
await self.client.set_play_content(uri)
break
await self.async_source_find(source, SourceType.INPUT)
@catch_braviatv_errors
async def async_send_command(self, command: Iterable[str], repeats: int) -> None:

View file

@ -1,18 +1,23 @@
"""Media player support for Bravia TV integration."""
from __future__ import annotations
from typing import Any
from homeassistant.components.media_player import (
BrowseError,
MediaClass,
MediaPlayerDeviceClass,
MediaPlayerEntity,
MediaPlayerEntityFeature,
MediaPlayerState,
MediaType,
)
from homeassistant.components.media_player.browse_media import BrowseMedia
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .const import DOMAIN
from .const import DOMAIN, SourceType
from .entity import BraviaTVEntity
@ -49,6 +54,8 @@ class BraviaTVMediaPlayer(BraviaTVEntity, MediaPlayerEntity):
| MediaPlayerEntityFeature.SELECT_SOURCE
| MediaPlayerEntityFeature.PLAY
| MediaPlayerEntityFeature.STOP
| MediaPlayerEntityFeature.PLAY_MEDIA
| MediaPlayerEntityFeature.BROWSE_MEDIA
)
@property
@ -83,6 +90,11 @@ class BraviaTVMediaPlayer(BraviaTVEntity, MediaPlayerEntity):
"""Title of current playing media."""
return self.coordinator.media_title
@property
def media_channel(self) -> str | None:
"""Channel currently playing."""
return self.coordinator.media_channel
@property
def media_content_id(self) -> str | None:
"""Content ID of current playing media."""
@ -122,6 +134,123 @@ class BraviaTVMediaPlayer(BraviaTVEntity, MediaPlayerEntity):
"""Send mute command."""
await self.coordinator.async_volume_mute(mute)
async def async_browse_media(
self,
media_content_type: str | None = None,
media_content_id: str | None = None,
) -> BrowseMedia:
"""Browse apps and channels."""
if not media_content_id:
await self.coordinator.async_update_sources()
return await self.async_browse_media_root()
path = media_content_id.partition("/")
if path[0] == "apps":
return await self.async_browse_media_apps(True)
if path[0] == "channels":
return await self.async_browse_media_channels(True)
raise BrowseError(f"Media not found: {media_content_type} / {media_content_id}")
async def async_browse_media_root(self) -> BrowseMedia:
"""Return root media objects."""
return BrowseMedia(
title="Sony TV",
media_class=MediaClass.DIRECTORY,
media_content_id="",
media_content_type="",
can_play=False,
can_expand=True,
children=[
await self.async_browse_media_apps(),
await self.async_browse_media_channels(),
],
)
async def async_browse_media_apps(self, expanded: bool = False) -> BrowseMedia:
"""Return apps media objects."""
if expanded:
children = [
BrowseMedia(
title=item["title"],
media_class=MediaClass.APP,
media_content_id=uri,
media_content_type=MediaType.APP,
can_play=False,
can_expand=False,
thumbnail=self.get_browse_image_url(
MediaType.APP, uri, media_image_id=None
),
)
for uri, item in self.coordinator.source_map.items()
if item["type"] == SourceType.APP
]
else:
children = None
return BrowseMedia(
title="Applications",
media_class=MediaClass.DIRECTORY,
media_content_id="apps",
media_content_type=MediaType.APPS,
children_media_class=MediaClass.APP,
can_play=False,
can_expand=True,
children=children,
)
async def async_browse_media_channels(self, expanded: bool = False) -> BrowseMedia:
"""Return channels media objects."""
if expanded:
children = [
BrowseMedia(
title=item["title"],
media_class=MediaClass.CHANNEL,
media_content_id=uri,
media_content_type=MediaType.CHANNEL,
can_play=False,
can_expand=False,
)
for uri, item in self.coordinator.source_map.items()
if item["type"] == SourceType.CHANNEL
]
else:
children = None
return BrowseMedia(
title="Channels",
media_class=MediaClass.DIRECTORY,
media_content_id="channels",
media_content_type=MediaType.CHANNELS,
children_media_class=MediaClass.CHANNEL,
can_play=False,
can_expand=True,
children=children,
)
async def async_get_browse_image(
self,
media_content_type: str,
media_content_id: str,
media_image_id: str | None = None,
) -> tuple[bytes | None, str | None]:
"""Serve album art. Returns (content, content_type)."""
if media_content_type == MediaType.APP and media_content_id:
if icon := self.coordinator.source_map[media_content_id].get("icon"):
(content, content_type) = await self._async_fetch_image(icon)
if content_type:
# Fix invalid Content-Type header returned by Bravia
content_type = content_type.replace("Content-Type: ", "")
return (content, content_type)
return None, None
async def async_play_media(
self, media_type: MediaType | str, media_id: str, **kwargs: Any
) -> None:
"""Play a piece of media."""
await self.coordinator.async_play_media(media_type, media_id, **kwargs)
async def async_select_source(self, source: str) -> None:
"""Set the input source."""
await self.coordinator.async_select_source(source)

View file

@ -44,18 +44,5 @@
"not_bravia_device": "The device is not a Bravia TV.",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]"
}
},
"options": {
"step": {
"user": {
"title": "Options for Sony Bravia TV",
"data": {
"ignored_sources": "List of ignored sources"
}
}
},
"abort": {
"failed_update": "An error occurred while updating the list of sources.\n\n Ensure that your TV is turned on before trying to set it up."
}
}
}

View file

@ -13,7 +13,6 @@ from homeassistant import data_entry_flow
from homeassistant.components import ssdp
from homeassistant.components.braviatv.const import (
CONF_CLIENT_ID,
CONF_IGNORED_SOURCES,
CONF_NICKNAME,
CONF_USE_PSK,
DOMAIN,
@ -21,7 +20,6 @@ from homeassistant.components.braviatv.const import (
)
from homeassistant.config_entries import SOURCE_REAUTH, SOURCE_SSDP, SOURCE_USER
from homeassistant.const import CONF_HOST, CONF_MAC, CONF_PIN
from homeassistant.core import HomeAssistant
from homeassistant.helpers import instance_id
from tests.common import MockConfigEntry
@ -376,97 +374,6 @@ async def test_create_entry_psk(hass):
}
async def test_options_flow(hass: HomeAssistant) -> None:
"""Test config flow options."""
config_entry = MockConfigEntry(
domain=DOMAIN,
unique_id="very_unique_string",
data={
CONF_HOST: "bravia-host",
CONF_PIN: "1234",
CONF_MAC: "AA:BB:CC:DD:EE:FF",
},
title="TV-Model",
)
config_entry.add_to_hass(hass)
with patch("pybravia.BraviaClient.connect"), patch(
"pybravia.BraviaClient.get_power_status",
return_value="active",
), patch(
"pybravia.BraviaClient.get_external_status",
return_value=BRAVIA_SOURCES,
), patch(
"pybravia.BraviaClient.send_rest_req",
return_value={},
):
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
result = await hass.config_entries.options.async_init(config_entry.entry_id)
assert result["type"] == data_entry_flow.FlowResultType.FORM
assert result["step_id"] == "user"
result = await hass.config_entries.options.async_configure(
result["flow_id"], user_input={CONF_IGNORED_SOURCES: ["HDMI 1", "HDMI 2"]}
)
await hass.async_block_till_done()
assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
assert config_entry.options == {CONF_IGNORED_SOURCES: ["HDMI 1", "HDMI 2"]}
# Test that saving with missing sources is ok
with patch(
"pybravia.BraviaClient.get_external_status",
return_value=BRAVIA_SOURCES[1:],
):
result = await hass.config_entries.options.async_init(config_entry.entry_id)
result = await hass.config_entries.options.async_configure(
result["flow_id"], user_input={CONF_IGNORED_SOURCES: ["HDMI 1"]}
)
await hass.async_block_till_done()
assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
assert config_entry.options == {CONF_IGNORED_SOURCES: ["HDMI 1"]}
async def test_options_flow_error(hass: HomeAssistant) -> None:
"""Test config flow options."""
config_entry = MockConfigEntry(
domain=DOMAIN,
unique_id="very_unique_string",
data={
CONF_HOST: "bravia-host",
CONF_PIN: "1234",
CONF_MAC: "AA:BB:CC:DD:EE:FF",
},
title="TV-Model",
)
config_entry.add_to_hass(hass)
with patch("pybravia.BraviaClient.connect"), patch(
"pybravia.BraviaClient.get_power_status",
return_value="active",
), patch(
"pybravia.BraviaClient.get_external_status",
return_value=BRAVIA_SOURCES,
), patch(
"pybravia.BraviaClient.send_rest_req",
return_value={},
):
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
with patch(
"pybravia.BraviaClient.send_rest_req",
side_effect=BraviaError,
):
result = await hass.config_entries.options.async_init(config_entry.entry_id)
assert result["type"] == data_entry_flow.FlowResultType.ABORT
assert result["reason"] == "failed_update"
@pytest.mark.parametrize(
"use_psk, new_pin",
[