"""MediaPlayer platform for Music Assistant integration.""" from __future__ import annotations import asyncio from collections.abc import Awaitable, Callable, Coroutine, Mapping from contextlib import suppress import functools import os from typing import TYPE_CHECKING, Any from music_assistant_models.enums import ( EventType, MediaType, PlayerFeature, QueueOption, RepeatMode as MassRepeatMode, ) from music_assistant_models.errors import MediaNotFoundError, MusicAssistantError from music_assistant_models.event import MassEvent from music_assistant_models.media_items import ItemMapping, MediaItemType, Track from homeassistant.components import media_source from homeassistant.components.media_player import ( ATTR_MEDIA_EXTRA, BrowseMedia, MediaPlayerDeviceClass, MediaPlayerEnqueue, MediaPlayerEntity, MediaPlayerEntityFeature, MediaPlayerState, MediaType as HAMediaType, RepeatMode, async_process_play_media_url, ) from homeassistant.const import STATE_OFF from homeassistant.core import HomeAssistant from homeassistant.exceptions import HomeAssistantError from homeassistant.helpers import entity_registry as er from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.util.dt import utc_from_timestamp from . import MusicAssistantConfigEntry from .const import ATTR_ACTIVE_QUEUE, ATTR_MASS_PLAYER_TYPE, DOMAIN from .entity import MusicAssistantEntity if TYPE_CHECKING: from music_assistant_client import MusicAssistantClient from music_assistant_models.player import Player from music_assistant_models.player_queue import PlayerQueue SUPPORTED_FEATURES = ( MediaPlayerEntityFeature.PAUSE | MediaPlayerEntityFeature.VOLUME_SET | MediaPlayerEntityFeature.STOP | MediaPlayerEntityFeature.PREVIOUS_TRACK | MediaPlayerEntityFeature.NEXT_TRACK | MediaPlayerEntityFeature.SHUFFLE_SET | MediaPlayerEntityFeature.REPEAT_SET | MediaPlayerEntityFeature.TURN_ON | MediaPlayerEntityFeature.TURN_OFF | MediaPlayerEntityFeature.PLAY | MediaPlayerEntityFeature.PLAY_MEDIA | MediaPlayerEntityFeature.VOLUME_STEP | MediaPlayerEntityFeature.CLEAR_PLAYLIST | MediaPlayerEntityFeature.BROWSE_MEDIA | MediaPlayerEntityFeature.MEDIA_ENQUEUE | MediaPlayerEntityFeature.MEDIA_ANNOUNCE | MediaPlayerEntityFeature.SEEK ) QUEUE_OPTION_MAP = { # map from HA enqueue options to MA enqueue options # which are the same but just in case MediaPlayerEnqueue.ADD: QueueOption.ADD, MediaPlayerEnqueue.NEXT: QueueOption.NEXT, MediaPlayerEnqueue.PLAY: QueueOption.PLAY, MediaPlayerEnqueue.REPLACE: QueueOption.REPLACE, } ATTR_RADIO_MODE = "radio_mode" ATTR_MEDIA_ID = "media_id" ATTR_MEDIA_TYPE = "media_type" ATTR_ARTIST = "artist" ATTR_ALBUM = "album" ATTR_URL = "url" ATTR_USE_PRE_ANNOUNCE = "use_pre_announce" ATTR_ANNOUNCE_VOLUME = "announce_volume" ATTR_SOURCE_PLAYER = "source_player" ATTR_AUTO_PLAY = "auto_play" def catch_musicassistant_error[_R, **P]( func: Callable[..., Awaitable[_R]], ) -> Callable[..., Coroutine[Any, Any, _R | None]]: """Check and log commands to players.""" @functools.wraps(func) async def wrapper( self: MusicAssistantPlayer, *args: P.args, **kwargs: P.kwargs ) -> _R | None: """Catch Music Assistant errors and convert to Home Assistant error.""" try: return await func(self, *args, **kwargs) except MusicAssistantError as err: error_msg = str(err) or err.__class__.__name__ raise HomeAssistantError(error_msg) from err return wrapper async def async_setup_entry( hass: HomeAssistant, entry: MusicAssistantConfigEntry, async_add_entities: AddEntitiesCallback, ) -> None: """Set up Music Assistant MediaPlayer(s) from Config Entry.""" mass = entry.runtime_data.mass added_ids = set() async def handle_player_added(event: MassEvent) -> None: """Handle Mass Player Added event.""" if TYPE_CHECKING: assert event.object_id is not None if event.object_id in added_ids: return added_ids.add(event.object_id) async_add_entities([MusicAssistantPlayer(mass, event.object_id)]) # register listener for new players entry.async_on_unload(mass.subscribe(handle_player_added, EventType.PLAYER_ADDED)) mass_players = [] # add all current players for player in mass.players: added_ids.add(player.player_id) mass_players.append(MusicAssistantPlayer(mass, player.player_id)) async_add_entities(mass_players) class MusicAssistantPlayer(MusicAssistantEntity, MediaPlayerEntity): """Representation of MediaPlayerEntity from Music Assistant Player.""" _attr_name = None _attr_media_image_remotely_accessible = True _attr_media_content_type = HAMediaType.MUSIC def __init__(self, mass: MusicAssistantClient, player_id: str) -> None: """Initialize MediaPlayer entity.""" super().__init__(mass, player_id) self._attr_icon = self.player.icon.replace("mdi-", "mdi:") self._attr_supported_features = SUPPORTED_FEATURES if PlayerFeature.SYNC in self.player.supported_features: self._attr_supported_features |= MediaPlayerEntityFeature.GROUPING self._attr_device_class = MediaPlayerDeviceClass.SPEAKER self._prev_time: float = 0 async def async_added_to_hass(self) -> None: """Register callbacks.""" await super().async_added_to_hass() # we subscribe to player queue time update but we only # accept a state change on big time jumps (e.g. seeking) async def queue_time_updated(event: MassEvent) -> None: if event.object_id != self.player.active_source: return if abs((self._prev_time or 0) - event.data) > 5: await self.async_on_update() self.async_write_ha_state() self._prev_time = event.data self.async_on_remove( self.mass.subscribe( queue_time_updated, EventType.QUEUE_TIME_UPDATED, ) ) @property def active_queue(self) -> PlayerQueue | None: """Return the active queue for this player (if any).""" if not self.player.active_source: return None return self.mass.player_queues.get(self.player.active_source) @property def extra_state_attributes(self) -> Mapping[str, Any]: """Return additional state attributes.""" return { ATTR_MASS_PLAYER_TYPE: self.player.type.value, ATTR_ACTIVE_QUEUE: ( self.active_queue.queue_id if self.active_queue else None ), } async def async_on_update(self) -> None: """Handle player updates.""" if not self.available: return player = self.player active_queue = self.active_queue # update generic attributes if player.powered and active_queue is not None: self._attr_state = MediaPlayerState(active_queue.state.value) if player.powered and player.state is not None: self._attr_state = MediaPlayerState(player.state.value) else: self._attr_state = MediaPlayerState(STATE_OFF) group_members_entity_ids: list[str] = [] if player.group_childs: # translate MA group_childs to HA group_members as entity id's entity_registry = er.async_get(self.hass) group_members_entity_ids = [ entity_id for child_id in player.group_childs if ( entity_id := entity_registry.async_get_entity_id( self.platform.domain, DOMAIN, child_id ) ) ] self._attr_group_members = group_members_entity_ids self._attr_volume_level = ( player.volume_level / 100 if player.volume_level is not None else None ) self._attr_is_volume_muted = player.volume_muted self._update_media_attributes(player, active_queue) self._update_media_image_url(player, active_queue) @catch_musicassistant_error async def async_media_play(self) -> None: """Send play command to device.""" await self.mass.players.player_command_play(self.player_id) @catch_musicassistant_error async def async_media_pause(self) -> None: """Send pause command to device.""" await self.mass.players.player_command_pause(self.player_id) @catch_musicassistant_error async def async_media_stop(self) -> None: """Send stop command to device.""" await self.mass.players.player_command_stop(self.player_id) @catch_musicassistant_error async def async_media_next_track(self) -> None: """Send next track command to device.""" await self.mass.players.player_command_next_track(self.player_id) @catch_musicassistant_error async def async_media_previous_track(self) -> None: """Send previous track command to device.""" await self.mass.players.player_command_previous_track(self.player_id) @catch_musicassistant_error async def async_media_seek(self, position: float) -> None: """Send seek command.""" position = int(position) await self.mass.players.player_command_seek(self.player_id, position) @catch_musicassistant_error async def async_mute_volume(self, mute: bool) -> None: """Mute the volume.""" await self.mass.players.player_command_volume_mute(self.player_id, mute) @catch_musicassistant_error async def async_set_volume_level(self, volume: float) -> None: """Send new volume_level to device.""" volume = int(volume * 100) await self.mass.players.player_command_volume_set(self.player_id, volume) @catch_musicassistant_error async def async_volume_up(self) -> None: """Send new volume_level to device.""" await self.mass.players.player_command_volume_up(self.player_id) @catch_musicassistant_error async def async_volume_down(self) -> None: """Send new volume_level to device.""" await self.mass.players.player_command_volume_down(self.player_id) @catch_musicassistant_error async def async_turn_on(self) -> None: """Turn on device.""" await self.mass.players.player_command_power(self.player_id, True) @catch_musicassistant_error async def async_turn_off(self) -> None: """Turn off device.""" await self.mass.players.player_command_power(self.player_id, False) @catch_musicassistant_error async def async_set_shuffle(self, shuffle: bool) -> None: """Set shuffle state.""" if not self.active_queue: return await self.mass.player_queues.queue_command_shuffle( self.active_queue.queue_id, shuffle ) @catch_musicassistant_error async def async_set_repeat(self, repeat: RepeatMode) -> None: """Set repeat state.""" if not self.active_queue: return await self.mass.player_queues.queue_command_repeat( self.active_queue.queue_id, MassRepeatMode(repeat) ) @catch_musicassistant_error async def async_clear_playlist(self) -> None: """Clear players playlist.""" if TYPE_CHECKING: assert self.player.active_source is not None if queue := self.mass.player_queues.get(self.player.active_source): await self.mass.player_queues.queue_command_clear(queue.queue_id) @catch_musicassistant_error async def async_play_media( self, media_type: MediaType | str, media_id: str, enqueue: MediaPlayerEnqueue | None = None, announce: bool | None = None, **kwargs: Any, ) -> None: """Send the play_media command to the media player.""" if media_source.is_media_source_id(media_id): # Handle media_source sourced_media = await media_source.async_resolve_media( self.hass, media_id, self.entity_id ) media_id = sourced_media.url media_id = async_process_play_media_url(self.hass, media_id) if announce: await self._async_handle_play_announcement( media_id, use_pre_announce=kwargs[ATTR_MEDIA_EXTRA].get("use_pre_announce"), announce_volume=kwargs[ATTR_MEDIA_EXTRA].get("announce_volume"), ) return # forward to our advanced play_media handler await self._async_handle_play_media( media_id=[media_id], enqueue=enqueue, media_type=media_type, radio_mode=kwargs[ATTR_MEDIA_EXTRA].get(ATTR_RADIO_MODE), ) @catch_musicassistant_error async def async_join_players(self, group_members: list[str]) -> None: """Join `group_members` as a player group with the current player.""" player_ids: list[str] = [] for child_entity_id in group_members: # resolve HA entity_id to MA player_id if (hass_state := self.hass.states.get(child_entity_id)) is None: continue if (mass_player_id := hass_state.attributes.get("mass_player_id")) is None: continue player_ids.append(mass_player_id) await self.mass.players.player_command_sync_many(self.player_id, player_ids) @catch_musicassistant_error async def async_unjoin_player(self) -> None: """Remove this player from any group.""" await self.mass.players.player_command_unsync(self.player_id) @catch_musicassistant_error async def _async_handle_play_media( self, media_id: list[str], enqueue: MediaPlayerEnqueue | QueueOption | None = None, radio_mode: bool | None = None, media_type: str | None = None, ) -> None: """Send the play_media command to the media player.""" media_uris: list[str] = [] item: MediaItemType | ItemMapping | None = None # work out (all) uri(s) to play for media_id_str in media_id: # URL or URI string if "://" in media_id_str: media_uris.append(media_id_str) continue # try content id as library id if media_type and media_id_str.isnumeric(): with suppress(MediaNotFoundError): item = await self.mass.music.get_item( MediaType(media_type), media_id_str, "library" ) if isinstance(item, MediaItemType | ItemMapping) and item.uri: media_uris.append(item.uri) continue # try local accessible filename elif await asyncio.to_thread(os.path.isfile, media_id_str): media_uris.append(media_id_str) continue if not media_uris: raise HomeAssistantError( f"Could not resolve {media_id} to playable media item" ) # determine active queue to send the play request to if TYPE_CHECKING: assert self.player.active_source is not None if queue := self.mass.player_queues.get(self.player.active_source): queue_id = queue.queue_id else: queue_id = self.player_id await self.mass.player_queues.play_media( queue_id, media=media_uris, option=self._convert_queueoption_to_media_player_enqueue(enqueue), radio_mode=radio_mode if radio_mode else False, ) @catch_musicassistant_error async def _async_handle_play_announcement( self, url: str, use_pre_announce: bool | None = None, announce_volume: int | None = None, ) -> None: """Send the play_announcement command to the media player.""" await self.mass.players.play_announcement( self.player_id, url, use_pre_announce, announce_volume ) async def async_browse_media( self, media_content_type: MediaType | str | None = None, media_content_id: str | None = None, ) -> BrowseMedia: """Implement the websocket media browsing helper.""" return await media_source.async_browse_media( self.hass, media_content_id, content_filter=lambda item: item.media_content_type.startswith("audio/"), ) def _update_media_image_url( self, player: Player, queue: PlayerQueue | None ) -> None: """Update image URL for the active queue item.""" if queue is None or queue.current_item is None: self._attr_media_image_url = None return if image_url := self.mass.get_media_item_image_url(queue.current_item): self._attr_media_image_remotely_accessible = ( self.mass.server_url not in image_url ) self._attr_media_image_url = image_url return self._attr_media_image_url = None def _update_media_attributes( self, player: Player, queue: PlayerQueue | None ) -> None: """Update media attributes for the active queue item.""" # pylint: disable=too-many-statements self._attr_media_artist = None self._attr_media_album_artist = None self._attr_media_album_name = None self._attr_media_title = None self._attr_media_content_id = None self._attr_media_duration = None self._attr_media_position = None self._attr_media_position_updated_at = None if queue is None and player.current_media: # player has some external source active self._attr_media_content_id = player.current_media.uri self._attr_app_id = player.active_source self._attr_media_title = player.current_media.title self._attr_media_artist = player.current_media.artist self._attr_media_album_name = player.current_media.album self._attr_media_duration = player.current_media.duration # shuffle and repeat are not (yet) supported for external sources self._attr_shuffle = None self._attr_repeat = None if TYPE_CHECKING: assert player.elapsed_time is not None self._attr_media_position = int(player.elapsed_time) self._attr_media_position_updated_at = ( utc_from_timestamp(player.elapsed_time_last_updated) if player.elapsed_time_last_updated else None ) if TYPE_CHECKING: assert player.elapsed_time is not None self._prev_time = player.elapsed_time return if queue is None: # player has no MA queue active self._attr_source = player.active_source self._attr_app_id = player.active_source return # player has an MA queue active (either its own queue or some group queue) self._attr_app_id = DOMAIN self._attr_shuffle = queue.shuffle_enabled self._attr_repeat = queue.repeat_mode.value if not (cur_item := queue.current_item): # queue is empty return self._attr_media_content_id = queue.current_item.uri self._attr_media_duration = queue.current_item.duration self._attr_media_position = int(queue.elapsed_time) self._attr_media_position_updated_at = utc_from_timestamp( queue.elapsed_time_last_updated ) self._prev_time = queue.elapsed_time # handle stream title (radio station icy metadata) if (stream_details := cur_item.streamdetails) and stream_details.stream_title: self._attr_media_album_name = cur_item.name if " - " in stream_details.stream_title: stream_title_parts = stream_details.stream_title.split(" - ", 1) self._attr_media_title = stream_title_parts[1] self._attr_media_artist = stream_title_parts[0] else: self._attr_media_title = stream_details.stream_title return if not (media_item := cur_item.media_item): # queue is not playing a regular media item (edge case?!) self._attr_media_title = cur_item.name return # queue is playing regular media item self._attr_media_title = media_item.name # for tracks we can extract more info if media_item.media_type == MediaType.TRACK: if TYPE_CHECKING: assert isinstance(media_item, Track) self._attr_media_artist = media_item.artist_str if media_item.version: self._attr_media_title += f" ({media_item.version})" if media_item.album: self._attr_media_album_name = media_item.album.name self._attr_media_album_artist = getattr( media_item.album, "artist_str", None ) def _convert_queueoption_to_media_player_enqueue( self, queue_option: MediaPlayerEnqueue | QueueOption | None ) -> QueueOption | None: """Convert a QueueOption to a MediaPlayerEnqueue.""" if isinstance(queue_option, MediaPlayerEnqueue): queue_option = QUEUE_OPTION_MAP.get(queue_option) return queue_option