Rename bound TypeVars (#70975)
This commit is contained in:
parent
9672cddb07
commit
cdafbbe10f
5 changed files with 29 additions and 21 deletions
|
@ -61,18 +61,20 @@ from .data import EventListenAddr, get_domain_data
|
||||||
|
|
||||||
PARALLEL_UPDATES = 0
|
PARALLEL_UPDATES = 0
|
||||||
|
|
||||||
_T = TypeVar("_T", bound="DlnaDmrEntity")
|
_DlnaDmrEntityT = TypeVar("_DlnaDmrEntityT", bound="DlnaDmrEntity")
|
||||||
_R = TypeVar("_R")
|
_R = TypeVar("_R")
|
||||||
_P = ParamSpec("_P")
|
_P = ParamSpec("_P")
|
||||||
|
|
||||||
|
|
||||||
def catch_request_errors(
|
def catch_request_errors(
|
||||||
func: Callable[Concatenate[_T, _P], Awaitable[_R]]
|
func: Callable[Concatenate[_DlnaDmrEntityT, _P], Awaitable[_R]]
|
||||||
) -> Callable[Concatenate[_T, _P], Coroutine[Any, Any, _R | None]]:
|
) -> Callable[Concatenate[_DlnaDmrEntityT, _P], Coroutine[Any, Any, _R | None]]:
|
||||||
"""Catch UpnpError errors."""
|
"""Catch UpnpError errors."""
|
||||||
|
|
||||||
@functools.wraps(func)
|
@functools.wraps(func)
|
||||||
async def wrapper(self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R | None:
|
async def wrapper(
|
||||||
|
self: _DlnaDmrEntityT, *args: _P.args, **kwargs: _P.kwargs
|
||||||
|
) -> _R | None:
|
||||||
"""Catch UpnpError errors and check availability before and after request."""
|
"""Catch UpnpError errors and check availability before and after request."""
|
||||||
if not self.available:
|
if not self.available:
|
||||||
_LOGGER.warning(
|
_LOGGER.warning(
|
||||||
|
|
|
@ -9,18 +9,20 @@ from typing_extensions import Concatenate, ParamSpec
|
||||||
|
|
||||||
from . import EvilGeniusEntity
|
from . import EvilGeniusEntity
|
||||||
|
|
||||||
_T = TypeVar("_T", bound=EvilGeniusEntity)
|
_EvilGeniusEntityT = TypeVar("_EvilGeniusEntityT", bound=EvilGeniusEntity)
|
||||||
_R = TypeVar("_R")
|
_R = TypeVar("_R")
|
||||||
_P = ParamSpec("_P")
|
_P = ParamSpec("_P")
|
||||||
|
|
||||||
|
|
||||||
def update_when_done(
|
def update_when_done(
|
||||||
func: Callable[Concatenate[_T, _P], Awaitable[_R]]
|
func: Callable[Concatenate[_EvilGeniusEntityT, _P], Awaitable[_R]]
|
||||||
) -> Callable[Concatenate[_T, _P], Coroutine[Any, Any, _R]]:
|
) -> Callable[Concatenate[_EvilGeniusEntityT, _P], Coroutine[Any, Any, _R]]:
|
||||||
"""Decorate function to trigger update when function is done."""
|
"""Decorate function to trigger update when function is done."""
|
||||||
|
|
||||||
@wraps(func)
|
@wraps(func)
|
||||||
async def wrapper(self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R:
|
async def wrapper(
|
||||||
|
self: _EvilGeniusEntityT, *args: _P.args, **kwargs: _P.kwargs
|
||||||
|
) -> _R:
|
||||||
"""Wrap function."""
|
"""Wrap function."""
|
||||||
result = await func(self, *args, **kwargs)
|
result = await func(self, *args, **kwargs)
|
||||||
await self.coordinator.async_request_refresh()
|
await self.coordinator.async_request_refresh()
|
||||||
|
|
|
@ -9,21 +9,23 @@ from homeassistant.exceptions import HomeAssistantError
|
||||||
|
|
||||||
from .entity import PlugwiseEntity
|
from .entity import PlugwiseEntity
|
||||||
|
|
||||||
_P = ParamSpec("_P")
|
_PlugwiseEntityT = TypeVar("_PlugwiseEntityT", bound=PlugwiseEntity)
|
||||||
_R = TypeVar("_R")
|
_R = TypeVar("_R")
|
||||||
_T = TypeVar("_T", bound=PlugwiseEntity)
|
_P = ParamSpec("_P")
|
||||||
|
|
||||||
|
|
||||||
def plugwise_command(
|
def plugwise_command(
|
||||||
func: Callable[Concatenate[_T, _P], Awaitable[_R]]
|
func: Callable[Concatenate[_PlugwiseEntityT, _P], Awaitable[_R]]
|
||||||
) -> Callable[Concatenate[_T, _P], Coroutine[Any, Any, _R]]:
|
) -> Callable[Concatenate[_PlugwiseEntityT, _P], Coroutine[Any, Any, _R]]:
|
||||||
"""Decorate Plugwise calls that send commands/make changes to the device.
|
"""Decorate Plugwise calls that send commands/make changes to the device.
|
||||||
|
|
||||||
A decorator that wraps the passed in function, catches Plugwise errors,
|
A decorator that wraps the passed in function, catches Plugwise errors,
|
||||||
and requests an coordinator update to update status of the devices asap.
|
and requests an coordinator update to update status of the devices asap.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
async def handler(self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R:
|
async def handler(
|
||||||
|
self: _PlugwiseEntityT, *args: _P.args, **kwargs: _P.kwargs
|
||||||
|
) -> _R:
|
||||||
try:
|
try:
|
||||||
return await func(self, *args, **kwargs)
|
return await func(self, *args, **kwargs)
|
||||||
except PlugwiseException as error:
|
except PlugwiseException as error:
|
||||||
|
|
|
@ -76,7 +76,7 @@ SENSOR_TYPES: tuple[SensorEntityDescription, ...] = (
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
_T = TypeVar("_T", bound="SonarrSensor")
|
_SonarrSensorT = TypeVar("_SonarrSensorT", bound="SonarrSensor")
|
||||||
_P = ParamSpec("_P")
|
_P = ParamSpec("_P")
|
||||||
|
|
||||||
|
|
||||||
|
@ -109,8 +109,8 @@ async def async_setup_entry(
|
||||||
|
|
||||||
|
|
||||||
def sonarr_exception_handler(
|
def sonarr_exception_handler(
|
||||||
func: Callable[Concatenate[_T, _P], Awaitable[None]]
|
func: Callable[Concatenate[_SonarrSensorT, _P], Awaitable[None]]
|
||||||
) -> Callable[Concatenate[_T, _P], Coroutine[Any, Any, None]]:
|
) -> Callable[Concatenate[_SonarrSensorT, _P], Coroutine[Any, Any, None]]:
|
||||||
"""Decorate Sonarr calls to handle Sonarr exceptions.
|
"""Decorate Sonarr calls to handle Sonarr exceptions.
|
||||||
|
|
||||||
A decorator that wraps the passed in function, catches Sonarr errors,
|
A decorator that wraps the passed in function, catches Sonarr errors,
|
||||||
|
@ -118,7 +118,9 @@ def sonarr_exception_handler(
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@wraps(func)
|
@wraps(func)
|
||||||
async def wrapper(self: _T, *args: _P.args, **kwargs: _P.kwargs) -> None:
|
async def wrapper(
|
||||||
|
self: _SonarrSensorT, *args: _P.args, **kwargs: _P.kwargs
|
||||||
|
) -> None:
|
||||||
try:
|
try:
|
||||||
await func(self, *args, **kwargs)
|
await func(self, *args, **kwargs)
|
||||||
self.last_update_success = True
|
self.last_update_success = True
|
||||||
|
|
|
@ -31,7 +31,7 @@ from .const import DATA_AVAILABLE, DATA_VLC, DEFAULT_NAME, DOMAIN, LOGGER
|
||||||
|
|
||||||
MAX_VOLUME = 500
|
MAX_VOLUME = 500
|
||||||
|
|
||||||
_T = TypeVar("_T", bound="VlcDevice")
|
_VlcDeviceT = TypeVar("_VlcDeviceT", bound="VlcDevice")
|
||||||
_P = ParamSpec("_P")
|
_P = ParamSpec("_P")
|
||||||
|
|
||||||
|
|
||||||
|
@ -48,12 +48,12 @@ async def async_setup_entry(
|
||||||
|
|
||||||
|
|
||||||
def catch_vlc_errors(
|
def catch_vlc_errors(
|
||||||
func: Callable[Concatenate[_T, _P], Awaitable[None]]
|
func: Callable[Concatenate[_VlcDeviceT, _P], Awaitable[None]]
|
||||||
) -> Callable[Concatenate[_T, _P], Coroutine[Any, Any, None]]:
|
) -> Callable[Concatenate[_VlcDeviceT, _P], Coroutine[Any, Any, None]]:
|
||||||
"""Catch VLC errors."""
|
"""Catch VLC errors."""
|
||||||
|
|
||||||
@wraps(func)
|
@wraps(func)
|
||||||
async def wrapper(self: _T, *args: _P.args, **kwargs: _P.kwargs) -> None:
|
async def wrapper(self: _VlcDeviceT, *args: _P.args, **kwargs: _P.kwargs) -> None:
|
||||||
"""Catch VLC errors and modify availability."""
|
"""Catch VLC errors and modify availability."""
|
||||||
try:
|
try:
|
||||||
await func(self, *args, **kwargs)
|
await func(self, *args, **kwargs)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue