Migrate spotify to aiospotify (#127728)

This commit is contained in:
Joost Lekkerkerker 2024-10-16 17:04:05 +02:00 committed by GitHub
parent 11ac8f8006
commit 494511e099
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 511 additions and 481 deletions

View file

@ -3,16 +3,16 @@
from __future__ import annotations
from datetime import timedelta
from typing import Any
from typing import TYPE_CHECKING
import aiohttp
import requests
from spotipy import Spotify, SpotifyException
from spotifyaio import Device, SpotifyClient, SpotifyConnectionError
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.const import CONF_ACCESS_TOKEN, Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.config_entry_oauth2_flow import (
OAuth2Session,
async_get_config_entry_implementation,
@ -53,40 +53,36 @@ async def async_setup_entry(hass: HomeAssistant, entry: SpotifyConfigEntry) -> b
except aiohttp.ClientError as err:
raise ConfigEntryNotReady from err
spotify = Spotify(auth=session.token["access_token"])
spotify = SpotifyClient(async_get_clientsession(hass))
coordinator = SpotifyCoordinator(hass, spotify, session)
spotify.authenticate(session.token[CONF_ACCESS_TOKEN])
async def _refresh_token() -> str:
await session.async_ensure_token_valid()
token = session.token[CONF_ACCESS_TOKEN]
if TYPE_CHECKING:
assert isinstance(token, str)
return token
spotify.refresh_token_function = _refresh_token
coordinator = SpotifyCoordinator(hass, spotify)
await coordinator.async_config_entry_first_refresh()
async def _update_devices() -> list[dict[str, Any]]:
if not session.valid_token:
await session.async_ensure_token_valid()
await hass.async_add_executor_job(
spotify.set_auth, session.token["access_token"]
)
async def _update_devices() -> list[Device]:
try:
devices: dict[str, Any] | None = await hass.async_add_executor_job(
spotify.devices
)
except (requests.RequestException, SpotifyException) as err:
return await spotify.get_devices()
except SpotifyConnectionError as err:
raise UpdateFailed from err
if devices is None:
return []
return devices.get("devices", [])
device_coordinator: DataUpdateCoordinator[list[dict[str, Any]]] = (
DataUpdateCoordinator(
device_coordinator: DataUpdateCoordinator[list[Device]] = DataUpdateCoordinator(
hass,
LOGGER,
name=f"{entry.title} Devices",
update_interval=timedelta(minutes=5),
update_method=_update_devices,
)
)
await device_coordinator.async_config_entry_first_refresh()
entry.runtime_data = SpotifyData(coordinator, session, device_coordinator)

View file

@ -3,11 +3,17 @@
from __future__ import annotations
from enum import StrEnum
from functools import partial
import logging
from typing import Any
from typing import TYPE_CHECKING, Any, TypedDict
from spotipy import Spotify
from spotifyaio import (
Artist,
BasePlaylist,
SimplifiedAlbum,
SimplifiedTrack,
SpotifyClient,
Track,
)
import yarl
from homeassistant.components.media_player import (
@ -18,7 +24,6 @@ from homeassistant.components.media_player import (
)
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
from homeassistant.helpers.config_entry_oauth2_flow import OAuth2Session
from .const import DOMAIN, MEDIA_PLAYER_PREFIX, MEDIA_TYPE_SHOW, PLAYABLE_MEDIA_TYPES
from .util import fetch_image_url
@ -29,6 +34,62 @@ BROWSE_LIMIT = 48
_LOGGER = logging.getLogger(__name__)
class ItemPayload(TypedDict):
"""TypedDict for item payload."""
name: str
type: str
uri: str
id: str | None
thumbnail: str | None
def _get_artist_item_payload(artist: Artist) -> ItemPayload:
return {
"id": artist.artist_id,
"name": artist.name,
"type": MediaType.ARTIST,
"uri": artist.uri,
"thumbnail": fetch_image_url(artist.images),
}
def _get_album_item_payload(album: SimplifiedAlbum) -> ItemPayload:
return {
"id": album.album_id,
"name": album.name,
"type": MediaType.ALBUM,
"uri": album.uri,
"thumbnail": fetch_image_url(album.images),
}
def _get_playlist_item_payload(playlist: BasePlaylist) -> ItemPayload:
return {
"id": playlist.playlist_id,
"name": playlist.name,
"type": MediaType.PLAYLIST,
"uri": playlist.uri,
"thumbnail": fetch_image_url(playlist.images),
}
def _get_track_item_payload(
track: SimplifiedTrack, show_thumbnails: bool = True
) -> ItemPayload:
return {
"id": track.track_id,
"name": track.name,
"type": MediaType.TRACK,
"uri": track.uri,
"thumbnail": (
fetch_image_url(track.album.images)
if show_thumbnails and isinstance(track, Track)
else None
),
}
class BrowsableMedia(StrEnum):
"""Enum of browsable media."""
@ -192,14 +253,13 @@ async def async_browse_media(
result = await async_browse_media_internal(
hass,
info.coordinator.client,
info.session,
info.coordinator.current_user,
media_content_type,
media_content_id,
can_play_artist=can_play_artist,
)
# Build new URLs with config entry specifyers
# Build new URLs with config entry specifiers
result.media_content_id = str(parsed_url.with_name(result.media_content_id))
if result.children:
for child in result.children:
@ -209,8 +269,7 @@ async def async_browse_media(
async def async_browse_media_internal(
hass: HomeAssistant,
spotify: Spotify,
session: OAuth2Session,
spotify: SpotifyClient,
current_user: dict[str, Any],
media_content_type: str | None,
media_content_id: str | None,
@ -219,15 +278,7 @@ async def async_browse_media_internal(
) -> BrowseMedia:
"""Browse spotify media."""
if media_content_type in (None, f"{MEDIA_PLAYER_PREFIX}library"):
return await hass.async_add_executor_job(
partial(library_payload, can_play_artist=can_play_artist)
)
if not session.valid_token:
await session.async_ensure_token_valid()
await hass.async_add_executor_job(
spotify.set_auth, session.token["access_token"]
)
return await library_payload(can_play_artist=can_play_artist)
# Strip prefix
if media_content_type:
@ -237,22 +288,19 @@ async def async_browse_media_internal(
"media_content_type": media_content_type,
"media_content_id": media_content_id,
}
response = await hass.async_add_executor_job(
partial(
build_item_response,
response = await build_item_response(
spotify,
current_user,
payload,
can_play_artist=can_play_artist,
)
)
if response is None:
raise BrowseError(f"Media not found: {media_content_type} / {media_content_id}")
return response
def build_item_response( # noqa: C901
spotify: Spotify,
async def build_item_response( # noqa: C901
spotify: SpotifyClient,
user: dict[str, Any],
payload: dict[str, str | None],
*,
@ -265,80 +313,112 @@ def build_item_response( # noqa: C901
if media_content_type is None or media_content_id is None:
return None
title = None
image = None
media: dict[str, Any] | None = None
items = []
title: str | None = None
image: str | None = None
items: list[ItemPayload] = []
if media_content_type == BrowsableMedia.CURRENT_USER_PLAYLISTS:
if media := spotify.current_user_playlists(limit=BROWSE_LIMIT):
items = media.get("items", [])
if playlists := await spotify.get_playlists_for_current_user():
items = [_get_playlist_item_payload(playlist) for playlist in playlists]
elif media_content_type == BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS:
if media := spotify.current_user_followed_artists(limit=BROWSE_LIMIT):
items = media.get("artists", {}).get("items", [])
if artists := await spotify.get_followed_artists():
items = [_get_artist_item_payload(artist) for artist in artists]
elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_ALBUMS:
if media := spotify.current_user_saved_albums(limit=BROWSE_LIMIT):
items = [item["album"] for item in media.get("items", [])]
if saved_albums := await spotify.get_saved_albums():
items = [
_get_album_item_payload(saved_album.album)
for saved_album in saved_albums
]
elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_TRACKS:
if media := spotify.current_user_saved_tracks(limit=BROWSE_LIMIT):
items = [item["track"] for item in media.get("items", [])]
if media := await spotify.get_saved_tracks():
items = [
_get_track_item_payload(saved_track.track) for saved_track in media
]
elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_SHOWS:
if media := spotify.current_user_saved_shows(limit=BROWSE_LIMIT):
items = [item["show"] for item in media.get("items", [])]
if media := await spotify.get_saved_shows():
items = [
{
"id": saved_show.show.show_id,
"name": saved_show.show.name,
"type": MEDIA_TYPE_SHOW,
"uri": saved_show.show.uri,
"thumbnail": fetch_image_url(saved_show.show.images),
}
for saved_show in media
]
elif media_content_type == BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED:
if media := spotify.current_user_recently_played(limit=BROWSE_LIMIT):
items = [item["track"] for item in media.get("items", [])]
if media := await spotify.get_recently_played_tracks():
items = [_get_track_item_payload(item.track) for item in media]
elif media_content_type == BrowsableMedia.CURRENT_USER_TOP_ARTISTS:
if media := spotify.current_user_top_artists(limit=BROWSE_LIMIT):
items = media.get("items", [])
if media := await spotify.get_top_artists():
items = [_get_artist_item_payload(artist) for artist in media]
elif media_content_type == BrowsableMedia.CURRENT_USER_TOP_TRACKS:
if media := spotify.current_user_top_tracks(limit=BROWSE_LIMIT):
items = media.get("items", [])
if media := await spotify.get_top_tracks():
items = [_get_track_item_payload(track) for track in media]
elif media_content_type == BrowsableMedia.FEATURED_PLAYLISTS:
if media := spotify.featured_playlists(
country=user["country"], limit=BROWSE_LIMIT
):
items = media.get("playlists", {}).get("items", [])
if media := await spotify.get_featured_playlists():
items = [_get_playlist_item_payload(playlist) for playlist in media]
elif media_content_type == BrowsableMedia.CATEGORIES:
if media := spotify.categories(country=user["country"], limit=BROWSE_LIMIT):
items = media.get("categories", {}).get("items", [])
if media := await spotify.get_categories():
items = [
{
"id": category.category_id,
"name": category.name,
"type": "category_playlists",
"uri": category.category_id,
"thumbnail": category.icons[0].url if category.icons else None,
}
for category in media
]
elif media_content_type == "category_playlists":
if (
media := spotify.category_playlists(
category_id=media_content_id,
country=user["country"],
limit=BROWSE_LIMIT,
)
) and (category := spotify.category(media_content_id, country=user["country"])):
title = category.get("name")
image = fetch_image_url(category, key="icons")
items = media.get("playlists", {}).get("items", [])
media := await spotify.get_category_playlists(category_id=media_content_id)
) and (category := await spotify.get_category(media_content_id)):
title = category.name
image = category.icons[0].url if category.icons else None
items = [_get_playlist_item_payload(playlist) for playlist in media]
elif media_content_type == BrowsableMedia.NEW_RELEASES:
if media := spotify.new_releases(country=user["country"], limit=BROWSE_LIMIT):
items = media.get("albums", {}).get("items", [])
if media := await spotify.get_new_releases():
items = [_get_album_item_payload(album) for album in media]
elif media_content_type == MediaType.PLAYLIST:
if media := spotify.playlist(media_content_id):
items = [item["track"] for item in media.get("tracks", {}).get("items", [])]
if media := await spotify.get_playlist(media_content_id):
title = media.name
image = media.images[0].url if media.images else None
items = [
_get_track_item_payload(playlist_track.track)
for playlist_track in media.tracks.items
]
elif media_content_type == MediaType.ALBUM:
if media := spotify.album(media_content_id):
items = media.get("tracks", {}).get("items", [])
if media := await spotify.get_album(media_content_id):
title = media.name
image = media.images[0].url if media.images else None
items = [
_get_track_item_payload(track, show_thumbnails=False)
for track in media.tracks
]
elif media_content_type == MediaType.ARTIST:
if (media := spotify.artist_albums(media_content_id, limit=BROWSE_LIMIT)) and (
artist := spotify.artist(media_content_id)
if (media := await spotify.get_artist_albums(media_content_id)) and (
artist := await spotify.get_artist(media_content_id)
):
title = artist.get("name")
image = fetch_image_url(artist)
items = media.get("items", [])
title = artist.name
image = artist.images[0].url if artist.images else None
items = [_get_album_item_payload(album) for album in media]
elif media_content_type == MEDIA_TYPE_SHOW:
if (media := spotify.show_episodes(media_content_id, limit=BROWSE_LIMIT)) and (
show := spotify.show(media_content_id)
if (media := await spotify.get_show_episodes(media_content_id)) and (
show := await spotify.get_show(media_content_id)
):
title = show.get("name")
image = fetch_image_url(show)
items = media.get("items", [])
if media is None:
return None
title = show.name
image = show.images[0].url if show.images else None
items = [
{
"id": episode.episode_id,
"name": episode.name,
"type": MediaType.EPISODE,
"uri": episode.uri,
"thumbnail": fetch_image_url(episode.images),
}
for episode in media
]
try:
media_class = CONTENT_TYPE_MEDIA_CLASS[media_content_type]
@ -359,9 +439,7 @@ def build_item_response( # noqa: C901
media_item.children = []
for item in items:
try:
item_id = item["id"]
except KeyError:
if (item_id := item["id"]) is None:
_LOGGER.debug("Missing ID for media item: %s", item)
continue
media_item.children.append(
@ -372,21 +450,21 @@ def build_item_response( # noqa: C901
media_class=MediaClass.PLAYLIST,
media_content_id=item_id,
media_content_type=f"{MEDIA_PLAYER_PREFIX}category_playlists",
thumbnail=fetch_image_url(item, key="icons"),
title=item.get("name"),
thumbnail=item["thumbnail"],
title=item["name"],
)
)
return media_item
if title is None:
title = LIBRARY_MAP.get(media_content_id, "Unknown")
if "name" in media:
title = media["name"]
can_play = media_content_type in PLAYABLE_MEDIA_TYPES and (
media_content_type != MediaType.ARTIST or can_play_artist
)
if TYPE_CHECKING:
assert title
browse_media = BrowseMedia(
can_expand=True,
can_play=can_play,
@ -407,23 +485,16 @@ def build_item_response( # noqa: C901
except (MissingMediaInformation, UnknownMediaType):
continue
if "images" in media:
browse_media.thumbnail = fetch_image_url(media)
return browse_media
def item_payload(item: dict[str, Any], *, can_play_artist: bool) -> BrowseMedia:
def item_payload(item: ItemPayload, *, can_play_artist: bool) -> BrowseMedia:
"""Create response payload for a single media item.
Used by async_browse_media.
"""
try:
media_type = item["type"]
media_id = item["uri"]
except KeyError as err:
_LOGGER.debug("Missing type or URI for media item: %s", item)
raise MissingMediaInformation from err
try:
media_class = CONTENT_TYPE_MEDIA_CLASS[media_type]
@ -440,25 +511,19 @@ def item_payload(item: dict[str, Any], *, can_play_artist: bool) -> BrowseMedia:
media_type != MediaType.ARTIST or can_play_artist
)
browse_media = BrowseMedia(
return BrowseMedia(
can_expand=can_expand,
can_play=can_play,
children_media_class=media_class["children"],
media_class=media_class["parent"],
media_content_id=media_id,
media_content_type=f"{MEDIA_PLAYER_PREFIX}{media_type}",
title=item.get("name", "Unknown"),
title=item["name"],
thumbnail=item["thumbnail"],
)
if "images" in item:
browse_media.thumbnail = fetch_image_url(item)
elif MediaType.ALBUM in item:
browse_media.thumbnail = fetch_image_url(item[MediaType.ALBUM])
return browse_media
def library_payload(*, can_play_artist: bool) -> BrowseMedia:
async def library_payload(*, can_play_artist: bool) -> BrowseMedia:
"""Create response payload to describe contents of a specific library.
Used by async_browse_media.
@ -474,10 +539,16 @@ def library_payload(*, can_play_artist: bool) -> BrowseMedia:
)
browse_media.children = []
for item in [{"name": n, "type": t} for t, n in LIBRARY_MAP.items()]:
for item_type, item_name in LIBRARY_MAP.items():
browse_media.children.append(
item_payload(
{"name": item["name"], "type": item["type"], "uri": item["type"]},
{
"name": item_name,
"type": item_type,
"uri": item_type,
"id": None,
"thumbnail": None,
},
can_play_artist=can_play_artist,
)
)

View file

@ -6,10 +6,12 @@ from collections.abc import Mapping
import logging
from typing import Any
from spotipy import Spotify
from spotifyaio import SpotifyClient
from homeassistant.config_entries import SOURCE_REAUTH, ConfigFlowResult
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_NAME, CONF_TOKEN
from homeassistant.helpers import config_entry_oauth2_flow
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import DOMAIN, SPOTIFY_SCOPES
@ -34,27 +36,24 @@ class SpotifyFlowHandler(
async def async_oauth_create_entry(self, data: dict[str, Any]) -> ConfigFlowResult:
"""Create an entry for Spotify."""
spotify = Spotify(auth=data["token"]["access_token"])
spotify = SpotifyClient(async_get_clientsession(self.hass))
spotify.authenticate(data[CONF_TOKEN][CONF_ACCESS_TOKEN])
try:
current_user = await self.hass.async_add_executor_job(spotify.current_user)
current_user = await spotify.get_current_user()
except Exception: # noqa: BLE001
return self.async_abort(reason="connection_error")
name = data["id"] = current_user["id"]
name = current_user.display_name
if current_user.get("display_name"):
name = current_user["display_name"]
data["name"] = name
await self.async_set_unique_id(current_user["id"])
await self.async_set_unique_id(current_user.user_id)
if self.source == SOURCE_REAUTH:
self._abort_if_unique_id_mismatch(reason="reauth_account_mismatch")
return self.async_update_reload_and_abort(
self._get_reauth_entry(), title=name, data=data
)
return self.async_create_entry(title=name, data=data)
return self.async_create_entry(title=name, data={**data, CONF_NAME: name})
async def async_step_reauth(
self, entry_data: Mapping[str, Any]

View file

@ -3,13 +3,17 @@
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
from typing import Any
from spotipy import Spotify, SpotifyException
from spotifyaio import (
PlaybackState,
Playlist,
SpotifyClient,
SpotifyConnectionError,
UserProfile,
)
from homeassistant.components.media_player import MediaType
from homeassistant.core import HomeAssistant
from homeassistant.helpers.config_entry_oauth2_flow import OAuth2Session
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
import homeassistant.util.dt as dt_util
@ -22,25 +26,24 @@ _LOGGER = logging.getLogger(__name__)
class SpotifyCoordinatorData:
"""Class to hold Spotify data."""
current_playback: dict[str, Any]
current_playback: PlaybackState | None
position_updated_at: datetime | None
playlist: dict[str, Any] | None
playlist: Playlist | None
dj_playlist: bool = False
# This is a minimal representation of the DJ playlist that Spotify now offers
# The DJ is not fully integrated with the playlist API, so needs to have the
# playlist response mocked in order to maintain functionality
SPOTIFY_DJ_PLAYLIST = {"uri": "spotify:playlist:37i9dQZF1EYkqdzj48dyYq", "name": "DJ"}
# The DJ is not fully integrated with the playlist API, so we need to guard
# against trying to fetch it as a regular playlist
SPOTIFY_DJ_PLAYLIST_URI = "spotify:playlist:37i9dQZF1EYkqdzj48dyYq"
class SpotifyCoordinator(DataUpdateCoordinator[SpotifyCoordinatorData]):
"""Class to manage fetching Spotify data."""
current_user: dict[str, Any]
current_user: UserProfile
def __init__(
self, hass: HomeAssistant, client: Spotify, session: OAuth2Session
) -> None:
def __init__(self, hass: HomeAssistant, client: SpotifyClient) -> None:
"""Initialize."""
super().__init__(
hass,
@ -49,65 +52,46 @@ class SpotifyCoordinator(DataUpdateCoordinator[SpotifyCoordinatorData]):
update_interval=timedelta(seconds=30),
)
self.client = client
self._playlist: dict[str, Any] | None = None
self.session = session
self._playlist: Playlist | None = None
async def _async_setup(self) -> None:
"""Set up the coordinator."""
try:
self.current_user = await self.hass.async_add_executor_job(self.client.me)
except SpotifyException as err:
self.current_user = await self.client.get_current_user()
except SpotifyConnectionError as err:
raise UpdateFailed("Error communicating with Spotify API") from err
if not self.current_user:
raise UpdateFailed("Could not retrieve user")
async def _async_update_data(self) -> SpotifyCoordinatorData:
if not self.session.valid_token:
await self.session.async_ensure_token_valid()
await self.hass.async_add_executor_job(
self.client.set_auth, self.session.token["access_token"]
current = await self.client.get_playback()
if not current:
return SpotifyCoordinatorData(
current_playback=None, position_updated_at=None, playlist=None
)
return await self.hass.async_add_executor_job(self._sync_update_data)
def _sync_update_data(self) -> SpotifyCoordinatorData:
current = self.client.current_playback(additional_types=[MediaType.EPISODE])
currently_playing = current or {}
# Record the last updated time, because Spotify's timestamp property is unreliable
# and doesn't actually return the fetch time as is mentioned in the API description
position_updated_at = dt_util.utcnow() if current is not None else None
position_updated_at = dt_util.utcnow()
context = currently_playing.get("context") or {}
# For some users in some cases, the uri is formed like
# "spotify:user:{name}:playlist:{id}" and spotipy wants
# the type to be playlist.
uri = context.get("uri")
if uri is not None:
parts = uri.split(":")
if len(parts) == 5 and parts[1] == "user" and parts[3] == "playlist":
uri = ":".join([parts[0], parts[3], parts[4]])
if context and (self._playlist is None or self._playlist["uri"] != uri):
dj_playlist = False
if (context := current.context) is not None:
if self._playlist is None or self._playlist.uri != context.uri:
self._playlist = None
if context["type"] == MediaType.PLAYLIST:
# The Spotify API does not currently support doing a lookup for
# the DJ playlist,so just use the minimal mock playlist object
if uri == SPOTIFY_DJ_PLAYLIST["uri"]:
self._playlist = SPOTIFY_DJ_PLAYLIST
else:
if context.uri == SPOTIFY_DJ_PLAYLIST_URI:
dj_playlist = True
elif context.context_type == MediaType.PLAYLIST:
# Make sure any playlist lookups don't break the current
# playback state update
try:
self._playlist = self.client.playlist(uri)
except SpotifyException:
self._playlist = await self.client.get_playlist(context.uri)
except SpotifyConnectionError:
_LOGGER.debug(
"Unable to load spotify playlist '%s'. "
"Continuing without playlist data",
uri,
context.uri,
)
self._playlist = None
return SpotifyCoordinatorData(
current_playback=currently_playing,
current_playback=current,
position_updated_at=position_updated_at,
playlist=self._playlist,
dj_playlist=dj_playlist,
)

View file

@ -9,6 +9,6 @@
"iot_class": "cloud_polling",
"loggers": ["spotipy"],
"quality_scale": "silver",
"requirements": ["spotipy==2.23.0"],
"requirements": ["spotifyaio==0.6.0"],
"zeroconf": ["_spotify-connect._tcp.local."]
}

View file

@ -4,12 +4,19 @@ from __future__ import annotations
from collections.abc import Callable
import datetime as dt
from datetime import timedelta
import logging
from typing import Any, Concatenate
from typing import TYPE_CHECKING, Any
import requests
from spotipy import SpotifyException
from spotifyaio import (
Device,
Episode,
Item,
ItemType,
PlaybackState,
ProductType,
RepeatMode as SpotifyRepeatMode,
Track,
)
from yarl import URL
from homeassistant.components.media_player import (
@ -22,9 +29,7 @@ from homeassistant.components.media_player import (
MediaType,
RepeatMode,
)
from homeassistant.const import CONF_ID
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import (
@ -36,12 +41,9 @@ from . import SpotifyConfigEntry
from .browse_media import async_browse_media_internal
from .const import DOMAIN, MEDIA_PLAYER_PREFIX, PLAYABLE_MEDIA_TYPES
from .coordinator import SpotifyCoordinator
from .util import fetch_image_url
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(seconds=30)
SUPPORT_SPOTIFY = (
MediaPlayerEntityFeature.BROWSE_MEDIA
| MediaPlayerEntityFeature.NEXT_TRACK
@ -57,9 +59,9 @@ SUPPORT_SPOTIFY = (
)
REPEAT_MODE_MAPPING_TO_HA = {
"context": RepeatMode.ALL,
"off": RepeatMode.OFF,
"track": RepeatMode.ONE,
SpotifyRepeatMode.CONTEXT: RepeatMode.ALL,
SpotifyRepeatMode.OFF: RepeatMode.OFF,
SpotifyRepeatMode.TRACK: RepeatMode.ONE,
}
REPEAT_MODE_MAPPING_TO_SPOTIFY = {
@ -74,39 +76,25 @@ async def async_setup_entry(
) -> None:
"""Set up Spotify based on a config entry."""
data = entry.runtime_data
assert entry.unique_id is not None
spotify = SpotifyMediaPlayer(
data.coordinator,
data.devices,
entry.data[CONF_ID],
entry.unique_id,
entry.title,
)
async_add_entities([spotify])
def spotify_exception_handler[_SpotifyMediaPlayerT: SpotifyMediaPlayer, **_P, _R](
func: Callable[Concatenate[_SpotifyMediaPlayerT, _P], _R],
) -> Callable[Concatenate[_SpotifyMediaPlayerT, _P], _R | None]:
"""Decorate Spotify calls to handle Spotify exception.
def ensure_item[_R](
func: Callable[[SpotifyMediaPlayer, Item], _R],
) -> Callable[[SpotifyMediaPlayer], _R | None]:
"""Ensure that the currently playing item is available."""
A decorator that wraps the passed in function, catches Spotify errors,
aiohttp exceptions and handles the availability of the media player.
"""
def wrapper(
self: _SpotifyMediaPlayerT, *args: _P.args, **kwargs: _P.kwargs
) -> _R | None:
try:
result = func(self, *args, **kwargs)
except requests.RequestException:
self._attr_available = False
def wrapper(self: SpotifyMediaPlayer) -> _R | None:
if not self.currently_playing or not self.currently_playing.item:
return None
except SpotifyException as exc:
self._attr_available = False
if exc.reason == "NO_ACTIVE_DEVICE":
raise HomeAssistantError("No active playback device found") from None
raise HomeAssistantError(f"Spotify error: {exc.reason}") from exc
self._attr_available = True
return result
return func(self, self.currently_playing.item)
return wrapper
@ -122,7 +110,7 @@ class SpotifyMediaPlayer(CoordinatorEntity[SpotifyCoordinator], MediaPlayerEntit
def __init__(
self,
coordinator: SpotifyCoordinator,
device_coordinator: DataUpdateCoordinator[list[dict[str, Any]]],
device_coordinator: DataUpdateCoordinator[list[Device]],
user_id: str,
name: str,
) -> None:
@ -135,25 +123,23 @@ class SpotifyMediaPlayer(CoordinatorEntity[SpotifyCoordinator], MediaPlayerEntit
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, user_id)},
manufacturer="Spotify AB",
model=f"Spotify {coordinator.current_user['product']}",
model=f"Spotify {coordinator.current_user.product}",
name=f"Spotify {name}",
entry_type=DeviceEntryType.SERVICE,
configuration_url="https://open.spotify.com",
)
@property
def currently_playing(self) -> dict[str, Any]:
def currently_playing(self) -> PlaybackState | None:
"""Return the current playback."""
return self.coordinator.data.current_playback
@property
def supported_features(self) -> MediaPlayerEntityFeature:
"""Return the supported features."""
if self.coordinator.current_user["product"] != "premium":
if self.coordinator.current_user.product != ProductType.PREMIUM:
return MediaPlayerEntityFeature(0)
if not self.currently_playing or self.currently_playing.get("device", {}).get(
"is_restricted"
):
if not self.currently_playing or self.currently_playing.device.is_restricted:
return MediaPlayerEntityFeature.SELECT_SOURCE
return SUPPORT_SPOTIFY
@ -162,7 +148,7 @@ class SpotifyMediaPlayer(CoordinatorEntity[SpotifyCoordinator], MediaPlayerEntit
"""Return the playback state."""
if not self.currently_playing:
return MediaPlayerState.IDLE
if self.currently_playing["is_playing"]:
if self.currently_playing.is_playing:
return MediaPlayerState.PLAYING
return MediaPlayerState.PAUSED
@ -171,41 +157,32 @@ class SpotifyMediaPlayer(CoordinatorEntity[SpotifyCoordinator], MediaPlayerEntit
"""Return the device volume."""
if not self.currently_playing:
return None
return self.currently_playing.get("device", {}).get("volume_percent", 0) / 100
return self.currently_playing.device.volume_percent / 100
@property
def media_content_id(self) -> str | None:
@ensure_item
def media_content_id(self, item: Item) -> str: # noqa: PLR0206
"""Return the media URL."""
if not self.currently_playing:
return None
item = self.currently_playing.get("item") or {}
return item.get("uri")
return item.uri
@property
def media_content_type(self) -> str | None:
@ensure_item
def media_content_type(self, item: Item) -> str: # noqa: PLR0206
"""Return the media type."""
if not self.currently_playing:
return None
item = self.currently_playing.get("item") or {}
is_episode = item.get("type") == MediaType.EPISODE
return MediaType.PODCAST if is_episode else MediaType.MUSIC
return MediaType.PODCAST if item.type == MediaType.EPISODE else MediaType.MUSIC
@property
def media_duration(self) -> int | None:
@ensure_item
def media_duration(self, item: Item) -> int: # noqa: PLR0206
"""Duration of current playing media in seconds."""
if self.currently_playing is None or self.currently_playing.get("item") is None:
return None
return self.currently_playing["item"]["duration_ms"] / 1000
return item.duration_ms / 1000
@property
def media_position(self) -> int | None:
"""Position of current playing media in seconds."""
if (
not self.currently_playing
or self.currently_playing.get("progress_ms") is None
):
if not self.currently_playing or self.currently_playing.progress_ms is None:
return None
return self.currently_playing["progress_ms"] / 1000
return self.currently_playing.progress_ms / 1000
@property
def media_position_updated_at(self) -> dt.datetime | None:
@ -215,131 +192,125 @@ class SpotifyMediaPlayer(CoordinatorEntity[SpotifyCoordinator], MediaPlayerEntit
return self.coordinator.data.position_updated_at
@property
def media_image_url(self) -> str | None:
@ensure_item
def media_image_url(self, item: Item) -> str | None: # noqa: PLR0206
"""Return the media image URL."""
if not self.currently_playing or self.currently_playing.get("item") is None:
if item.type == ItemType.EPISODE:
if TYPE_CHECKING:
assert isinstance(item, Episode)
if item.images:
return item.images[0].url
if item.show and item.show.images:
return item.show.images[0].url
return None
item = self.currently_playing["item"]
if item["type"] == MediaType.EPISODE:
if item["images"]:
return fetch_image_url(item)
if item["show"]["images"]:
return fetch_image_url(item["show"])
if TYPE_CHECKING:
assert isinstance(item, Track)
if not item.album.images:
return None
if not item["album"]["images"]:
return None
return fetch_image_url(item["album"])
return item.album.images[0].url
@property
def media_title(self) -> str | None:
@ensure_item
def media_title(self, item: Item) -> str: # noqa: PLR0206
"""Return the media title."""
if not self.currently_playing:
return None
item = self.currently_playing.get("item") or {}
return item.get("name")
return item.name
@property
def media_artist(self) -> str | None:
@ensure_item
def media_artist(self, item: Item) -> str: # noqa: PLR0206
"""Return the media artist."""
if not self.currently_playing or self.currently_playing.get("item") is None:
return None
if item.type == ItemType.EPISODE:
if TYPE_CHECKING:
assert isinstance(item, Episode)
return item.show.publisher
item = self.currently_playing["item"]
if item["type"] == MediaType.EPISODE:
return item["show"]["publisher"]
return ", ".join(artist["name"] for artist in item["artists"])
if TYPE_CHECKING:
assert isinstance(item, Track)
return ", ".join(artist.name for artist in item.artists)
@property
def media_album_name(self) -> str | None:
@ensure_item
def media_album_name(self, item: Item) -> str: # noqa: PLR0206
"""Return the media album."""
if not self.currently_playing or self.currently_playing.get("item") is None:
return None
if item.type == ItemType.EPISODE:
if TYPE_CHECKING:
assert isinstance(item, Episode)
return item.show.name
item = self.currently_playing["item"]
if item["type"] == MediaType.EPISODE:
return item["show"]["name"]
return item["album"]["name"]
if TYPE_CHECKING:
assert isinstance(item, Track)
return item.album.name
@property
def media_track(self) -> int | None:
@ensure_item
def media_track(self, item: Item) -> int | None: # noqa: PLR0206
"""Track number of current playing media, music track only."""
if not self.currently_playing:
if item.type == ItemType.EPISODE:
return None
item = self.currently_playing.get("item") or {}
return item.get("track_number")
if TYPE_CHECKING:
assert isinstance(item, Track)
return item.track_number
@property
def media_playlist(self):
def media_playlist(self) -> str | None:
"""Title of Playlist currently playing."""
if self.coordinator.data.dj_playlist:
return "DJ"
if self.coordinator.data.playlist is None:
return None
return self.coordinator.data.playlist["name"]
return self.coordinator.data.playlist.name
@property
def source(self) -> str | None:
"""Return the current playback device."""
if not self.currently_playing:
return None
return self.currently_playing.get("device", {}).get("name")
return self.currently_playing.device.name
@property
def source_list(self) -> list[str] | None:
"""Return a list of source devices."""
return [device["name"] for device in self.devices.data]
return [device.name for device in self.devices.data]
@property
def shuffle(self) -> bool | None:
"""Shuffling state."""
if not self.currently_playing:
return None
return self.currently_playing.get("shuffle_state")
return self.currently_playing.shuffle
@property
def repeat(self) -> RepeatMode | None:
"""Return current repeat mode."""
if (
not self.currently_playing
or (repeat_state := self.currently_playing.get("repeat_state")) is None
):
if not self.currently_playing:
return None
return REPEAT_MODE_MAPPING_TO_HA.get(repeat_state)
return REPEAT_MODE_MAPPING_TO_HA.get(self.currently_playing.repeat_mode)
@spotify_exception_handler
def set_volume_level(self, volume: float) -> None:
async def async_set_volume_level(self, volume: float) -> None:
"""Set the volume level."""
self.coordinator.client.volume(int(volume * 100))
await self.coordinator.client.set_volume(int(volume * 100))
@spotify_exception_handler
def media_play(self) -> None:
async def async_media_play(self) -> None:
"""Start or resume playback."""
self.coordinator.client.start_playback()
await self.coordinator.client.start_playback()
@spotify_exception_handler
def media_pause(self) -> None:
async def async_media_pause(self) -> None:
"""Pause playback."""
self.coordinator.client.pause_playback()
await self.coordinator.client.pause_playback()
@spotify_exception_handler
def media_previous_track(self) -> None:
async def async_media_previous_track(self) -> None:
"""Skip to previous track."""
self.coordinator.client.previous_track()
await self.coordinator.client.previous_track()
@spotify_exception_handler
def media_next_track(self) -> None:
async def async_media_next_track(self) -> None:
"""Skip to next track."""
self.coordinator.client.next_track()
await self.coordinator.client.next_track()
@spotify_exception_handler
def media_seek(self, position: float) -> None:
async def async_media_seek(self, position: float) -> None:
"""Send seek command."""
self.coordinator.client.seek_track(int(position * 1000))
await self.coordinator.client.seek_track(int(position * 1000))
@spotify_exception_handler
def play_media(
async def async_play_media(
self, media_type: MediaType | str, media_id: str, **kwargs: Any
) -> None:
"""Play media."""
@ -363,12 +334,8 @@ class SpotifyMediaPlayer(CoordinatorEntity[SpotifyCoordinator], MediaPlayerEntit
_LOGGER.error("Media type %s is not supported", media_type)
return
if (
self.currently_playing
and not self.currently_playing.get("device")
and self.devices.data
):
kwargs["device_id"] = self.devices.data[0].get("id")
if not self.currently_playing and self.devices.data:
kwargs["device_id"] = self.devices.data[0].device_id
if enqueue == MediaPlayerEnqueue.ADD:
if media_type not in {
@ -379,32 +346,29 @@ class SpotifyMediaPlayer(CoordinatorEntity[SpotifyCoordinator], MediaPlayerEntit
raise ValueError(
f"Media type {media_type} is not supported when enqueue is ADD"
)
self.coordinator.client.add_to_queue(media_id, kwargs.get("device_id"))
return
self.coordinator.client.start_playback(**kwargs)
@spotify_exception_handler
def select_source(self, source: str) -> None:
"""Select playback device."""
for device in self.devices.data:
if device["name"] == source:
self.coordinator.client.transfer_playback(
device["id"], self.state == MediaPlayerState.PLAYING
await self.coordinator.client.add_to_queue(
media_id, kwargs.get("device_id")
)
return
@spotify_exception_handler
def set_shuffle(self, shuffle: bool) -> None:
"""Enable/Disable shuffle mode."""
self.coordinator.client.shuffle(shuffle)
await self.coordinator.client.start_playback(**kwargs)
@spotify_exception_handler
def set_repeat(self, repeat: RepeatMode) -> None:
async def async_select_source(self, source: str) -> None:
"""Select playback device."""
for device in self.devices.data:
if device.name == source:
await self.coordinator.client.transfer_playback(device.device_id)
return
async def async_set_shuffle(self, shuffle: bool) -> None:
"""Enable/Disable shuffle mode."""
await self.coordinator.client.set_shuffle(state=shuffle)
async def async_set_repeat(self, repeat: RepeatMode) -> None:
"""Set repeat mode."""
if repeat not in REPEAT_MODE_MAPPING_TO_SPOTIFY:
raise ValueError(f"Unsupported repeat mode: {repeat}")
self.coordinator.client.repeat(REPEAT_MODE_MAPPING_TO_SPOTIFY[repeat])
await self.coordinator.client.set_repeat(REPEAT_MODE_MAPPING_TO_SPOTIFY[repeat])
async def async_browse_media(
self,
@ -416,7 +380,6 @@ class SpotifyMediaPlayer(CoordinatorEntity[SpotifyCoordinator], MediaPlayerEntit
return await async_browse_media_internal(
self.hass,
self.coordinator.client,
self.coordinator.session,
self.coordinator.current_user,
media_content_type,
media_content_id,

View file

@ -1,7 +1,8 @@
"""Models for use in Spotify integration."""
from dataclasses import dataclass
from typing import Any
from spotifyaio import Device
from homeassistant.helpers.config_entry_oauth2_flow import OAuth2Session
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
@ -15,4 +16,4 @@ class SpotifyData:
coordinator: SpotifyCoordinator
session: OAuth2Session
devices: DataUpdateCoordinator[list[dict[str, Any]]]
devices: DataUpdateCoordinator[list[Device]]

View file

@ -2,8 +2,7 @@
from __future__ import annotations
from typing import Any
from spotifyaio import Image
import yarl
from .const import MEDIA_PLAYER_PREFIX
@ -19,12 +18,11 @@ def resolve_spotify_media_type(media_content_type: str) -> str:
return media_content_type.removeprefix(MEDIA_PLAYER_PREFIX)
def fetch_image_url(item: dict[str, Any], key="images") -> str | None:
def fetch_image_url(images: list[Image]) -> str | None:
"""Fetch image url."""
source = item.get(key, [])
if isinstance(source, list) and source:
return source[0].get("url")
if not images:
return None
return images[0].url
def spotify_uri_from_media_browser_url(media_content_id: str) -> str:

View file

@ -2700,7 +2700,7 @@ speak2mary==1.4.0
speedtest-cli==2.1.3
# homeassistant.components.spotify
spotipy==2.23.0
spotifyaio==0.6.0
# homeassistant.components.sql
sqlparse==0.5.0

View file

@ -2146,7 +2146,7 @@ speak2mary==1.4.0
speedtest-cli==2.1.3
# homeassistant.components.spotify
spotipy==2.23.0
spotifyaio==0.6.0
# homeassistant.components.sql
sqlparse==0.5.0

View file

@ -2,9 +2,33 @@
from collections.abc import Generator
import time
from unittest.mock import MagicMock, patch
from unittest.mock import AsyncMock, patch
import pytest
from spotifyaio.models import (
Album,
Artist,
ArtistResponse,
CategoriesResponse,
Category,
CategoryPlaylistResponse,
Devices,
FeaturedPlaylistResponse,
NewReleasesResponse,
NewReleasesResponseInner,
PlaybackState,
PlayedTrackResponse,
Playlist,
PlaylistResponse,
SavedAlbumResponse,
SavedShowResponse,
SavedTrackResponse,
Show,
ShowEpisodesResponse,
TopArtistsResponse,
TopTracksResponse,
UserProfile,
)
from homeassistant.components.application_credentials import (
ClientCredential,
@ -14,7 +38,7 @@ from homeassistant.components.spotify.const import DOMAIN, SPOTIFY_SCOPES
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from tests.common import MockConfigEntry, load_json_value_fixture
from tests.common import MockConfigEntry, load_fixture
SCOPES = " ".join(SPOTIFY_SCOPES)
@ -60,48 +84,74 @@ async def setup_credentials(hass: HomeAssistant) -> None:
@pytest.fixture
def mock_spotify() -> Generator[MagicMock]:
def mock_spotify() -> Generator[AsyncMock]:
"""Mock the Spotify API."""
with (
patch(
"homeassistant.components.spotify.Spotify",
autospec=True,
"homeassistant.components.spotify.SpotifyClient", autospec=True
) as spotify_mock,
patch(
"homeassistant.components.spotify.config_flow.Spotify",
"homeassistant.components.spotify.config_flow.SpotifyClient",
new=spotify_mock,
),
):
client = spotify_mock.return_value
# All these fixtures can be retrieved using the Web API client at
# https://developer.spotify.com/documentation/web-api
current_user = load_json_value_fixture("current_user.json", DOMAIN)
client.current_user.return_value = current_user
client.me.return_value = current_user
for fixture, method in (
("devices.json", "devices"),
("current_user_playlist.json", "current_user_playlists"),
("playback.json", "current_playback"),
("followed_artists.json", "current_user_followed_artists"),
("saved_albums.json", "current_user_saved_albums"),
("saved_tracks.json", "current_user_saved_tracks"),
("saved_shows.json", "current_user_saved_shows"),
("recently_played_tracks.json", "current_user_recently_played"),
("top_artists.json", "current_user_top_artists"),
("top_tracks.json", "current_user_top_tracks"),
("featured_playlists.json", "featured_playlists"),
("categories.json", "categories"),
("category_playlists.json", "category_playlists"),
("category.json", "category"),
("new_releases.json", "new_releases"),
("playlist.json", "playlist"),
("album.json", "album"),
("artist.json", "artist"),
("artist_albums.json", "artist_albums"),
("show_episodes.json", "show_episodes"),
("show.json", "show"),
for fixture, method, obj in (
(
"current_user_playlist.json",
"get_playlists_for_current_user",
PlaylistResponse,
),
("saved_albums.json", "get_saved_albums", SavedAlbumResponse),
("saved_tracks.json", "get_saved_tracks", SavedTrackResponse),
("saved_shows.json", "get_saved_shows", SavedShowResponse),
(
"recently_played_tracks.json",
"get_recently_played_tracks",
PlayedTrackResponse,
),
("top_artists.json", "get_top_artists", TopArtistsResponse),
("top_tracks.json", "get_top_tracks", TopTracksResponse),
("show_episodes.json", "get_show_episodes", ShowEpisodesResponse),
("artist_albums.json", "get_artist_albums", NewReleasesResponseInner),
):
getattr(client, method).return_value = load_json_value_fixture(
fixture, DOMAIN
getattr(client, method).return_value = obj.from_json(
load_fixture(fixture, DOMAIN)
).items
for fixture, method, obj in (
(
"playback.json",
"get_playback",
PlaybackState,
),
("current_user.json", "get_current_user", UserProfile),
("category.json", "get_category", Category),
("playlist.json", "get_playlist", Playlist),
("album.json", "get_album", Album),
("artist.json", "get_artist", Artist),
("show.json", "get_show", Show),
):
getattr(client, method).return_value = obj.from_json(
load_fixture(fixture, DOMAIN)
)
client.get_followed_artists.return_value = ArtistResponse.from_json(
load_fixture("followed_artists.json", DOMAIN)
).artists.items
client.get_featured_playlists.return_value = FeaturedPlaylistResponse.from_json(
load_fixture("featured_playlists.json", DOMAIN)
).playlists.items
client.get_categories.return_value = CategoriesResponse.from_json(
load_fixture("categories.json", DOMAIN)
).categories.items
client.get_category_playlists.return_value = CategoryPlaylistResponse.from_json(
load_fixture("category_playlists.json", DOMAIN)
).playlists.items
client.get_new_releases.return_value = NewReleasesResponse.from_json(
load_fixture("new_releases.json", DOMAIN)
).albums.items
client.get_devices.return_value = Devices.from_json(
load_fixture("devices.json", DOMAIN)
).devices
yield spotify_mock

View file

@ -5,7 +5,7 @@ from ipaddress import ip_address
from unittest.mock import MagicMock, patch
import pytest
from spotipy import SpotifyException
from spotifyaio import SpotifyConnectionError
from homeassistant.components import zeroconf
from homeassistant.components.spotify.const import DOMAIN
@ -111,6 +111,7 @@ async def test_full_flow(
):
result = await hass.config_entries.flow.async_configure(result["flow_id"])
assert result["type"] is FlowResultType.CREATE_ENTRY
assert len(hass.config_entries.async_entries(DOMAIN)) == 1, result
assert result["type"] is FlowResultType.CREATE_ENTRY
@ -122,6 +123,7 @@ async def test_full_flow(
"type": "Bearer",
"expires_in": 60,
}
assert result["result"].unique_id == "1112264111"
@pytest.mark.usefixtures("current_request_with_host")
@ -157,9 +159,7 @@ async def test_abort_if_spotify_error(
},
)
mock_spotify.return_value.current_user.side_effect = SpotifyException(
400, -1, "message"
)
mock_spotify.return_value.get_current_user.side_effect = SpotifyConnectionError
result = await hass.config_entries.flow.async_configure(result["flow_id"])
@ -200,7 +200,7 @@ async def test_reauthentication(
"https://accounts.spotify.com/api/token",
json={
"refresh_token": "new-refresh-token",
"access_token": "mew-access-token",
"access_token": "new-access-token",
"type": "Bearer",
"expires_in": 60,
},
@ -213,11 +213,10 @@ async def test_reauthentication(
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "reauth_successful"
mock_config_entry.data["token"].pop("expires_at")
assert mock_config_entry.data["token"] == {
"refresh_token": "new-refresh-token",
"access_token": "mew-access-token",
"access_token": "new-access-token",
"type": "Bearer",
"expires_in": 60,
}
@ -237,9 +236,6 @@ async def test_reauth_account_mismatch(
result = await mock_config_entry.start_reauth_flow(hass)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "reauth_confirm"
result = await hass.config_entries.flow.async_configure(result["flow_id"], {})
state = config_entry_oauth2_flow._encode_jwt(
@ -262,7 +258,9 @@ async def test_reauth_account_mismatch(
},
)
mock_spotify.return_value.current_user.return_value["id"] = "new_user_id"
mock_spotify.return_value.get_current_user.return_value.user_id = (
"different_user_id"
)
result = await hass.config_entries.flow.async_configure(result["flow_id"])
assert result["type"] is FlowResultType.ABORT

View file

@ -3,7 +3,7 @@
from unittest.mock import MagicMock
import pytest
from spotipy import SpotifyException
from spotifyaio import SpotifyConnectionError
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
@ -33,8 +33,8 @@ async def test_setup(
@pytest.mark.parametrize(
"method",
[
"me",
"devices",
"get_current_user",
"get_devices",
],
)
async def test_setup_with_required_calls_failing(
@ -44,22 +44,7 @@ async def test_setup_with_required_calls_failing(
method: str,
) -> None:
"""Test the Spotify setup with required calls failing."""
getattr(mock_spotify.return_value, method).side_effect = SpotifyException(
400, "Bad Request", "Bad Request"
)
mock_config_entry.add_to_hass(hass)
assert not await hass.config_entries.async_setup(mock_config_entry.entry_id)
@pytest.mark.usefixtures("setup_credentials")
async def test_no_current_user(
hass: HomeAssistant,
mock_spotify: MagicMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test the Spotify setup with required calls failing."""
mock_spotify.return_value.me.return_value = None
getattr(mock_spotify.return_value, method).side_effect = SpotifyConnectionError
mock_config_entry.add_to_hass(hass)
assert not await hass.config_entries.async_setup(mock_config_entry.entry_id)

View file

@ -5,7 +5,12 @@ from unittest.mock import MagicMock, patch
from freezegun.api import FrozenDateTimeFactory
import pytest
from spotipy import SpotifyException
from spotifyaio import (
PlaybackState,
ProductType,
RepeatMode as SpotifyRepeatMode,
SpotifyConnectionError,
)
from syrupy import SnapshotAssertion
from homeassistant.components.media_player import (
@ -49,21 +54,22 @@ from . import setup_integration
from tests.common import (
MockConfigEntry,
async_fire_time_changed,
load_json_value_fixture,
load_fixture,
snapshot_platform,
)
@pytest.mark.freeze_time("2023-10-21")
@pytest.mark.usefixtures("setup_credentials")
async def test_entities(
hass: HomeAssistant,
mock_spotify: MagicMock,
freezer: FrozenDateTimeFactory,
mock_config_entry: MockConfigEntry,
entity_registry: er.EntityRegistry,
snapshot: SnapshotAssertion,
) -> None:
"""Test the Spotify entities."""
freezer.move_to("2023-10-21")
with patch("secrets.token_hex", return_value="mock-token"):
await setup_integration(hass, mock_config_entry)
@ -72,18 +78,19 @@ async def test_entities(
)
@pytest.mark.freeze_time("2023-10-21")
@pytest.mark.usefixtures("setup_credentials")
async def test_podcast(
hass: HomeAssistant,
mock_spotify: MagicMock,
freezer: FrozenDateTimeFactory,
mock_config_entry: MockConfigEntry,
entity_registry: er.EntityRegistry,
snapshot: SnapshotAssertion,
) -> None:
"""Test the Spotify entities while listening a podcast."""
mock_spotify.return_value.current_playback.return_value = load_json_value_fixture(
"playback_episode.json", DOMAIN
freezer.move_to("2023-10-21")
mock_spotify.return_value.get_playback.return_value = PlaybackState.from_json(
load_fixture("playback_episode.json", DOMAIN)
)
with patch("secrets.token_hex", return_value="mock-token"):
await setup_integration(hass, mock_config_entry)
@ -100,7 +107,7 @@ async def test_free_account(
mock_config_entry: MockConfigEntry,
) -> None:
"""Test the Spotify entities with a free account."""
mock_spotify.return_value.me.return_value["product"] = "free"
mock_spotify.return_value.get_current_user.return_value.product = ProductType.FREE
await setup_integration(hass, mock_config_entry)
state = hass.states.get("media_player.spotify_spotify_1")
assert state
@ -114,9 +121,7 @@ async def test_restricted_device(
mock_config_entry: MockConfigEntry,
) -> None:
"""Test the Spotify entities with a restricted device."""
mock_spotify.return_value.current_playback.return_value["device"][
"is_restricted"
] = True
mock_spotify.return_value.get_playback.return_value.device.is_restricted = True
await setup_integration(hass, mock_config_entry)
state = hass.states.get("media_player.spotify_spotify_1")
assert state
@ -132,7 +137,7 @@ async def test_spotify_dj_list(
mock_config_entry: MockConfigEntry,
) -> None:
"""Test the Spotify entities with a Spotify DJ playlist."""
mock_spotify.return_value.current_playback.return_value["context"]["uri"] = (
mock_spotify.return_value.get_playback.return_value.context.uri = (
"spotify:playlist:37i9dQZF1EYkqdzj48dyYq"
)
await setup_integration(hass, mock_config_entry)
@ -148,9 +153,7 @@ async def test_fetching_playlist_does_not_fail(
mock_config_entry: MockConfigEntry,
) -> None:
"""Test failing fetching playlist does not fail update."""
mock_spotify.return_value.playlist.side_effect = SpotifyException(
404, "Not Found", "msg"
)
mock_spotify.return_value.get_playlist.side_effect = SpotifyConnectionError
await setup_integration(hass, mock_config_entry)
state = hass.states.get("media_player.spotify_spotify_1")
assert state
@ -164,7 +167,7 @@ async def test_idle(
mock_config_entry: MockConfigEntry,
) -> None:
"""Test the Spotify entities in idle state."""
mock_spotify.return_value.current_playback.return_value = {}
mock_spotify.return_value.get_playback.return_value = {}
await setup_integration(hass, mock_config_entry)
state = hass.states.get("media_player.spotify_spotify_1")
assert state
@ -211,9 +214,9 @@ async def test_repeat_mode(
"""Test the Spotify media player repeat mode."""
await setup_integration(hass, mock_config_entry)
for mode, spotify_mode in (
(RepeatMode.ALL, "context"),
(RepeatMode.ONE, "track"),
(RepeatMode.OFF, "off"),
(RepeatMode.ALL, SpotifyRepeatMode.CONTEXT),
(RepeatMode.ONE, SpotifyRepeatMode.TRACK),
(RepeatMode.OFF, SpotifyRepeatMode.OFF),
):
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
@ -221,8 +224,8 @@ async def test_repeat_mode(
{ATTR_ENTITY_ID: "media_player.spotify_spotify_1", ATTR_MEDIA_REPEAT: mode},
blocking=True,
)
mock_spotify.return_value.repeat.assert_called_once_with(spotify_mode)
mock_spotify.return_value.repeat.reset_mock()
mock_spotify.return_value.set_repeat.assert_called_once_with(spotify_mode)
mock_spotify.return_value.set_repeat.reset_mock()
@pytest.mark.usefixtures("setup_credentials")
@ -243,8 +246,8 @@ async def test_shuffle(
},
blocking=True,
)
mock_spotify.return_value.shuffle.assert_called_once_with(shuffle)
mock_spotify.return_value.shuffle.reset_mock()
mock_spotify.return_value.set_shuffle.assert_called_once_with(state=shuffle)
mock_spotify.return_value.set_shuffle.reset_mock()
@pytest.mark.usefixtures("setup_credentials")
@ -264,7 +267,7 @@ async def test_volume_level(
},
blocking=True,
)
mock_spotify.return_value.volume.assert_called_with(50)
mock_spotify.return_value.set_volume.assert_called_with(50)
@pytest.mark.usefixtures("setup_credentials")
@ -447,7 +450,7 @@ async def test_select_source(
blocking=True,
)
mock_spotify.return_value.transfer_playback.assert_called_with(
"21dac6b0e0a1f181870fdc9749b2656466557666", True
"21dac6b0e0a1f181870fdc9749b2656466557666"
)
@ -464,9 +467,7 @@ async def test_source_devices(
assert state.attributes[ATTR_INPUT_SOURCE_LIST] == ["DESKTOP-BKC5SIK"]
mock_spotify.return_value.devices.side_effect = SpotifyException(
404, "Not Found", "msg"
)
mock_spotify.return_value.get_devices.side_effect = SpotifyConnectionError
freezer.tick(timedelta(minutes=5))
async_fire_time_changed(hass)
await hass.async_block_till_done()
@ -477,20 +478,6 @@ async def test_source_devices(
assert state.attributes[ATTR_INPUT_SOURCE_LIST] == ["DESKTOP-BKC5SIK"]
@pytest.mark.usefixtures("setup_credentials")
async def test_no_source_devices(
hass: HomeAssistant,
mock_spotify: MagicMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test the Spotify media player with no source devices."""
mock_spotify.return_value.devices.return_value = None
await setup_integration(hass, mock_config_entry)
state = hass.states.get("media_player.spotify_spotify_1")
assert ATTR_INPUT_SOURCE_LIST not in state.attributes
@pytest.mark.usefixtures("setup_credentials")
async def test_paused_playback(
hass: HomeAssistant,
@ -498,7 +485,7 @@ async def test_paused_playback(
mock_config_entry: MockConfigEntry,
) -> None:
"""Test the Spotify media player with paused playback."""
mock_spotify.return_value.current_playback.return_value["is_playing"] = False
mock_spotify.return_value.get_playback.return_value.is_playing = False
await setup_integration(hass, mock_config_entry)
state = hass.states.get("media_player.spotify_spotify_1")
assert state
@ -512,9 +499,9 @@ async def test_fallback_show_image(
mock_config_entry: MockConfigEntry,
) -> None:
"""Test the Spotify media player with a fallback image."""
playback = load_json_value_fixture("playback_episode.json", DOMAIN)
playback["item"]["images"] = []
mock_spotify.return_value.current_playback.return_value = playback
playback = PlaybackState.from_json(load_fixture("playback_episode.json", DOMAIN))
playback.item.images = []
mock_spotify.return_value.get_playback.return_value = playback
with patch("secrets.token_hex", return_value="mock-token"):
await setup_integration(hass, mock_config_entry)
state = hass.states.get("media_player.spotify_spotify_1")
@ -532,10 +519,10 @@ async def test_no_episode_images(
mock_config_entry: MockConfigEntry,
) -> None:
"""Test the Spotify media player with no episode images."""
playback = load_json_value_fixture("playback_episode.json", DOMAIN)
playback["item"]["images"] = []
playback["item"]["show"]["images"] = []
mock_spotify.return_value.current_playback.return_value = playback
playback = PlaybackState.from_json(load_fixture("playback_episode.json", DOMAIN))
playback.item.images = []
playback.item.show.images = []
mock_spotify.return_value.get_playback.return_value = playback
await setup_integration(hass, mock_config_entry)
state = hass.states.get("media_player.spotify_spotify_1")
assert state
@ -549,9 +536,7 @@ async def test_no_album_images(
mock_config_entry: MockConfigEntry,
) -> None:
"""Test the Spotify media player with no album images."""
mock_spotify.return_value.current_playback.return_value["item"]["album"][
"images"
] = []
mock_spotify.return_value.get_playback.return_value.item.album.images = []
await setup_integration(hass, mock_config_entry)
state = hass.states.get("media_player.spotify_spotify_1")
assert state