Improve entity type hints [c] (#77023)

This commit is contained in:
epenet 2022-08-19 13:02:46 +02:00 committed by GitHub
parent 80c1c11b1a
commit 0f792eb92e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 63 additions and 57 deletions

View file

@ -135,11 +135,13 @@ class WebDavCalendarEntity(CalendarEntity):
"""Return the next upcoming event.""" """Return the next upcoming event."""
return self._event return self._event
async def async_get_events(self, hass, start_date, end_date): async def async_get_events(
self, hass: HomeAssistant, start_date: datetime, end_date: datetime
) -> list[CalendarEvent]:
"""Get all events in a specific time frame.""" """Get all events in a specific time frame."""
return await self.data.async_get_events(hass, start_date, end_date) return await self.data.async_get_events(hass, start_date, end_date)
def update(self): def update(self) -> None:
"""Update event data.""" """Update event data."""
self.data.update() self.data.update()
self._event = self.data.event self._event = self.data.event

View file

@ -1,6 +1,8 @@
"""Support for interfacing with an instance of getchannels.com.""" """Support for interfacing with an instance of getchannels.com."""
from __future__ import annotations from __future__ import annotations
from typing import Any
from pychannels import Channels from pychannels import Channels
import voluptuous as vol import voluptuous as vol
@ -167,7 +169,7 @@ class ChannelsPlayer(MediaPlayerEntity):
return None return None
def update(self): def update(self) -> None:
"""Retrieve latest state.""" """Retrieve latest state."""
self.update_favorite_channels() self.update_favorite_channels()
self.update_state(self.client.status()) self.update_state(self.client.status())
@ -211,39 +213,39 @@ class ChannelsPlayer(MediaPlayerEntity):
return None return None
def mute_volume(self, mute): def mute_volume(self, mute: bool) -> None:
"""Mute (true) or unmute (false) player.""" """Mute (true) or unmute (false) player."""
if mute != self.muted: if mute != self.muted:
response = self.client.toggle_muted() response = self.client.toggle_muted()
self.update_state(response) self.update_state(response)
def media_stop(self): def media_stop(self) -> None:
"""Send media_stop command to player.""" """Send media_stop command to player."""
self.status = "stopped" self.status = "stopped"
response = self.client.stop() response = self.client.stop()
self.update_state(response) self.update_state(response)
def media_play(self): def media_play(self) -> None:
"""Send media_play command to player.""" """Send media_play command to player."""
response = self.client.resume() response = self.client.resume()
self.update_state(response) self.update_state(response)
def media_pause(self): def media_pause(self) -> None:
"""Send media_pause command to player.""" """Send media_pause command to player."""
response = self.client.pause() response = self.client.pause()
self.update_state(response) self.update_state(response)
def media_next_track(self): def media_next_track(self) -> None:
"""Seek ahead.""" """Seek ahead."""
response = self.client.skip_forward() response = self.client.skip_forward()
self.update_state(response) self.update_state(response)
def media_previous_track(self): def media_previous_track(self) -> None:
"""Seek back.""" """Seek back."""
response = self.client.skip_backward() response = self.client.skip_backward()
self.update_state(response) self.update_state(response)
def select_source(self, source): def select_source(self, source: str) -> None:
"""Select a channel to tune to.""" """Select a channel to tune to."""
for channel in self.favorite_channels: for channel in self.favorite_channels:
if channel["name"] == source: if channel["name"] == source:
@ -251,7 +253,7 @@ class ChannelsPlayer(MediaPlayerEntity):
self.update_state(response) self.update_state(response)
break break
def play_media(self, media_type, media_id, **kwargs): def play_media(self, media_type: str, media_id: str, **kwargs: Any) -> None:
"""Send the play_media command to the player.""" """Send the play_media command to the player."""
if media_type == MEDIA_TYPE_CHANNEL: if media_type == MEDIA_TYPE_CHANNEL:
response = self.client.play_channel(media_id) response = self.client.play_channel(media_id)

View file

@ -284,7 +284,7 @@ class CityBikesStation(SensorEntity):
self._station_id = station_id self._station_id = station_id
self.entity_id = entity_id self.entity_id = entity_id
async def async_update(self): async def async_update(self) -> None:
"""Update station state.""" """Update station state."""
for station in self._network.stations: for station in self._network.stations:
if station[ATTR_ID] == self._station_id: if station[ATTR_ID] == self._station_id:

View file

@ -78,7 +78,7 @@ class ClementineDevice(MediaPlayerEntity):
self._client = client self._client = client
self._attr_name = name self._attr_name = name
def update(self): def update(self) -> None:
"""Retrieve the latest data from the Clementine Player.""" """Retrieve the latest data from the Clementine Player."""
try: try:
client = self._client client = self._client
@ -115,14 +115,14 @@ class ClementineDevice(MediaPlayerEntity):
self._attr_state = STATE_OFF self._attr_state = STATE_OFF
raise raise
def select_source(self, source): def select_source(self, source: str) -> None:
"""Select input source.""" """Select input source."""
client = self._client client = self._client
sources = [s for s in client.playlists.values() if s["name"] == source] sources = [s for s in client.playlists.values() if s["name"] == source]
if len(sources) == 1: if len(sources) == 1:
client.change_song(sources[0]["id"], 0) client.change_song(sources[0]["id"], 0)
async def async_get_media_image(self): async def async_get_media_image(self) -> tuple[bytes | None, str | None]:
"""Fetch media image of current playing image.""" """Fetch media image of current playing image."""
if self._client.current_track: if self._client.current_track:
image = bytes(self._client.current_track["art"]) image = bytes(self._client.current_track["art"])
@ -130,45 +130,45 @@ class ClementineDevice(MediaPlayerEntity):
return None, None return None, None
def volume_up(self): def volume_up(self) -> None:
"""Volume up the media player.""" """Volume up the media player."""
newvolume = min(self._client.volume + 4, 100) newvolume = min(self._client.volume + 4, 100)
self._client.set_volume(newvolume) self._client.set_volume(newvolume)
def volume_down(self): def volume_down(self) -> None:
"""Volume down media player.""" """Volume down media player."""
newvolume = max(self._client.volume - 4, 0) newvolume = max(self._client.volume - 4, 0)
self._client.set_volume(newvolume) self._client.set_volume(newvolume)
def mute_volume(self, mute): def mute_volume(self, mute: bool) -> None:
"""Send mute command.""" """Send mute command."""
self._client.set_volume(0) self._client.set_volume(0)
def set_volume_level(self, volume): def set_volume_level(self, volume: float) -> None:
"""Set volume level.""" """Set volume level."""
self._client.set_volume(int(100 * volume)) self._client.set_volume(int(100 * volume))
def media_play_pause(self): def media_play_pause(self) -> None:
"""Simulate play pause media player.""" """Simulate play pause media player."""
if self.state == STATE_PLAYING: if self.state == STATE_PLAYING:
self.media_pause() self.media_pause()
else: else:
self.media_play() self.media_play()
def media_play(self): def media_play(self) -> None:
"""Send play command.""" """Send play command."""
self._attr_state = STATE_PLAYING self._attr_state = STATE_PLAYING
self._client.play() self._client.play()
def media_pause(self): def media_pause(self) -> None:
"""Send media pause command to media player.""" """Send media pause command to media player."""
self._attr_state = STATE_PAUSED self._attr_state = STATE_PAUSED
self._client.pause() self._client.pause()
def media_next_track(self): def media_next_track(self) -> None:
"""Send next track command.""" """Send next track command."""
self._client.next() self._client.next()
def media_previous_track(self): def media_previous_track(self) -> None:
"""Send the previous track command.""" """Send the previous track command."""
self._client.previous() self._client.previous()

View file

@ -56,7 +56,7 @@ class CloudRemoteBinary(BinarySensorEntity):
"""Return True if entity is available.""" """Return True if entity is available."""
return self.cloud.remote.certificate is not None return self.cloud.remote.certificate is not None
async def async_added_to_hass(self): async def async_added_to_hass(self) -> None:
"""Register update dispatcher.""" """Register update dispatcher."""
async def async_state_update(data): async def async_state_update(data):
@ -68,7 +68,7 @@ class CloudRemoteBinary(BinarySensorEntity):
self.hass, DISPATCHER_REMOTE_UPDATE, async_state_update self.hass, DISPATCHER_REMOTE_UPDATE, async_state_update
) )
async def async_will_remove_from_hass(self): async def async_will_remove_from_hass(self) -> None:
"""Register update dispatcher.""" """Register update dispatcher."""
if self._unsub_dispatcher is not None: if self._unsub_dispatcher is not None:
self._unsub_dispatcher() self._unsub_dispatcher()

View file

@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
from typing import Any
from pycmus import exceptions, remote from pycmus import exceptions, remote
import voluptuous as vol import voluptuous as vol
@ -115,7 +116,7 @@ class CmusDevice(MediaPlayerEntity):
self._attr_name = name or auto_name self._attr_name = name or auto_name
self.status = {} self.status = {}
def update(self): def update(self) -> None:
"""Get the latest data and update the state.""" """Get the latest data and update the state."""
try: try:
status = self._remote.cmus.get_status_dict() status = self._remote.cmus.get_status_dict()
@ -150,19 +151,19 @@ class CmusDevice(MediaPlayerEntity):
_LOGGER.warning("Received no status from cmus") _LOGGER.warning("Received no status from cmus")
def turn_off(self): def turn_off(self) -> None:
"""Service to send the CMUS the command to stop playing.""" """Service to send the CMUS the command to stop playing."""
self._remote.cmus.player_stop() self._remote.cmus.player_stop()
def turn_on(self): def turn_on(self) -> None:
"""Service to send the CMUS the command to start playing.""" """Service to send the CMUS the command to start playing."""
self._remote.cmus.player_play() self._remote.cmus.player_play()
def set_volume_level(self, volume): def set_volume_level(self, volume: float) -> None:
"""Set volume level, range 0..1.""" """Set volume level, range 0..1."""
self._remote.cmus.set_volume(int(volume * 100)) self._remote.cmus.set_volume(int(volume * 100))
def volume_up(self): def volume_up(self) -> None:
"""Set the volume up.""" """Set the volume up."""
left = self.status["set"].get("vol_left") left = self.status["set"].get("vol_left")
right = self.status["set"].get("vol_right") right = self.status["set"].get("vol_right")
@ -174,7 +175,7 @@ class CmusDevice(MediaPlayerEntity):
if current_volume <= 100: if current_volume <= 100:
self._remote.cmus.set_volume(int(current_volume) + 5) self._remote.cmus.set_volume(int(current_volume) + 5)
def volume_down(self): def volume_down(self) -> None:
"""Set the volume down.""" """Set the volume down."""
left = self.status["set"].get("vol_left") left = self.status["set"].get("vol_left")
right = self.status["set"].get("vol_right") right = self.status["set"].get("vol_right")
@ -186,7 +187,7 @@ class CmusDevice(MediaPlayerEntity):
if current_volume <= 100: if current_volume <= 100:
self._remote.cmus.set_volume(int(current_volume) - 5) self._remote.cmus.set_volume(int(current_volume) - 5)
def play_media(self, media_type, media_id, **kwargs): def play_media(self, media_type: str, media_id: str, **kwargs: Any) -> None:
"""Send the play command.""" """Send the play command."""
if media_type in [MEDIA_TYPE_MUSIC, MEDIA_TYPE_PLAYLIST]: if media_type in [MEDIA_TYPE_MUSIC, MEDIA_TYPE_PLAYLIST]:
self._remote.cmus.player_play_file(media_id) self._remote.cmus.player_play_file(media_id)
@ -198,26 +199,26 @@ class CmusDevice(MediaPlayerEntity):
MEDIA_TYPE_PLAYLIST, MEDIA_TYPE_PLAYLIST,
) )
def media_pause(self): def media_pause(self) -> None:
"""Send the pause command.""" """Send the pause command."""
self._remote.cmus.player_pause() self._remote.cmus.player_pause()
def media_next_track(self): def media_next_track(self) -> None:
"""Send next track command.""" """Send next track command."""
self._remote.cmus.player_next() self._remote.cmus.player_next()
def media_previous_track(self): def media_previous_track(self) -> None:
"""Send next track command.""" """Send next track command."""
self._remote.cmus.player_prev() self._remote.cmus.player_prev()
def media_seek(self, position): def media_seek(self, position: float) -> None:
"""Send seek command.""" """Send seek command."""
self._remote.cmus.seek(position) self._remote.cmus.seek(position)
def media_play(self): def media_play(self) -> None:
"""Send the play command.""" """Send the play command."""
self._remote.cmus.player_play() self._remote.cmus.player_play()
def media_stop(self): def media_stop(self) -> None:
"""Send the stop command.""" """Send the stop command."""
self._remote.cmus.stop() self._remote.cmus.stop()

View file

@ -161,7 +161,7 @@ class AccountSensor(SensorEntity):
ATTR_NATIVE_BALANCE: f"{self._native_balance} {self._native_currency}", ATTR_NATIVE_BALANCE: f"{self._native_balance} {self._native_currency}",
} }
def update(self): def update(self) -> None:
"""Get the latest state of the sensor.""" """Get the latest state of the sensor."""
self._coinbase_data.update() self._coinbase_data.update()
for account in self._coinbase_data.accounts: for account in self._coinbase_data.accounts:
@ -233,7 +233,7 @@ class ExchangeRateSensor(SensorEntity):
"""Return the state attributes of the sensor.""" """Return the state attributes of the sensor."""
return {ATTR_ATTRIBUTION: ATTRIBUTION} return {ATTR_ATTRIBUTION: ATTRIBUTION}
def update(self): def update(self) -> None:
"""Get the latest state of the sensor.""" """Get the latest state of the sensor."""
self._coinbase_data.update() self._coinbase_data.update()
self._state = round( self._state = round(

View file

@ -101,7 +101,7 @@ class ComedHourlyPricingSensor(SensorEntity):
self._attr_name = name self._attr_name = name
self.offset = offset self.offset = offset
async def async_update(self): async def async_update(self) -> None:
"""Get the ComEd Hourly Pricing data from the web service.""" """Get the ComEd Hourly Pricing data from the web service."""
try: try:
sensor_type = self.entity_description.key sensor_type = self.entity_description.key

View file

@ -172,13 +172,13 @@ class CommandSwitch(SwitchEntity):
payload = self._value_template.render_with_possible_json_value(payload) payload = self._value_template.render_with_possible_json_value(payload)
self._attr_is_on = payload.lower() == "true" self._attr_is_on = payload.lower() == "true"
def turn_on(self, **kwargs) -> None: def turn_on(self, **kwargs: Any) -> None:
"""Turn the device on.""" """Turn the device on."""
if self._switch(self._command_on) and not self._command_state: if self._switch(self._command_on) and not self._command_state:
self._attr_is_on = True self._attr_is_on = True
self.schedule_update_ha_state() self.schedule_update_ha_state()
def turn_off(self, **kwargs) -> None: def turn_off(self, **kwargs: Any) -> None:
"""Turn the device off.""" """Turn the device off."""
if self._switch(self._command_off) and not self._command_state: if self._switch(self._command_off) and not self._command_state:
self._attr_is_on = False self._attr_is_on = False

View file

@ -90,7 +90,7 @@ class CompensationSensor(SensorEntity):
self._unique_id = unique_id self._unique_id = unique_id
self._name = name self._name = name
async def async_added_to_hass(self): async def async_added_to_hass(self) -> None:
"""Handle added to Hass.""" """Handle added to Hass."""
self.async_on_remove( self.async_on_remove(
async_track_state_change_event( async_track_state_change_event(

View file

@ -133,7 +133,7 @@ class Concord232ZoneSensor(BinarySensorEntity):
# True means "faulted" or "open" or "abnormal state" # True means "faulted" or "open" or "abnormal state"
return bool(self._zone["state"] != "Normal") return bool(self._zone["state"] != "Normal")
def update(self): def update(self) -> None:
"""Get updated stats from API.""" """Get updated stats from API."""
last_update = dt_util.utcnow() - self._client.last_zone_update last_update = dt_util.utcnow() - self._client.last_zone_update
_LOGGER.debug("Zone: %s ", self._zone) _LOGGER.debug("Zone: %s ", self._zone)

View file

@ -1,5 +1,6 @@
"""CoolMasterNet platform to control of CoolMasterNet Climate Devices.""" """CoolMasterNet platform to control of CoolMasterNet Climate Devices."""
import logging import logging
from typing import Any
from homeassistant.components.climate import ClimateEntity from homeassistant.components.climate import ClimateEntity
from homeassistant.components.climate.const import ClimateEntityFeature, HVACMode from homeassistant.components.climate.const import ClimateEntityFeature, HVACMode
@ -93,7 +94,7 @@ class CoolmasterClimate(CoordinatorEntity, ClimateEntity):
return self.unique_id return self.unique_id
@property @property
def temperature_unit(self): def temperature_unit(self) -> str:
"""Return the unit of measurement.""" """Return the unit of measurement."""
if self._unit.temperature_unit == "celsius": if self._unit.temperature_unit == "celsius":
return TEMP_CELSIUS return TEMP_CELSIUS
@ -134,20 +135,20 @@ class CoolmasterClimate(CoordinatorEntity, ClimateEntity):
"""Return the list of available fan modes.""" """Return the list of available fan modes."""
return FAN_MODES return FAN_MODES
async def async_set_temperature(self, **kwargs): async def async_set_temperature(self, **kwargs: Any) -> None:
"""Set new target temperatures.""" """Set new target temperatures."""
if (temp := kwargs.get(ATTR_TEMPERATURE)) is not None: if (temp := kwargs.get(ATTR_TEMPERATURE)) is not None:
_LOGGER.debug("Setting temp of %s to %s", self.unique_id, str(temp)) _LOGGER.debug("Setting temp of %s to %s", self.unique_id, str(temp))
self._unit = await self._unit.set_thermostat(temp) self._unit = await self._unit.set_thermostat(temp)
self.async_write_ha_state() self.async_write_ha_state()
async def async_set_fan_mode(self, fan_mode): async def async_set_fan_mode(self, fan_mode: str) -> None:
"""Set new fan mode.""" """Set new fan mode."""
_LOGGER.debug("Setting fan mode of %s to %s", self.unique_id, fan_mode) _LOGGER.debug("Setting fan mode of %s to %s", self.unique_id, fan_mode)
self._unit = await self._unit.set_fan_speed(fan_mode) self._unit = await self._unit.set_fan_speed(fan_mode)
self.async_write_ha_state() self.async_write_ha_state()
async def async_set_hvac_mode(self, hvac_mode): async def async_set_hvac_mode(self, hvac_mode: HVACMode) -> None:
"""Set new operation mode.""" """Set new operation mode."""
_LOGGER.debug("Setting operation mode of %s to %s", self.unique_id, hvac_mode) _LOGGER.debug("Setting operation mode of %s to %s", self.unique_id, hvac_mode)
@ -157,13 +158,13 @@ class CoolmasterClimate(CoordinatorEntity, ClimateEntity):
self._unit = await self._unit.set_mode(HA_STATE_TO_CM[hvac_mode]) self._unit = await self._unit.set_mode(HA_STATE_TO_CM[hvac_mode])
await self.async_turn_on() await self.async_turn_on()
async def async_turn_on(self): async def async_turn_on(self) -> None:
"""Turn on.""" """Turn on."""
_LOGGER.debug("Turning %s on", self.unique_id) _LOGGER.debug("Turning %s on", self.unique_id)
self._unit = await self._unit.turn_on() self._unit = await self._unit.turn_on()
self.async_write_ha_state() self.async_write_ha_state()
async def async_turn_off(self): async def async_turn_off(self) -> None:
"""Turn off.""" """Turn off."""
_LOGGER.debug("Turning %s off", self.unique_id) _LOGGER.debug("Turning %s off", self.unique_id)
self._unit = await self._unit.turn_off() self._unit = await self._unit.turn_off()

View file

@ -53,7 +53,7 @@ class CoronavirusSensor(CoordinatorEntity, SensorEntity):
self.info_type = info_type self.info_type = info_type
@property @property
def available(self): def available(self) -> bool:
"""Return if sensor is available.""" """Return if sensor is available."""
return self.coordinator.last_update_success and ( return self.coordinator.last_update_success and (
self.country in self.coordinator.data or self.country == OPTION_WORLDWIDE self.country in self.coordinator.data or self.country == OPTION_WORLDWIDE

View file

@ -157,7 +157,7 @@ class CupsSensor(SensorEntity):
ATTR_PRINTER_URI_SUPPORTED: self._printer["printer-uri-supported"], ATTR_PRINTER_URI_SUPPORTED: self._printer["printer-uri-supported"],
} }
def update(self): def update(self) -> None:
"""Get the latest data and updates the states.""" """Get the latest data and updates the states."""
self.data.update() self.data.update()
self._printer = self.data.printers.get(self._name) self._printer = self.data.printers.get(self._name)
@ -234,7 +234,7 @@ class IPPSensor(SensorEntity):
return state_attributes return state_attributes
def update(self): def update(self) -> None:
"""Fetch new state data for the sensor.""" """Fetch new state data for the sensor."""
self.data.update() self.data.update()
self._attributes = self.data.attributes.get(self._name) self._attributes = self.data.attributes.get(self._name)
@ -309,7 +309,7 @@ class MarkerSensor(SensorEntity):
ATTR_PRINTER_NAME: printer_name, ATTR_PRINTER_NAME: printer_name,
} }
def update(self): def update(self) -> None:
"""Update the state of the sensor.""" """Update the state of the sensor."""
# Data fetching is done by CupsSensor/IPPSensor # Data fetching is done by CupsSensor/IPPSensor
self._attributes = self.data.attributes self._attributes = self.data.attributes

View file

@ -99,7 +99,7 @@ class CurrencylayerSensor(SensorEntity):
"""Return the state attributes of the sensor.""" """Return the state attributes of the sensor."""
return {ATTR_ATTRIBUTION: ATTRIBUTION} return {ATTR_ATTRIBUTION: ATTRIBUTION}
def update(self): def update(self) -> None:
"""Update current date.""" """Update current date."""
self.rest.update() self.rest.update()
if (value := self.rest.data) is not None: if (value := self.rest.data) is not None: