* Reduce data sharing between ConfigFlow and DataUpdateCoordinator. Instead of fetching device information from the device once in `ConfigFlow` and then piping it through in `ConfigEntry.data`, only use as much as needed in `ConfigFlow.async_step_user`, then fetch again in `AirQCoordinator._async_update_data` if a key is missing. Additionally, factor `AirQCoordinator` out into a submodule. Add a simple test for `AirQCoordinator.device_info` update. Positive side effect: `AirQCoordinator.device_info` is updated explicitly, instead of dumping the entire content of (a fully compatible) `TypedDict`, retrieved from `aioairq`. * Remove tests ill-suited to this PR: `test_config_flow.test_duplicate_error` slipped through by mistake, while `test_coordinator.test_fetch_device_info_on_first_update` may need a more thorough suite of accompanying tests. * Ignore airq/coordinator.py ...newly separated from airq/__init__.py, that's already in this list * Reorder files alphabetically
61 lines
2 KiB
Python
61 lines
2 KiB
Python
"""The air-Q integration."""
|
|
from __future__ import annotations
|
|
|
|
from datetime import timedelta
|
|
import logging
|
|
|
|
from aioairq import AirQ
|
|
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.const import CONF_IP_ADDRESS, CONF_PASSWORD
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
|
from homeassistant.helpers.entity import DeviceInfo
|
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
|
|
|
|
from .const import DOMAIN, MANUFACTURER, TARGET_ROUTE, UPDATE_INTERVAL
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class AirQCoordinator(DataUpdateCoordinator):
    """Update coordinator that queries an air-Q device at ``TARGET_ROUTE``."""

    def __init__(
        self,
        hass: HomeAssistant,
        entry: ConfigEntry,
    ) -> None:
        """Configure polling and build the device client from a config entry.

        The client is created from the IP address and password stored in
        ``entry.data``, and ``entry.unique_id`` is used as the device id.
        """
        polling_period = timedelta(seconds=UPDATE_INTERVAL)
        super().__init__(hass, _LOGGER, name=DOMAIN, update_interval=polling_period)

        client_session = async_get_clientsession(hass)
        address = entry.data[CONF_IP_ADDRESS]
        password = entry.data[CONF_PASSWORD]
        self.airq = AirQ(address, password, client_session)

        self.device_id = entry.unique_id
        assert self.device_id is not None

        # Only manufacturer and identifiers are set here; name, model and
        # versions are added by _async_update_data while "name" is missing.
        self.device_info = DeviceInfo(
            manufacturer=MANUFACTURER,
            identifiers={(DOMAIN, self.device_id)},
        )

    async def _async_update_data(self) -> dict:
        """Query ``TARGET_ROUTE`` and return the post-processed result.

        When ``device_info`` has no ``"name"`` key, the device info is fetched
        first and its name, model, software and hardware versions are merged
        into ``device_info``. The queried data is passed through
        ``AirQ.drop_uncertainties_from_data`` before being returned.
        """
        if "name" not in self.device_info:
            fetched = await self.airq.fetch_device_info()
            details = DeviceInfo(
                name=fetched["name"],
                model=fetched["model"],
                sw_version=fetched["sw_version"],
                hw_version=fetched["hw_version"],
            )
            self.device_info.update(details)

        reading = await self.airq.get(TARGET_ROUTE)
        return self.airq.drop_uncertainties_from_data(reading)
|