diff --git a/homeassistant/components/cast/media_player.py b/homeassistant/components/cast/media_player.py index da32dfd6ae7..75d3de06856 100644 --- a/homeassistant/components/cast/media_player.py +++ b/homeassistant/components/cast/media_player.py @@ -7,6 +7,7 @@ from contextlib import suppress from datetime import datetime import json import logging +from typing import Any import pychromecast from pychromecast.controllers.homeassistant import HomeAssistantController @@ -52,7 +53,8 @@ from homeassistant.const import ( STATE_PAUSED, STATE_PLAYING, ) -from homeassistant.core import HomeAssistant, callback +from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, callback +from homeassistant.exceptions import HomeAssistantError from homeassistant.helpers import config_validation as cv from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.entity import DeviceInfo @@ -232,9 +234,9 @@ class CastDevice: self._status_listener = CastStatusListener( self, chromecast, self.mz_mgr, self._mz_only ) - self._chromecast.start() + chromecast.start() - async def _async_disconnect(self): + async def _async_disconnect(self) -> None: """Disconnect Chromecast object if it is set.""" if self._chromecast is not None: _LOGGER.debug( @@ -246,7 +248,7 @@ class CastDevice: self._invalidate() - def _invalidate(self): + def _invalidate(self) -> None: """Invalidate some attributes.""" self._chromecast = None self.mz_mgr = None @@ -254,7 +256,7 @@ class CastDevice: self._status_listener.invalidate() self._status_listener = None - async def _async_cast_discovered(self, discover: ChromecastInfo): + async def _async_cast_discovered(self, discover: ChromecastInfo) -> None: """Handle discovery of new Chromecast.""" if self._cast_info.uuid != discover.uuid: # Discovered is not our device. @@ -263,13 +265,19 @@ class CastDevice: _LOGGER.debug("Discovered chromecast with same UUID: %s", discover) self._cast_info = discover - async def _async_cast_removed(self, discover: ChromecastInfo): + async def _async_cast_removed(self, discover: ChromecastInfo) -> None: """Handle removal of Chromecast.""" - async def _async_stop(self, event): + async def _async_stop(self, event: Event) -> None: """Disconnect socket on Home Assistant stop.""" await self._async_disconnect() + def _get_chromecast(self) -> pychromecast.Chromecast: + """Ensure chromecast is available, to facilitate type checking.""" + if self._chromecast is None: + raise HomeAssistantError("Chromecast is not available.") + return self._chromecast + class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity): """Representation of a Cast device on the network.""" @@ -292,7 +300,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity): self._attr_available = False self._hass_cast_controller: HomeAssistantController | None = None - self._cast_view_remove_handler = None + self._cast_view_remove_handler: CALLBACK_TYPE | None = None self._attr_unique_id = str(cast_info.uuid) self._attr_device_info = DeviceInfo( identifiers={(CAST_DOMAIN, str(cast_info.uuid).replace("-", ""))}, @@ -307,7 +315,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity): ]: self._attr_device_class = MediaPlayerDeviceClass.SPEAKER - async def async_added_to_hass(self): + async def async_added_to_hass(self) -> None: """Create chromecast object when added to hass.""" self._async_setup(self.entity_id) @@ -491,62 +499,63 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity): return media_controller - def turn_on(self): + def turn_on(self) -> None: """Turn on the cast device.""" - if not self._chromecast.is_idle: + chromecast = self._get_chromecast() + if not chromecast.is_idle: # Already turned on return - if self._chromecast.app_id is not None: + if chromecast.app_id is not None: # Quit the previous app before starting splash screen or media player - self._chromecast.quit_app() + chromecast.quit_app() # The only way we can turn the Chromecast is on is by launching an app - if self._chromecast.cast_type == pychromecast.const.CAST_TYPE_CHROMECAST: + if chromecast.cast_type == pychromecast.const.CAST_TYPE_CHROMECAST: app_data = {"media_id": CAST_SPLASH, "media_type": "image/png"} - quick_play(self._chromecast, "default_media_receiver", app_data) + quick_play(chromecast, "default_media_receiver", app_data) else: - self._chromecast.start_app(pychromecast.config.APP_MEDIA_RECEIVER) + chromecast.start_app(pychromecast.config.APP_MEDIA_RECEIVER) - def turn_off(self): + def turn_off(self) -> None: """Turn off the cast device.""" - self._chromecast.quit_app() + self._get_chromecast().quit_app() - def mute_volume(self, mute): + def mute_volume(self, mute: bool) -> None: """Mute the volume.""" - self._chromecast.set_volume_muted(mute) + self._get_chromecast().set_volume_muted(mute) - def set_volume_level(self, volume): + def set_volume_level(self, volume: float) -> None: """Set volume level, range 0..1.""" - self._chromecast.set_volume(volume) + self._get_chromecast().set_volume(volume) - def media_play(self): + def media_play(self) -> None: """Send play command.""" media_controller = self._media_controller() media_controller.play() - def media_pause(self): + def media_pause(self) -> None: """Send pause command.""" media_controller = self._media_controller() media_controller.pause() - def media_stop(self): + def media_stop(self) -> None: """Send stop command.""" media_controller = self._media_controller() media_controller.stop() - def media_previous_track(self): + def media_previous_track(self) -> None: """Send previous track command.""" media_controller = self._media_controller() media_controller.queue_prev() - def media_next_track(self): + def media_next_track(self) -> None: """Send next track command.""" media_controller = self._media_controller() media_controller.queue_next() - def media_seek(self, position): + def media_seek(self, position: float) -> None: """Seek the media to a specific location.""" media_controller = self._media_controller() media_controller.seek(position) @@ -589,11 +598,14 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity): children=sorted(children, key=lambda c: c.title), ) - async def async_browse_media(self, media_content_type=None, media_content_id=None): + async def async_browse_media( + self, media_content_type: str | None = None, media_content_id: str | None = None + ) -> BrowseMedia: """Implement the websocket media browsing helper.""" content_filter = None - if self._chromecast.cast_type in ( + chromecast = self._get_chromecast() + if chromecast.cast_type in ( pychromecast.const.CAST_TYPE_AUDIO, pychromecast.const.CAST_TYPE_GROUP, ): @@ -612,7 +624,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity): self.hass, media_content_type, media_content_id, - self._chromecast.cast_type, + chromecast.cast_type, ) if browse_media: return browse_media @@ -621,8 +633,11 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity): self.hass, media_content_id, content_filter=content_filter ) - async def async_play_media(self, media_type, media_id, **kwargs): + async def async_play_media( + self, media_type: str, media_id: str, **kwargs: Any + ) -> None: """Play a piece of media.""" + chromecast = self._get_chromecast() # Handle media_source if media_source.is_media_source_id(media_id): sourced_media = await media_source.async_resolve_media( @@ -648,9 +663,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity): if "app_id" in app_data: app_id = app_data.pop("app_id") _LOGGER.info("Starting Cast app by ID %s", app_id) - await self.hass.async_add_executor_job( - self._chromecast.start_app, app_id - ) + await self.hass.async_add_executor_job(chromecast.start_app, app_id) if app_data: _LOGGER.warning( "Extra keys %s were ignored. Please use app_name to cast media", @@ -661,7 +674,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity): app_name = app_data.pop("app_name") try: await self.hass.async_add_executor_job( - quick_play, self._chromecast, app_name, app_data + quick_play, chromecast, app_name, app_data ) except NotImplementedError: _LOGGER.error("App %s not supported", app_name) @@ -670,7 +683,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity): # Try the cast platforms for platform in self.hass.data[CAST_DOMAIN]["cast_platform"].values(): result = await platform.async_play_media( - self.hass, self.entity_id, self._chromecast, media_type, media_id + self.hass, self.entity_id, chromecast, media_type, media_id ) if result: return @@ -735,7 +748,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity): app_data, ) await self.hass.async_add_executor_job( - quick_play, self._chromecast, "default_media_receiver", app_data + quick_play, chromecast, "default_media_receiver", app_data ) def _media_status(self): @@ -761,7 +774,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity): return (media_status, media_status_received) @property - def state(self): + def state(self) -> str | None: """Return the state of the player.""" # The lovelace app loops media to prevent timing out, don't show that if self.app_id == CAST_APP_ID_HOMEASSISTANT_LOVELACE: @@ -785,7 +798,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity): return None @property - def media_content_id(self): + def media_content_id(self) -> str | None: """Content ID of current playing media.""" # The lovelace app loops media to prevent timing out, don't show that if self.app_id == CAST_APP_ID_HOMEASSISTANT_LOVELACE: @@ -794,7 +807,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity): return media_status.content_id if media_status else None @property - def media_content_type(self): + def media_content_type(self) -> str | None: """Content type of current playing media.""" # The lovelace app loops media to prevent timing out, don't show that if self.app_id == CAST_APP_ID_HOMEASSISTANT_LOVELACE: