Add favorites support to Media Browser for Squeezebox integration (#124732)

* Add Favorites support to Media Browser

* CI fixes

* More CI Fixes

* Another CI

* Change icons for other library items to use standard LMS icons

* Change max favorites to BROWSE_LIMIT

* Simplify library_payload to consolidate favorite and non-favorite items

* Simplify library_payload to consolidate favorite and non-favorite items

* Add support for favorite hierarchy

* small fix for icon naming with local albums

* Add ability to expand an album from a favorite list

* Reformat to fix linting error

* and ruff format

* Use library calls from pysqueezebox

* Folder and playback support

* Bump to pysqueezebox 0.8.0

* Bump pysqueezebox version to 0.8.1

* Add unit tests

* Improve unit tests

* Refactor tests to use websockets and services.async_call

* Apply suggestions from code review

---------

Co-authored-by: peteS-UK <64092177+peteS-UK@users.noreply.github.com>
This commit is contained in:
Raj Laud 2024-09-03 10:50:55 -04:00 committed by GitHub
parent 42ed7fbb0d
commit 78517f75e8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 382 additions and 12 deletions

View file

@ -11,9 +11,10 @@ from homeassistant.components.media_player import (
)
from homeassistant.helpers.network import is_internal_request
LIBRARY = ["Artists", "Albums", "Tracks", "Playlists", "Genres"]
LIBRARY = ["Favorites", "Artists", "Albums", "Tracks", "Playlists", "Genres"]
MEDIA_TYPE_TO_SQUEEZEBOX = {
"Favorites": "favorites",
"Artists": "artists",
"Albums": "albums",
"Tracks": "titles",
@ -32,9 +33,11 @@ SQUEEZEBOX_ID_BY_TYPE = {
MediaType.TRACK: "track_id",
MediaType.PLAYLIST: "playlist_id",
MediaType.GENRE: "genre_id",
"Favorites": "item_id",
}
CONTENT_TYPE_MEDIA_CLASS = {
"Favorites": {"item": MediaClass.DIRECTORY, "children": MediaClass.TRACK},
"Artists": {"item": MediaClass.DIRECTORY, "children": MediaClass.ARTIST},
"Albums": {"item": MediaClass.DIRECTORY, "children": MediaClass.ALBUM},
"Tracks": {"item": MediaClass.DIRECTORY, "children": MediaClass.TRACK},
@ -57,6 +60,7 @@ CONTENT_TYPE_TO_CHILD_TYPE = {
"Tracks": MediaType.TRACK,
"Playlists": MediaType.PLAYLIST,
"Genres": MediaType.GENRE,
"Favorites": None, # can only be determined after inspecting the item
}
BROWSE_LIMIT = 1000
@ -64,6 +68,7 @@ BROWSE_LIMIT = 1000
async def build_item_response(entity, player, payload):
"""Create response payload for search described by payload."""
internal_request = is_internal_request(entity.hass)
search_id = payload["search_id"]
@ -71,6 +76,8 @@ async def build_item_response(entity, player, payload):
media_class = CONTENT_TYPE_MEDIA_CLASS[search_type]
children = None
if search_id and search_id != search_type:
browse_id = (SQUEEZEBOX_ID_BY_TYPE[search_type], search_id)
else:
@ -82,16 +89,36 @@ async def build_item_response(entity, player, payload):
browse_id=browse_id,
)
children = None
if result is not None and result.get("items"):
item_type = CONTENT_TYPE_TO_CHILD_TYPE[search_type]
child_media_class = CONTENT_TYPE_MEDIA_CLASS[item_type]
children = []
for item in result["items"]:
item_id = str(item["id"])
item_thumbnail = None
if item_type:
child_item_type = item_type
child_media_class = CONTENT_TYPE_MEDIA_CLASS[item_type]
can_expand = child_media_class["children"] is not None
can_play = True
if search_type == "Favorites":
if "album_id" in item:
item_id = str(item["album_id"])
child_item_type = MediaType.ALBUM
child_media_class = CONTENT_TYPE_MEDIA_CLASS[MediaType.ALBUM]
can_expand = True
can_play = True
elif item["hasitems"]:
child_item_type = "Favorites"
child_media_class = CONTENT_TYPE_MEDIA_CLASS["Favorites"]
can_expand = True
can_play = False
else:
child_item_type = "Favorites"
child_media_class = CONTENT_TYPE_MEDIA_CLASS[MediaType.TRACK]
can_expand = False
can_play = True
if artwork_track_id := item.get("artwork_track_id"):
if internal_request:
@ -102,15 +129,17 @@ async def build_item_response(entity, player, payload):
item_thumbnail = entity.get_browse_image_url(
item_type, item_id, artwork_track_id
)
else:
item_thumbnail = item.get("image_url") # will not be proxied by HA
children.append(
BrowseMedia(
title=item["title"],
media_class=child_media_class["item"],
media_content_id=item_id,
media_content_type=item_type,
can_play=True,
can_expand=child_media_class["children"] is not None,
media_content_type=child_item_type,
can_play=can_play,
can_expand=can_expand,
thumbnail=item_thumbnail,
)
)
@ -124,7 +153,7 @@ async def build_item_response(entity, player, payload):
children_media_class=media_class["children"],
media_content_id=search_id,
media_content_type=search_type,
can_play=True,
can_play=search_type != "Favorites",
children=children,
can_expand=True,
)
@ -144,6 +173,7 @@ async def library_payload(hass, player):
for item in LIBRARY:
media_class = CONTENT_TYPE_MEDIA_CLASS[item]
result = await player.async_browse(
MEDIA_TYPE_TO_SQUEEZEBOX[item],
limit=1,
@ -155,7 +185,7 @@ async def library_payload(hass, player):
media_class=media_class["children"],
media_content_id=item,
media_content_type=item,
can_play=True,
can_play=item != "Favorites",
can_expand=True,
)
)
@ -184,10 +214,12 @@ async def generate_playlist(player, payload):
media_id = payload["search_id"]
if media_type not in SQUEEZEBOX_ID_BY_TYPE:
return None
raise BrowseError(f"Media type not supported: {media_type}")
browse_id = (SQUEEZEBOX_ID_BY_TYPE[media_type], media_id)
result = await player.async_browse(
"titles", limit=BROWSE_LIMIT, browse_id=browse_id
)
return result.get("items")
if result and "items" in result:
return result["items"]
raise BrowseError(f"Media not found: {media_type} / {media_id}")

View file

@ -591,7 +591,7 @@ class SqueezeBoxEntity(MediaPlayerEntity):
if media_content_type in [None, "library"]:
return await library_payload(self.hass, self._player)
if media_source.is_media_source_id(media_content_id):
if media_content_id and media_source.is_media_source_id(media_content_id):
return await media_source.async_browse_media(
self.hass, media_content_id, content_filter=media_source_content_filter
)

View file

@ -0,0 +1,133 @@
"""Setup the squeezebox tests."""
from collections.abc import Generator
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from homeassistant.components.media_player import MediaType
from homeassistant.components.squeezebox import const
from homeassistant.components.squeezebox.browse_media import (
MEDIA_TYPE_TO_SQUEEZEBOX,
SQUEEZEBOX_ID_BY_TYPE,
)
from homeassistant.const import CONF_HOST, CONF_PORT
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import format_mac
from tests.common import MockConfigEntry
TEST_HOST = "1.2.3.4"
TEST_PORT = "9000"
TEST_USE_HTTPS = False
SERVER_UUID = "12345678-1234-1234-1234-123456789012"
TEST_MAC = "aa:bb:cc:dd:ee:ff"
@pytest.fixture
def mock_setup_entry() -> Generator[AsyncMock]:
"""Override async_setup_entry."""
with patch(
"homeassistant.components.squeezebox.async_setup_entry", return_value=True
) as mock_setup_entry:
yield mock_setup_entry
@pytest.fixture
def config_entry(hass: HomeAssistant) -> MockConfigEntry:
"""Add the squeezebox mock config entry to hass."""
config_entry = MockConfigEntry(
domain=const.DOMAIN,
unique_id=SERVER_UUID,
data={
CONF_HOST: TEST_HOST,
CONF_PORT: TEST_PORT,
const.CONF_HTTPS: TEST_USE_HTTPS,
},
)
config_entry.add_to_hass(hass)
return config_entry
async def mock_async_browse(
media_type: MediaType, limit: int, browse_id: tuple | None = None
) -> dict | None:
"""Mock the async_browse method of pysqueezebox.Player."""
child_types = {
"favorites": "favorites",
"albums": "album",
"album": "track",
"genres": "genre",
"genre": "album",
"artists": "artist",
"artist": "album",
"titles": "title",
"title": "title",
"playlists": "playlist",
"playlist": "title",
}
fake_items = [
{
"title": "Fake Item 1",
"id": "1234",
"hasitems": False,
"item_type": child_types[media_type],
"artwork_track_id": "b35bb9e9",
},
{
"title": "Fake Item 2",
"id": "12345",
"hasitems": media_type == "favorites",
"item_type": child_types[media_type],
"image_url": "http://lms.internal:9000/html/images/favorites.png",
},
{
"title": "Fake Item 3",
"id": "123456",
"hasitems": media_type == "favorites",
"album_id": "123456" if media_type == "favorites" else None,
},
]
if browse_id:
search_type, search_id = browse_id
if search_id:
if search_type in SQUEEZEBOX_ID_BY_TYPE.values():
for item in fake_items:
if item["id"] == search_id:
return {
"title": item["title"],
"items": [item],
}
return None
if search_type in SQUEEZEBOX_ID_BY_TYPE.values():
return {
"title": search_type,
"items": fake_items,
}
return None
if media_type in MEDIA_TYPE_TO_SQUEEZEBOX.values():
return {
"title": media_type,
"items": fake_items,
}
return None
@pytest.fixture
def lms() -> MagicMock:
"""Mock a Lyrion Media Server with one mock player attached."""
lms = MagicMock()
player = MagicMock()
player.player_id = TEST_MAC
player.name = "Test Player"
player.power = False
player.async_browse = AsyncMock(side_effect=mock_async_browse)
player.async_load_playlist = AsyncMock()
player.async_update = AsyncMock()
player.generate_image_url_from_track_id = MagicMock(
return_value="http://lms.internal:9000/html/images/favorites.png"
)
lms.async_get_players = AsyncMock(return_value=[player])
lms.async_query = AsyncMock(return_value={"uuid": format_mac(TEST_MAC)})
return lms

View file

@ -0,0 +1,205 @@
"""Test the media browser interface."""
from unittest.mock import MagicMock, patch
import pytest
from homeassistant.components.media_player import (
ATTR_MEDIA_CONTENT_ID,
ATTR_MEDIA_CONTENT_TYPE,
DOMAIN as MEDIA_PLAYER_DOMAIN,
SERVICE_PLAY_MEDIA,
BrowseError,
MediaType,
)
from homeassistant.components.squeezebox.browse_media import (
LIBRARY,
MEDIA_TYPE_TO_SQUEEZEBOX,
)
from homeassistant.const import ATTR_ENTITY_ID
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
from tests.typing import WebSocketGenerator
@pytest.fixture(autouse=True)
async def setup_integration(
hass: HomeAssistant, config_entry: MockConfigEntry, lms: MagicMock
) -> None:
"""Fixture for setting up the component."""
with (
patch("homeassistant.components.squeezebox.Server", return_value=lms),
patch(
"homeassistant.components.squeezebox.media_player.start_server_discovery"
),
):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done(wait_background_tasks=True)
async def test_async_browse_media_root(
hass: HomeAssistant,
config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test the async_browse_media function at the root level."""
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": "media_player.test_player",
"media_content_id": "",
"media_content_type": "library",
}
)
response = await client.receive_json()
assert response["success"]
result = response["result"]
for idx, item in enumerate(result["children"]):
assert item["title"] == LIBRARY[idx]
async def test_async_browse_media_with_subitems(
hass: HomeAssistant,
config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test each category with subitems."""
for category in ("Favorites", "Artists", "Albums", "Playlists", "Genres"):
with patch(
"homeassistant.components.squeezebox.browse_media.is_internal_request",
return_value=False,
):
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": "media_player.test_player",
"media_content_id": "",
"media_content_type": category,
}
)
response = await client.receive_json()
assert response["success"]
category_level = response["result"]
assert category_level["title"] == MEDIA_TYPE_TO_SQUEEZEBOX[category]
assert category_level["children"][0]["title"] == "Fake Item 1"
# Look up a subitem
search_type = category_level["children"][0]["media_content_type"]
search_id = category_level["children"][0]["media_content_id"]
await client.send_json(
{
"id": 2,
"type": "media_player/browse_media",
"entity_id": "media_player.test_player",
"media_content_id": search_id,
"media_content_type": search_type,
}
)
response = await client.receive_json()
assert response["success"]
search = response["result"]
assert search["title"] == "Fake Item 1"
async def test_async_browse_tracks(
hass: HomeAssistant,
config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test tracks (no subitems)."""
with patch(
"homeassistant.components.squeezebox.browse_media.is_internal_request",
return_value=True,
):
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": "media_player.test_player",
"media_content_id": "",
"media_content_type": "Tracks",
}
)
response = await client.receive_json()
assert response["success"]
tracks = response["result"]
assert tracks["title"] == "titles"
assert len(tracks["children"]) == 3
async def test_async_browse_error(
hass: HomeAssistant,
config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Search for a non-existent item and assert error."""
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": "media_player.test_player",
"media_content_id": "0",
"media_content_type": MediaType.ALBUM,
}
)
response = await client.receive_json()
assert not response["success"]
async def test_play_browse_item(
hass: HomeAssistant,
config_entry: MockConfigEntry,
) -> None:
"""Test play browse item."""
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: "media_player.test_player",
ATTR_MEDIA_CONTENT_ID: "1234",
ATTR_MEDIA_CONTENT_TYPE: "album",
},
)
async def test_play_browse_item_nonexistent(
hass: HomeAssistant, config_entry: MockConfigEntry
) -> None:
"""Test trying to play an item that doesn't exist."""
with pytest.raises(BrowseError):
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: "media_player.test_player",
ATTR_MEDIA_CONTENT_ID: "0",
ATTR_MEDIA_CONTENT_TYPE: "album",
},
blocking=True,
)
async def test_play_browse_item_bad_category(
hass: HomeAssistant,
config_entry: MockConfigEntry,
) -> None:
"""Test trying to play an item whose category doesn't exist."""
with pytest.raises(BrowseError):
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: "media_player.test_player",
ATTR_MEDIA_CONTENT_ID: "1234",
ATTR_MEDIA_CONTENT_TYPE: "bad_category",
},
blocking=True,
)