Improve entity type hints [p] (#77871)

This commit is contained in:
epenet 2022-09-06 09:51:33 +02:00 committed by GitHub
parent 6f564e4f51
commit 474844744b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 130 additions and 107 deletions

View file

@ -100,7 +100,7 @@ class PanasonicBluRay(MediaPlayerEntity):
"""When was the position of the current playing media valid.""" """When was the position of the current playing media valid."""
return self._position_valid return self._position_valid
def update(self): def update(self) -> None:
"""Update the internal state by querying the device.""" """Update the internal state by querying the device."""
# This can take 5+ seconds to complete # This can take 5+ seconds to complete
state = self._device.get_play_status() state = self._device.get_play_status()
@ -125,7 +125,7 @@ class PanasonicBluRay(MediaPlayerEntity):
self._position_valid = utcnow() self._position_valid = utcnow()
self._duration = state[2] self._duration = state[2]
def turn_off(self): def turn_off(self) -> None:
""" """
Instruct the device to turn standby. Instruct the device to turn standby.
@ -139,21 +139,21 @@ class PanasonicBluRay(MediaPlayerEntity):
self._state = STATE_OFF self._state = STATE_OFF
def turn_on(self): def turn_on(self) -> None:
"""Wake the device back up from standby.""" """Wake the device back up from standby."""
if self._state == STATE_OFF: if self._state == STATE_OFF:
self._device.send_key("POWER") self._device.send_key("POWER")
self._state = STATE_IDLE self._state = STATE_IDLE
def media_play(self): def media_play(self) -> None:
"""Send play command.""" """Send play command."""
self._device.send_key("PLAYBACK") self._device.send_key("PLAYBACK")
def media_pause(self): def media_pause(self) -> None:
"""Send pause command.""" """Send pause command."""
self._device.send_key("PAUSE") self._device.send_key("PAUSE")
def media_stop(self): def media_stop(self) -> None:
"""Send stop command.""" """Send stop command."""
self._device.send_key("STOP") self._device.send_key("STOP")

View file

@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
from typing import Any
from panasonic_viera import Keys from panasonic_viera import Keys
@ -12,6 +13,7 @@ from homeassistant.components.media_player import (
MediaPlayerEntityFeature, MediaPlayerEntityFeature,
) )
from homeassistant.components.media_player.browse_media import ( from homeassistant.components.media_player.browse_media import (
BrowseMedia,
async_process_play_media_url, async_process_play_media_url,
) )
from homeassistant.components.media_player.const import MEDIA_TYPE_URL from homeassistant.components.media_player.const import MEDIA_TYPE_URL
@ -111,7 +113,7 @@ class PanasonicVieraTVEntity(MediaPlayerEntity):
return self._remote.state return self._remote.state
@property @property
def available(self): def available(self) -> bool:
"""Return True if the device is available.""" """Return True if the device is available."""
return self._remote.available return self._remote.available
@ -125,35 +127,35 @@ class PanasonicVieraTVEntity(MediaPlayerEntity):
"""Boolean if volume is currently muted.""" """Boolean if volume is currently muted."""
return self._remote.muted return self._remote.muted
async def async_update(self): async def async_update(self) -> None:
"""Retrieve the latest data.""" """Retrieve the latest data."""
await self._remote.async_update() await self._remote.async_update()
async def async_turn_on(self): async def async_turn_on(self) -> None:
"""Turn on the media player.""" """Turn on the media player."""
await self._remote.async_turn_on(context=self._context) await self._remote.async_turn_on(context=self._context)
async def async_turn_off(self): async def async_turn_off(self) -> None:
"""Turn off media player.""" """Turn off media player."""
await self._remote.async_turn_off() await self._remote.async_turn_off()
async def async_volume_up(self): async def async_volume_up(self) -> None:
"""Volume up the media player.""" """Volume up the media player."""
await self._remote.async_send_key(Keys.volume_up) await self._remote.async_send_key(Keys.volume_up)
async def async_volume_down(self): async def async_volume_down(self) -> None:
"""Volume down media player.""" """Volume down media player."""
await self._remote.async_send_key(Keys.volume_down) await self._remote.async_send_key(Keys.volume_down)
async def async_mute_volume(self, mute): async def async_mute_volume(self, mute: bool) -> None:
"""Send mute command.""" """Send mute command."""
await self._remote.async_set_mute(mute) await self._remote.async_set_mute(mute)
async def async_set_volume_level(self, volume): async def async_set_volume_level(self, volume: float) -> None:
"""Set volume level, range 0..1.""" """Set volume level, range 0..1."""
await self._remote.async_set_volume(volume) await self._remote.async_set_volume(volume)
async def async_media_play_pause(self): async def async_media_play_pause(self) -> None:
"""Simulate play pause media player.""" """Simulate play pause media player."""
if self._remote.playing: if self._remote.playing:
await self._remote.async_send_key(Keys.pause) await self._remote.async_send_key(Keys.pause)
@ -162,29 +164,31 @@ class PanasonicVieraTVEntity(MediaPlayerEntity):
await self._remote.async_send_key(Keys.play) await self._remote.async_send_key(Keys.play)
self._remote.playing = True self._remote.playing = True
async def async_media_play(self): async def async_media_play(self) -> None:
"""Send play command.""" """Send play command."""
await self._remote.async_send_key(Keys.play) await self._remote.async_send_key(Keys.play)
self._remote.playing = True self._remote.playing = True
async def async_media_pause(self): async def async_media_pause(self) -> None:
"""Send pause command.""" """Send pause command."""
await self._remote.async_send_key(Keys.pause) await self._remote.async_send_key(Keys.pause)
self._remote.playing = False self._remote.playing = False
async def async_media_stop(self): async def async_media_stop(self) -> None:
"""Stop playback.""" """Stop playback."""
await self._remote.async_send_key(Keys.stop) await self._remote.async_send_key(Keys.stop)
async def async_media_next_track(self): async def async_media_next_track(self) -> None:
"""Send the fast forward command.""" """Send the fast forward command."""
await self._remote.async_send_key(Keys.fast_forward) await self._remote.async_send_key(Keys.fast_forward)
async def async_media_previous_track(self): async def async_media_previous_track(self) -> None:
"""Send the rewind command.""" """Send the rewind command."""
await self._remote.async_send_key(Keys.rewind) await self._remote.async_send_key(Keys.rewind)
async def async_play_media(self, media_type, media_id, **kwargs): async def async_play_media(
self, media_type: str, media_id: str, **kwargs: Any
) -> None:
"""Play media.""" """Play media."""
if media_source.is_media_source_id(media_id): if media_source.is_media_source_id(media_id):
media_type = MEDIA_TYPE_URL media_type = MEDIA_TYPE_URL
@ -200,6 +204,8 @@ class PanasonicVieraTVEntity(MediaPlayerEntity):
media_id = async_process_play_media_url(self.hass, media_id) media_id = async_process_play_media_url(self.hass, media_id)
await self._remote.async_play_media(media_type, media_id) await self._remote.async_play_media(media_type, media_id)
async def async_browse_media(self, media_content_type=None, media_content_id=None): async def async_browse_media(
self, media_content_type: str | None = None, media_content_id: str | None = None
) -> BrowseMedia:
"""Implement the websocket media browsing helper.""" """Implement the websocket media browsing helper."""
return await media_source.async_browse_media(self.hass, media_content_id) return await media_source.async_browse_media(self.hass, media_content_id)

View file

@ -1,6 +1,9 @@
"""Remote control support for Panasonic Viera TV.""" """Remote control support for Panasonic Viera TV."""
from __future__ import annotations from __future__ import annotations
from collections.abc import Iterable
from typing import Any
from homeassistant.components.remote import RemoteEntity from homeassistant.components.remote import RemoteEntity
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_NAME, STATE_ON from homeassistant.const import CONF_NAME, STATE_ON
@ -71,7 +74,7 @@ class PanasonicVieraRemoteEntity(RemoteEntity):
return self._name return self._name
@property @property
def available(self): def available(self) -> bool:
"""Return True if the device is available.""" """Return True if the device is available."""
return self._remote.available return self._remote.available
@ -80,15 +83,15 @@ class PanasonicVieraRemoteEntity(RemoteEntity):
"""Return true if device is on.""" """Return true if device is on."""
return self._remote.state == STATE_ON return self._remote.state == STATE_ON
async def async_turn_on(self, **kwargs): async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the device on.""" """Turn the device on."""
await self._remote.async_turn_on(context=self._context) await self._remote.async_turn_on(context=self._context)
async def async_turn_off(self, **kwargs): async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the device off.""" """Turn the device off."""
await self._remote.async_turn_off() await self._remote.async_turn_off()
async def async_send_command(self, command, **kwargs): async def async_send_command(self, command: Iterable[str], **kwargs: Any) -> None:
"""Send a command to one device.""" """Send a command to one device."""
for cmd in command: for cmd in command:
await self._remote.async_send_key(cmd) await self._remote.async_send_key(cmd)

View file

@ -104,7 +104,7 @@ class PandoraMediaPlayer(MediaPlayerEntity):
"""Return the state of the player.""" """Return the state of the player."""
return self._player_state return self._player_state
def turn_on(self): def turn_on(self) -> None:
"""Turn the media player on.""" """Turn the media player on."""
if self._player_state != STATE_OFF: if self._player_state != STATE_OFF:
return return
@ -134,7 +134,7 @@ class PandoraMediaPlayer(MediaPlayerEntity):
self._player_state = STATE_IDLE self._player_state = STATE_IDLE
self.schedule_update_ha_state() self.schedule_update_ha_state()
def turn_off(self): def turn_off(self) -> None:
"""Turn the media player off.""" """Turn the media player off."""
if self._pianobar is None: if self._pianobar is None:
_LOGGER.info("Pianobar subprocess already stopped") _LOGGER.info("Pianobar subprocess already stopped")
@ -151,19 +151,19 @@ class PandoraMediaPlayer(MediaPlayerEntity):
self._player_state = STATE_OFF self._player_state = STATE_OFF
self.schedule_update_ha_state() self.schedule_update_ha_state()
def media_play(self): def media_play(self) -> None:
"""Send play command.""" """Send play command."""
self._send_pianobar_command(SERVICE_MEDIA_PLAY_PAUSE) self._send_pianobar_command(SERVICE_MEDIA_PLAY_PAUSE)
self._player_state = STATE_PLAYING self._player_state = STATE_PLAYING
self.schedule_update_ha_state() self.schedule_update_ha_state()
def media_pause(self): def media_pause(self) -> None:
"""Send pause command.""" """Send pause command."""
self._send_pianobar_command(SERVICE_MEDIA_PLAY_PAUSE) self._send_pianobar_command(SERVICE_MEDIA_PLAY_PAUSE)
self._player_state = STATE_PAUSED self._player_state = STATE_PAUSED
self.schedule_update_ha_state() self.schedule_update_ha_state()
def media_next_track(self): def media_next_track(self) -> None:
"""Go to next track.""" """Go to next track."""
self._send_pianobar_command(SERVICE_MEDIA_NEXT_TRACK) self._send_pianobar_command(SERVICE_MEDIA_NEXT_TRACK)
self.schedule_update_ha_state() self.schedule_update_ha_state()
@ -204,7 +204,7 @@ class PandoraMediaPlayer(MediaPlayerEntity):
"""Duration of current playing media in seconds.""" """Duration of current playing media in seconds."""
return self._media_duration return self._media_duration
def select_source(self, source): def select_source(self, source: str) -> None:
"""Choose a different Pandora station and play it.""" """Choose a different Pandora station and play it."""
try: try:
station_index = self._stations.index(source) station_index = self._stations.index(source)

View file

@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
from typing import Any
from pencompy.pencompy import Pencompy from pencompy.pencompy import Pencompy
import voluptuous as vol import voluptuous as vol
@ -90,15 +91,15 @@ class PencomRelay(SwitchEntity):
"""Return a relay's state.""" """Return a relay's state."""
return self._state return self._state
def turn_on(self, **kwargs): def turn_on(self, **kwargs: Any) -> None:
"""Turn a relay on.""" """Turn a relay on."""
self._hub.set(self._board, self._addr, True) self._hub.set(self._board, self._addr, True)
def turn_off(self, **kwargs): def turn_off(self, **kwargs: Any) -> None:
"""Turn a relay off.""" """Turn a relay off."""
self._hub.set(self._board, self._addr, False) self._hub.set(self._board, self._addr, False)
def update(self): def update(self) -> None:
"""Refresh a relay's state.""" """Refresh a relay's state."""
self._state = self._hub.get(self._board, self._addr) self._state = self._hub.get(self._board, self._addr)

View file

@ -1,6 +1,8 @@
"""Media Player component to integrate TVs exposing the Joint Space API.""" """Media Player component to integrate TVs exposing the Joint Space API."""
from __future__ import annotations from __future__ import annotations
from typing import Any
from haphilipsjs import ConnectionFailure from haphilipsjs import ConnectionFailure
from homeassistant.components.media_player import ( from homeassistant.components.media_player import (
@ -135,7 +137,7 @@ class PhilipsTVMediaPlayer(
"""List of available input sources.""" """List of available input sources."""
return list(self._sources.values()) return list(self._sources.values())
async def async_select_source(self, source): async def async_select_source(self, source: str) -> None:
"""Set the input source.""" """Set the input source."""
if source_id := _inverted(self._sources).get(source): if source_id := _inverted(self._sources).get(source):
await self._tv.setSource(source_id) await self._tv.setSource(source_id)
@ -151,7 +153,7 @@ class PhilipsTVMediaPlayer(
"""Boolean if volume is currently muted.""" """Boolean if volume is currently muted."""
return self._tv.muted return self._tv.muted
async def async_turn_on(self): async def async_turn_on(self) -> None:
"""Turn on the device.""" """Turn on the device."""
if self._tv.on and self._tv.powerstate: if self._tv.on and self._tv.powerstate:
await self._tv.setPowerState("On") await self._tv.setPowerState("On")
@ -160,7 +162,7 @@ class PhilipsTVMediaPlayer(
await self.coordinator.turn_on.async_run(self.hass, self._context) await self.coordinator.turn_on.async_run(self.hass, self._context)
await self._async_update_soon() await self._async_update_soon()
async def async_turn_off(self): async def async_turn_off(self) -> None:
"""Turn off the device.""" """Turn off the device."""
if self._state == STATE_ON: if self._state == STATE_ON:
await self._tv.sendKey("Standby") await self._tv.sendKey("Standby")
@ -169,17 +171,17 @@ class PhilipsTVMediaPlayer(
else: else:
_LOGGER.debug("Ignoring turn off when already in expected state") _LOGGER.debug("Ignoring turn off when already in expected state")
async def async_volume_up(self): async def async_volume_up(self) -> None:
"""Send volume up command.""" """Send volume up command."""
await self._tv.sendKey("VolumeUp") await self._tv.sendKey("VolumeUp")
await self._async_update_soon() await self._async_update_soon()
async def async_volume_down(self): async def async_volume_down(self) -> None:
"""Send volume down command.""" """Send volume down command."""
await self._tv.sendKey("VolumeDown") await self._tv.sendKey("VolumeDown")
await self._async_update_soon() await self._async_update_soon()
async def async_mute_volume(self, mute): async def async_mute_volume(self, mute: bool) -> None:
"""Send mute command.""" """Send mute command."""
if self._tv.muted != mute: if self._tv.muted != mute:
await self._tv.sendKey("Mute") await self._tv.sendKey("Mute")
@ -187,22 +189,22 @@ class PhilipsTVMediaPlayer(
else: else:
_LOGGER.debug("Ignoring request when already in expected state") _LOGGER.debug("Ignoring request when already in expected state")
async def async_set_volume_level(self, volume): async def async_set_volume_level(self, volume: float) -> None:
"""Set volume level, range 0..1.""" """Set volume level, range 0..1."""
await self._tv.setVolume(volume, self._tv.muted) await self._tv.setVolume(volume, self._tv.muted)
await self._async_update_soon() await self._async_update_soon()
async def async_media_previous_track(self): async def async_media_previous_track(self) -> None:
"""Send rewind command.""" """Send rewind command."""
await self._tv.sendKey("Previous") await self._tv.sendKey("Previous")
await self._async_update_soon() await self._async_update_soon()
async def async_media_next_track(self): async def async_media_next_track(self) -> None:
"""Send fast forward command.""" """Send fast forward command."""
await self._tv.sendKey("Next") await self._tv.sendKey("Next")
await self._async_update_soon() await self._async_update_soon()
async def async_media_play_pause(self): async def async_media_play_pause(self) -> None:
"""Send pause command to media player.""" """Send pause command to media player."""
if self._tv.quirk_playpause_spacebar: if self._tv.quirk_playpause_spacebar:
await self._tv.sendUnicode(" ") await self._tv.sendUnicode(" ")
@ -210,17 +212,17 @@ class PhilipsTVMediaPlayer(
await self._tv.sendKey("PlayPause") await self._tv.sendKey("PlayPause")
await self._async_update_soon() await self._async_update_soon()
async def async_media_play(self): async def async_media_play(self) -> None:
"""Send pause command to media player.""" """Send pause command to media player."""
await self._tv.sendKey("Play") await self._tv.sendKey("Play")
await self._async_update_soon() await self._async_update_soon()
async def async_media_pause(self): async def async_media_pause(self) -> None:
"""Send play command to media player.""" """Send play command to media player."""
await self._tv.sendKey("Pause") await self._tv.sendKey("Pause")
await self._async_update_soon() await self._async_update_soon()
async def async_media_stop(self): async def async_media_stop(self) -> None:
"""Send play command to media player.""" """Send play command to media player."""
await self._tv.sendKey("Stop") await self._tv.sendKey("Stop")
await self._async_update_soon() await self._async_update_soon()
@ -268,7 +270,9 @@ class PhilipsTVMediaPlayer(
if app := self._tv.applications.get(self._tv.application_id): if app := self._tv.applications.get(self._tv.application_id):
return app.get("label") return app.get("label")
async def async_play_media(self, media_type, media_id, **kwargs): async def async_play_media(
self, media_type: str, media_id: str, **kwargs: Any
) -> None:
"""Play a piece of media.""" """Play a piece of media."""
_LOGGER.debug("Call play media type <%s>, Id <%s>", media_type, media_id) _LOGGER.debug("Call play media type <%s>, Id <%s>", media_type, media_id)
@ -460,7 +464,7 @@ class PhilipsTVMediaPlayer(
_LOGGER.warning("Failed to fetch image") _LOGGER.warning("Failed to fetch image")
return None, None return None, None
async def async_get_media_image(self): async def async_get_media_image(self) -> tuple[bytes | None, str | None]:
"""Serve album art. Returns (content, content_type).""" """Serve album art. Returns (content, content_type)."""
return await self.async_get_browse_image( return await self.async_get_browse_image(
self.media_content_type, self.media_content_id, None self.media_content_type, self.media_content_id, None

View file

@ -1,5 +1,7 @@
"""Remote control support for Apple TV.""" """Remote control support for Apple TV."""
import asyncio import asyncio
from collections.abc import Iterable
from typing import Any
from homeassistant.components.remote import ( from homeassistant.components.remote import (
ATTR_DELAY_SECS, ATTR_DELAY_SECS,
@ -58,7 +60,7 @@ class PhilipsTVRemote(CoordinatorEntity[PhilipsTVDataUpdateCoordinator], RemoteE
self._tv.on and (self._tv.powerstate == "On" or self._tv.powerstate is None) self._tv.on and (self._tv.powerstate == "On" or self._tv.powerstate is None)
) )
async def async_turn_on(self, **kwargs): async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the device on.""" """Turn the device on."""
if self._tv.on and self._tv.powerstate: if self._tv.on and self._tv.powerstate:
await self._tv.setPowerState("On") await self._tv.setPowerState("On")
@ -66,7 +68,7 @@ class PhilipsTVRemote(CoordinatorEntity[PhilipsTVDataUpdateCoordinator], RemoteE
await self.coordinator.turn_on.async_run(self.hass, self._context) await self.coordinator.turn_on.async_run(self.hass, self._context)
self.async_write_ha_state() self.async_write_ha_state()
async def async_turn_off(self, **kwargs): async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the device off.""" """Turn the device off."""
if self._tv.on: if self._tv.on:
await self._tv.sendKey("Standby") await self._tv.sendKey("Standby")
@ -74,7 +76,7 @@ class PhilipsTVRemote(CoordinatorEntity[PhilipsTVDataUpdateCoordinator], RemoteE
else: else:
LOGGER.debug("Tv was already turned off") LOGGER.debug("Tv was already turned off")
async def async_send_command(self, command, **kwargs): async def async_send_command(self, command: Iterable[str], **kwargs: Any) -> None:
"""Send a command to one device.""" """Send a command to one device."""
num_repeats = kwargs[ATTR_NUM_REPEATS] num_repeats = kwargs[ATTR_NUM_REPEATS]
delay = kwargs.get(ATTR_DELAY_SECS, DEFAULT_DELAY_SECS) delay = kwargs.get(ATTR_DELAY_SECS, DEFAULT_DELAY_SECS)

View file

@ -131,7 +131,7 @@ class PingBinarySensor(RestoreEntity, BinarySensorEntity):
await self._ping.async_update() await self._ping.async_update()
self._available = True self._available = True
async def async_added_to_hass(self): async def async_added_to_hass(self) -> None:
"""Restore previous state on restart to avoid blocking startup.""" """Restore previous state on restart to avoid blocking startup."""
await super().async_added_to_hass() await super().async_added_to_hass()

View file

@ -210,31 +210,31 @@ class PioneerDevice(MediaPlayerEntity):
"""Title of current playing media.""" """Title of current playing media."""
return self._selected_source return self._selected_source
def turn_off(self): def turn_off(self) -> None:
"""Turn off media player.""" """Turn off media player."""
self.telnet_command("PF") self.telnet_command("PF")
def volume_up(self): def volume_up(self) -> None:
"""Volume up media player.""" """Volume up media player."""
self.telnet_command("VU") self.telnet_command("VU")
def volume_down(self): def volume_down(self) -> None:
"""Volume down media player.""" """Volume down media player."""
self.telnet_command("VD") self.telnet_command("VD")
def set_volume_level(self, volume): def set_volume_level(self, volume: float) -> None:
"""Set volume level, range 0..1.""" """Set volume level, range 0..1."""
# 60dB max # 60dB max
self.telnet_command(f"{round(volume * MAX_VOLUME):03}VL") self.telnet_command(f"{round(volume * MAX_VOLUME):03}VL")
def mute_volume(self, mute): def mute_volume(self, mute: bool) -> None:
"""Mute (true) or unmute (false) media player.""" """Mute (true) or unmute (false) media player."""
self.telnet_command("MO" if mute else "MF") self.telnet_command("MO" if mute else "MF")
def turn_on(self): def turn_on(self) -> None:
"""Turn the media player on.""" """Turn the media player on."""
self.telnet_command("PO") self.telnet_command("PO")
def select_source(self, source): def select_source(self, source: str) -> None:
"""Select input source.""" """Select input source."""
self.telnet_command(f"{self._source_name_to_number.get(source)}FN") self.telnet_command(f"{self._source_name_to_number.get(source)}FN")

View file

@ -107,7 +107,7 @@ class PjLinkDevice(MediaPlayerEntity):
projector.authenticate(self._password) projector.authenticate(self._password)
return projector return projector
def update(self): def update(self) -> None:
"""Get the latest state from the device.""" """Get the latest state from the device."""
with self.projector() as projector: with self.projector() as projector:
@ -161,22 +161,22 @@ class PjLinkDevice(MediaPlayerEntity):
"""Return all available input sources.""" """Return all available input sources."""
return self._source_list return self._source_list
def turn_off(self): def turn_off(self) -> None:
"""Turn projector off.""" """Turn projector off."""
with self.projector() as projector: with self.projector() as projector:
projector.set_power("off") projector.set_power("off")
def turn_on(self): def turn_on(self) -> None:
"""Turn projector on.""" """Turn projector on."""
with self.projector() as projector: with self.projector() as projector:
projector.set_power("on") projector.set_power("on")
def mute_volume(self, mute): def mute_volume(self, mute: bool) -> None:
"""Mute (true) of unmute (false) media player.""" """Mute (true) of unmute (false) media player."""
with self.projector() as projector: with self.projector() as projector:
projector.set_mute(MUTE_AUDIO, mute) projector.set_mute(MUTE_AUDIO, mute)
def select_source(self, source): def select_source(self, source: str) -> None:
"""Set the input source.""" """Set the input source."""
source = self._source_name_mapping[source] source = self._source_name_mapping[source]
with self.projector() as projector: with self.projector() as projector:

View file

@ -4,7 +4,7 @@ from __future__ import annotations
from collections.abc import Callable from collections.abc import Callable
from functools import wraps from functools import wraps
import logging import logging
from typing import TypeVar from typing import Any, TypeVar
import plexapi.exceptions import plexapi.exceptions
import requests.exceptions import requests.exceptions
@ -15,6 +15,7 @@ from homeassistant.components.media_player import (
MediaPlayerEntity, MediaPlayerEntity,
MediaPlayerEntityFeature, MediaPlayerEntityFeature,
) )
from homeassistant.components.media_player.browse_media import BrowseMedia
from homeassistant.components.media_player.const import MEDIA_TYPE_MUSIC from homeassistant.components.media_player.const import MEDIA_TYPE_MUSIC
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import STATE_IDLE, STATE_PAUSED, STATE_PLAYING from homeassistant.const import STATE_IDLE, STATE_PAUSED, STATE_PLAYING
@ -147,7 +148,7 @@ class PlexMediaPlayer(MediaPlayerEntity):
# Initializes other attributes # Initializes other attributes
self.session = session self.session = session
async def async_added_to_hass(self): async def async_added_to_hass(self) -> None:
"""Run when about to be added to hass.""" """Run when about to be added to hass."""
_LOGGER.debug("Added %s [%s]", self.entity_id, self.unique_id) _LOGGER.debug("Added %s [%s]", self.entity_id, self.unique_id)
self.async_on_remove( self.async_on_remove(
@ -408,7 +409,7 @@ class PlexMediaPlayer(MediaPlayerEntity):
MediaPlayerEntityFeature.BROWSE_MEDIA | MediaPlayerEntityFeature.PLAY_MEDIA MediaPlayerEntityFeature.BROWSE_MEDIA | MediaPlayerEntityFeature.PLAY_MEDIA
) )
def set_volume_level(self, volume): def set_volume_level(self, volume: float) -> None:
"""Set volume level, range 0..1.""" """Set volume level, range 0..1."""
if self.device and "playback" in self._device_protocol_capabilities: if self.device and "playback" in self._device_protocol_capabilities:
self.device.setVolume(int(volume * 100), self._active_media_plexapi_type) self.device.setVolume(int(volume * 100), self._active_media_plexapi_type)
@ -432,7 +433,7 @@ class PlexMediaPlayer(MediaPlayerEntity):
return self._volume_muted return self._volume_muted
return None return None
def mute_volume(self, mute): def mute_volume(self, mute: bool) -> None:
"""Mute the volume. """Mute the volume.
Since we can't actually mute, we'll: Since we can't actually mute, we'll:
@ -449,37 +450,37 @@ class PlexMediaPlayer(MediaPlayerEntity):
else: else:
self.set_volume_level(self._previous_volume_level) self.set_volume_level(self._previous_volume_level)
def media_play(self): def media_play(self) -> None:
"""Send play command.""" """Send play command."""
if self.device and "playback" in self._device_protocol_capabilities: if self.device and "playback" in self._device_protocol_capabilities:
self.device.play(self._active_media_plexapi_type) self.device.play(self._active_media_plexapi_type)
def media_pause(self): def media_pause(self) -> None:
"""Send pause command.""" """Send pause command."""
if self.device and "playback" in self._device_protocol_capabilities: if self.device and "playback" in self._device_protocol_capabilities:
self.device.pause(self._active_media_plexapi_type) self.device.pause(self._active_media_plexapi_type)
def media_stop(self): def media_stop(self) -> None:
"""Send stop command.""" """Send stop command."""
if self.device and "playback" in self._device_protocol_capabilities: if self.device and "playback" in self._device_protocol_capabilities:
self.device.stop(self._active_media_plexapi_type) self.device.stop(self._active_media_plexapi_type)
def media_seek(self, position): def media_seek(self, position: float) -> None:
"""Send the seek command.""" """Send the seek command."""
if self.device and "playback" in self._device_protocol_capabilities: if self.device and "playback" in self._device_protocol_capabilities:
self.device.seekTo(position * 1000, self._active_media_plexapi_type) self.device.seekTo(position * 1000, self._active_media_plexapi_type)
def media_next_track(self): def media_next_track(self) -> None:
"""Send next track command.""" """Send next track command."""
if self.device and "playback" in self._device_protocol_capabilities: if self.device and "playback" in self._device_protocol_capabilities:
self.device.skipNext(self._active_media_plexapi_type) self.device.skipNext(self._active_media_plexapi_type)
def media_previous_track(self): def media_previous_track(self) -> None:
"""Send previous track command.""" """Send previous track command."""
if self.device and "playback" in self._device_protocol_capabilities: if self.device and "playback" in self._device_protocol_capabilities:
self.device.skipPrevious(self._active_media_plexapi_type) self.device.skipPrevious(self._active_media_plexapi_type)
def play_media(self, media_type, media_id, **kwargs): def play_media(self, media_type: str, media_id: str, **kwargs: Any) -> None:
"""Play a piece of media.""" """Play a piece of media."""
if not (self.device and "playback" in self._device_protocol_capabilities): if not (self.device and "playback" in self._device_protocol_capabilities):
raise HomeAssistantError( raise HomeAssistantError(
@ -538,7 +539,9 @@ class PlexMediaPlayer(MediaPlayerEntity):
via_device=(PLEX_DOMAIN, self.plex_server.machine_identifier), via_device=(PLEX_DOMAIN, self.plex_server.machine_identifier),
) )
async def async_browse_media(self, media_content_type=None, media_content_id=None): async def async_browse_media(
self, media_content_type: str | None = None, media_content_id: str | None = None
) -> BrowseMedia:
"""Implement the websocket media browsing helper.""" """Implement the websocket media browsing helper."""
is_internal = is_internal_request(self.hass) is_internal = is_internal_request(self.hass)
return await self.hass.async_add_executor_job( return await self.hass.async_add_executor_job(

View file

@ -89,7 +89,7 @@ class PlexSensor(SensorEntity):
function=self._async_refresh_sensor, function=self._async_refresh_sensor,
).async_call ).async_call
async def async_added_to_hass(self): async def async_added_to_hass(self) -> None:
"""Run when about to be added to hass.""" """Run when about to be added to hass."""
server_id = self._server.machine_identifier server_id = self._server.machine_identifier
self.async_on_remove( self.async_on_remove(
@ -100,7 +100,7 @@ class PlexSensor(SensorEntity):
) )
) )
async def _async_refresh_sensor(self): async def _async_refresh_sensor(self) -> None:
"""Set instance object and trigger an entity state update.""" """Set instance object and trigger an entity state update."""
_LOGGER.debug("Refreshing sensor [%s]", self.unique_id) _LOGGER.debug("Refreshing sensor [%s]", self.unique_id)
self._attr_native_value = len(self._server.sensor_attributes) self._attr_native_value = len(self._server.sensor_attributes)
@ -147,7 +147,7 @@ class PlexLibrarySectionSensor(SensorEntity):
self._attr_unique_id = f"library-{self.server_id}-{plex_library_section.uuid}" self._attr_unique_id = f"library-{self.server_id}-{plex_library_section.uuid}"
self._attr_native_unit_of_measurement = "Items" self._attr_native_unit_of_measurement = "Items"
async def async_added_to_hass(self): async def async_added_to_hass(self) -> None:
"""Run when about to be added to hass.""" """Run when about to be added to hass."""
self.async_on_remove( self.async_on_remove(
async_dispatcher_connect( async_dispatcher_connect(
@ -158,7 +158,7 @@ class PlexLibrarySectionSensor(SensorEntity):
) )
await self.async_refresh_sensor() await self.async_refresh_sensor()
async def async_refresh_sensor(self): async def async_refresh_sensor(self) -> None:
"""Update state and attributes for the library sensor.""" """Update state and attributes for the library sensor."""
_LOGGER.debug("Refreshing library sensor for '%s'", self.name) _LOGGER.debug("Refreshing library sensor for '%s'", self.name)
try: try:

View file

@ -68,7 +68,7 @@ class PocketCastsSensor(SensorEntity):
"""Return the icon for the sensor.""" """Return the icon for the sensor."""
return ICON return ICON
def update(self): def update(self) -> None:
"""Update sensor values.""" """Update sensor values."""
try: try:
self._state = len(self._api.new_releases) self._state = len(self._api.new_releases)

View file

@ -77,14 +77,14 @@ class MinutPointBinarySensor(MinutPointEntity, BinarySensorEntity):
self._async_unsub_hook_dispatcher_connect = None self._async_unsub_hook_dispatcher_connect = None
self._events = EVENTS[device_name] self._events = EVENTS[device_name]
async def async_added_to_hass(self): async def async_added_to_hass(self) -> None:
"""Call when entity is added to HOme Assistant.""" """Call when entity is added to HOme Assistant."""
await super().async_added_to_hass() await super().async_added_to_hass()
self._async_unsub_hook_dispatcher_connect = async_dispatcher_connect( self._async_unsub_hook_dispatcher_connect = async_dispatcher_connect(
self.hass, SIGNAL_WEBHOOK, self._webhook_event self.hass, SIGNAL_WEBHOOK, self._webhook_event
) )
async def async_will_remove_from_hass(self): async def async_will_remove_from_hass(self) -> None:
"""Disconnect dispatcher listener when removed.""" """Disconnect dispatcher listener when removed."""
await super().async_will_remove_from_hass() await super().async_will_remove_from_hass()
if self._async_unsub_hook_dispatcher_connect: if self._async_unsub_hook_dispatcher_connect:

View file

@ -1,6 +1,7 @@
"""Control switches.""" """Control switches."""
from datetime import timedelta from datetime import timedelta
import logging import logging
from typing import Any
from ProgettiHWSW.relay import Relay from ProgettiHWSW.relay import Relay
import async_timeout import async_timeout
@ -65,17 +66,17 @@ class ProgettihwswSwitch(CoordinatorEntity, SwitchEntity):
self._switch = switch self._switch = switch
self._name = name self._name = name
async def async_turn_on(self, **kwargs): async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the switch on.""" """Turn the switch on."""
await self._switch.control(True) await self._switch.control(True)
await self.coordinator.async_request_refresh() await self.coordinator.async_request_refresh()
async def async_turn_off(self, **kwargs): async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the switch off.""" """Turn the switch off."""
await self._switch.control(False) await self._switch.control(False)
await self.coordinator.async_request_refresh() await self.coordinator.async_request_refresh()
async def async_toggle(self, **kwargs): async def async_toggle(self, **kwargs: Any) -> None:
"""Toggle the state of switch.""" """Toggle the state of switch."""
await self._switch.toggle() await self._switch.toggle()
await self.coordinator.async_request_refresh() await self.coordinator.async_request_refresh()

View file

@ -1,6 +1,8 @@
"""Support for Proliphix NT10e Thermostats.""" """Support for Proliphix NT10e Thermostats."""
from __future__ import annotations from __future__ import annotations
from typing import Any
import proliphix import proliphix
import voluptuous as vol import voluptuous as vol
@ -63,7 +65,7 @@ class ProliphixThermostat(ClimateEntity):
self._pdp = pdp self._pdp = pdp
self._name = None self._name = None
def update(self): def update(self) -> None:
"""Update the data from the thermostat.""" """Update the data from the thermostat."""
self._pdp.update() self._pdp.update()
self._name = self._pdp.name self._name = self._pdp.name
@ -114,7 +116,7 @@ class ProliphixThermostat(ClimateEntity):
"""Return available HVAC modes.""" """Return available HVAC modes."""
return [] return []
def set_temperature(self, **kwargs): def set_temperature(self, **kwargs: Any) -> None:
"""Set new target temperature.""" """Set new target temperature."""
if (temperature := kwargs.get(ATTR_TEMPERATURE)) is None: if (temperature := kwargs.get(ATTR_TEMPERATURE)) is None:
return return

View file

@ -93,7 +93,7 @@ class ProxmoxBinarySensor(ProxmoxEntity, BinarySensorEntity):
return data["status"] == "running" return data["status"] == "running"
@property @property
def available(self): def available(self) -> bool:
"""Return sensor availability.""" """Return sensor availability."""
return super().available and self.coordinator.data is not None return super().available and self.coordinator.data is not None

View file

@ -130,12 +130,12 @@ class PS4Device(MediaPlayerEntity):
self._region, self._region,
) )
async def async_added_to_hass(self): async def async_added_to_hass(self) -> None:
"""Subscribe PS4 events.""" """Subscribe PS4 events."""
self.hass.data[PS4_DATA].devices.append(self) self.hass.data[PS4_DATA].devices.append(self)
self.check_region() self.check_region()
async def async_update(self): async def async_update(self) -> None:
"""Retrieve the latest data.""" """Retrieve the latest data."""
if self._ps4.ddp_protocol is not None: if self._ps4.ddp_protocol is not None:
# Request Status with asyncio transport. # Request Status with asyncio transport.
@ -365,7 +365,7 @@ class PS4Device(MediaPlayerEntity):
self._unique_id = format_unique_id(self._creds, status["host-id"]) self._unique_id = format_unique_id(self._creds, status["host-id"])
async def async_will_remove_from_hass(self): async def async_will_remove_from_hass(self) -> None:
"""Remove Entity from Home Assistant.""" """Remove Entity from Home Assistant."""
# Close TCP Transport. # Close TCP Transport.
if self._ps4.connected: if self._ps4.connected:
@ -439,27 +439,27 @@ class PS4Device(MediaPlayerEntity):
"""List of available input sources.""" """List of available input sources."""
return self._source_list return self._source_list
async def async_turn_off(self): async def async_turn_off(self) -> None:
"""Turn off media player.""" """Turn off media player."""
await self._ps4.standby() await self._ps4.standby()
async def async_turn_on(self): async def async_turn_on(self) -> None:
"""Turn on the media player.""" """Turn on the media player."""
self._ps4.wakeup() self._ps4.wakeup()
async def async_toggle(self): async def async_toggle(self) -> None:
"""Toggle media player.""" """Toggle media player."""
await self._ps4.toggle() await self._ps4.toggle()
async def async_media_pause(self): async def async_media_pause(self) -> None:
"""Send keypress ps to return to menu.""" """Send keypress ps to return to menu."""
await self.async_send_remote_control("ps") await self.async_send_remote_control("ps")
async def async_media_stop(self): async def async_media_stop(self) -> None:
"""Send keypress ps to return to menu.""" """Send keypress ps to return to menu."""
await self.async_send_remote_control("ps") await self.async_send_remote_control("ps")
async def async_select_source(self, source): async def async_select_source(self, source: str) -> None:
"""Select input source.""" """Select input source."""
for title_id, data in self._games.items(): for title_id, data in self._games.items():
game = data[ATTR_MEDIA_TITLE] game = data[ATTR_MEDIA_TITLE]

View file

@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
from typing import Any
from pulsectl import Pulse, PulseError from pulsectl import Pulse, PulseError
import voluptuous as vol import voluptuous as vol
@ -100,7 +101,7 @@ class PALoopbackSwitch(SwitchEntity):
return None return None
@property @property
def available(self): def available(self) -> bool:
"""Return true when connected to server.""" """Return true when connected to server."""
return self._pa_svr.connected return self._pa_svr.connected
@ -114,7 +115,7 @@ class PALoopbackSwitch(SwitchEntity):
"""Return true if device is on.""" """Return true if device is on."""
return self._module_idx is not None return self._module_idx is not None
def turn_on(self, **kwargs): def turn_on(self, **kwargs: Any) -> None:
"""Turn the device on.""" """Turn the device on."""
if not self.is_on: if not self.is_on:
self._pa_svr.module_load( self._pa_svr.module_load(
@ -124,13 +125,13 @@ class PALoopbackSwitch(SwitchEntity):
else: else:
_LOGGER.warning(IGNORED_SWITCH_WARN) _LOGGER.warning(IGNORED_SWITCH_WARN)
def turn_off(self, **kwargs): def turn_off(self, **kwargs: Any) -> None:
"""Turn the device off.""" """Turn the device off."""
if self.is_on: if self.is_on:
self._pa_svr.module_unload(self._module_idx) self._pa_svr.module_unload(self._module_idx)
else: else:
_LOGGER.warning(IGNORED_SWITCH_WARN) _LOGGER.warning(IGNORED_SWITCH_WARN)
def update(self): def update(self) -> None:
"""Refresh state in case an alternate process modified this data.""" """Refresh state in case an alternate process modified this data."""
self._module_idx = self._get_module_idx() self._module_idx = self._get_module_idx()

View file

@ -109,7 +109,7 @@ class PushCamera(Camera):
self.webhook_id = webhook_id self.webhook_id = webhook_id
self.webhook_url = webhook.async_generate_url(hass, webhook_id) self.webhook_url = webhook.async_generate_url(hass, webhook_id)
async def async_added_to_hass(self): async def async_added_to_hass(self) -> None:
"""Call when entity is added to hass.""" """Call when entity is added to hass."""
self.hass.data[PUSH_CAMERA_DATA][self.webhook_id] = self self.hass.data[PUSH_CAMERA_DATA][self.webhook_id] = self

View file

@ -114,7 +114,7 @@ class PushBulletNotificationSensor(SensorEntity):
self._attr_name = f"Pushbullet {description.key}" self._attr_name = f"Pushbullet {description.key}"
def update(self): def update(self) -> None:
"""Fetch the latest data from the sensor. """Fetch the latest data from the sensor.
This will fetch the 'sensor reading' into self._state but also all This will fetch the 'sensor reading' into self._state but also all

View file

@ -111,7 +111,7 @@ class PyLoadSensor(SensorEntity):
"""Return the unit of measurement of this entity, if any.""" """Return the unit of measurement of this entity, if any."""
return self._unit_of_measurement return self._unit_of_measurement
def update(self): def update(self) -> None:
"""Update state of sensor.""" """Update state of sensor."""
try: try:
self.api.update() self.api.update()