Refactor Shelly wrapper to coordinator (#79628)

This commit is contained in:
Shay Levy 2022-10-05 15:39:58 +03:00 committed by GitHub
parent 4d3d22320f
commit 22c68b95bf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 336 additions and 321 deletions

View file

@ -14,8 +14,9 @@ import async_timeout
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ATTR_DEVICE_ID, CONF_HOST, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.helpers import device_registry, update_coordinator
from homeassistant.helpers import device_registry
from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import (
AIOSHELLY_DEVICE_TIMEOUT_SEC,
@ -44,13 +45,13 @@ from .const import (
from .utils import device_update_info, get_block_device_name, get_rpc_device_name
class BlockDeviceWrapper(update_coordinator.DataUpdateCoordinator):
"""Wrapper for a Shelly block based device with Home Assistant specific functions."""
class ShellyBlockCoordinator(DataUpdateCoordinator):
"""Coordinator for a Shelly block based device."""
def __init__(
self, hass: HomeAssistant, entry: ConfigEntry, device: BlockDevice
) -> None:
"""Initialize the Shelly device wrapper."""
"""Initialize the Shelly block device coordinator."""
self.device_id: str | None = None
if sleep_period := entry.data[CONF_SLEEP_PERIOD]:
@ -186,7 +187,7 @@ class BlockDeviceWrapper(update_coordinator.DataUpdateCoordinator):
"""Fetch data."""
if sleep_period := self.entry.data.get(CONF_SLEEP_PERIOD):
# Sleeping device, no point polling it, just mark it unavailable
raise update_coordinator.UpdateFailed(
raise UpdateFailed(
f"Sleeping device did not update within {sleep_period} seconds interval"
)
@ -196,7 +197,7 @@ class BlockDeviceWrapper(update_coordinator.DataUpdateCoordinator):
await self.device.update()
device_update_info(self.hass, self.device, self.entry)
except OSError as err:
raise update_coordinator.UpdateFailed("Error fetching data") from err
raise UpdateFailed("Error fetching data") from err
@property
def model(self) -> str:
@ -214,7 +215,7 @@ class BlockDeviceWrapper(update_coordinator.DataUpdateCoordinator):
return self.device.firmware_version if self.device.initialized else ""
def async_setup(self) -> None:
"""Set up the wrapper."""
"""Set up the coordinator."""
dev_reg = device_registry.async_get(self.hass)
entry = dev_reg.async_get_or_create(
config_entry_id=self.entry.entry_id,
@ -265,23 +266,23 @@ class BlockDeviceWrapper(update_coordinator.DataUpdateCoordinator):
LOGGER.debug("Result of OTA update call: %s", result)
def shutdown(self) -> None:
"""Shutdown the wrapper."""
"""Shutdown the coordinator."""
self.device.shutdown()
@callback
def _handle_ha_stop(self, _event: Event) -> None:
"""Handle Home Assistant stopping."""
LOGGER.debug("Stopping BlockDeviceWrapper for %s", self.name)
LOGGER.debug("Stopping block device coordinator for %s", self.name)
self.shutdown()
class ShellyDeviceRestWrapper(update_coordinator.DataUpdateCoordinator):
"""Rest Wrapper for a Shelly device with Home Assistant specific functions."""
class ShellyRestCoordinator(DataUpdateCoordinator):
"""Coordinator for a Shelly REST device."""
def __init__(
self, hass: HomeAssistant, device: BlockDevice, entry: ConfigEntry
) -> None:
"""Initialize the Shelly device wrapper."""
"""Initialize the Shelly REST device coordinator."""
if (
device.settings["device"]["type"]
in BATTERY_DEVICES_WITH_PERMANENT_CONNECTION
@ -316,7 +317,7 @@ class ShellyDeviceRestWrapper(update_coordinator.DataUpdateCoordinator):
return
device_update_info(self.hass, self.device, self.entry)
except OSError as err:
raise update_coordinator.UpdateFailed("Error fetching data") from err
raise UpdateFailed("Error fetching data") from err
@property
def mac(self) -> str:
@ -324,13 +325,13 @@ class ShellyDeviceRestWrapper(update_coordinator.DataUpdateCoordinator):
return cast(str, self.device.settings["device"]["mac"])
class RpcDeviceWrapper(update_coordinator.DataUpdateCoordinator):
"""Wrapper for a Shelly RPC based device with Home Assistant specific functions."""
class ShellyRpcCoordinator(DataUpdateCoordinator):
"""Coordinator for a Shelly RPC based device."""
def __init__(
self, hass: HomeAssistant, entry: ConfigEntry, device: RpcDevice
) -> None:
"""Initialize the Shelly device wrapper."""
"""Initialize the Shelly RPC device coordinator."""
self.device_id: str | None = None
device_name = get_rpc_device_name(device) if device.initialized else entry.title
@ -413,7 +414,7 @@ class RpcDeviceWrapper(update_coordinator.DataUpdateCoordinator):
await self.device.initialize()
device_update_info(self.hass, self.device, self.entry)
except OSError as err:
raise update_coordinator.UpdateFailed("Device disconnected") from err
raise UpdateFailed("Device disconnected") from err
@property
def model(self) -> str:
@ -431,7 +432,7 @@ class RpcDeviceWrapper(update_coordinator.DataUpdateCoordinator):
return self.device.firmware_version if self.device.initialized else ""
def async_setup(self) -> None:
"""Set up the wrapper."""
"""Set up the coordinator."""
dev_reg = device_registry.async_get(self.hass)
entry = dev_reg.async_get_or_create(
config_entry_id=self.entry.entry_id,
@ -482,17 +483,17 @@ class RpcDeviceWrapper(update_coordinator.DataUpdateCoordinator):
LOGGER.debug("OTA update call successful")
async def shutdown(self) -> None:
"""Shutdown the wrapper."""
"""Shutdown the coordinator."""
await self.device.shutdown()
async def _handle_ha_stop(self, _event: Event) -> None:
"""Handle Home Assistant stopping."""
LOGGER.debug("Stopping RpcDeviceWrapper for %s", self.name)
LOGGER.debug("Stopping RPC device coordinator for %s", self.name)
await self.shutdown()
class RpcPollingWrapper(update_coordinator.DataUpdateCoordinator):
"""Polling Wrapper for a Shelly RPC based device."""
class ShellyRpcPollingCoordinator(DataUpdateCoordinator):
"""Polling coordinator for a Shelly RPC based device."""
def __init__(
self, hass: HomeAssistant, entry: ConfigEntry, device: RpcDevice
@ -513,14 +514,14 @@ class RpcPollingWrapper(update_coordinator.DataUpdateCoordinator):
async def _async_update_data(self) -> None:
"""Fetch data."""
if not self.device.connected:
raise update_coordinator.UpdateFailed("Device disconnected")
raise UpdateFailed("Device disconnected")
try:
LOGGER.debug("Polling Shelly RPC Device - %s", self.name)
async with async_timeout.timeout(AIOSHELLY_DEVICE_TIMEOUT_SEC):
await self.device.update_status()
except (OSError, aioshelly.exceptions.RPCTimeout) as err:
raise update_coordinator.UpdateFailed("Device disconnected") from err
raise UpdateFailed("Device disconnected") from err
@property
def model(self) -> str: