Move tankerkoenig coordinator and base entity to its own file (#99416)

* Move tankerkoenig coordinator and entity to its own file

* Add coordinator.py and entity.py to .coveragerc
This commit is contained in:
Jan-Philipp Benecke 2023-08-31 19:39:17 +02:00 committed by GitHub
parent 22c5071270
commit 2e7018a152
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 148 additions and 128 deletions

View file

@ -1275,6 +1275,8 @@ omit =
homeassistant/components/tank_utility/sensor.py
homeassistant/components/tankerkoenig/__init__.py
homeassistant/components/tankerkoenig/binary_sensor.py
homeassistant/components/tankerkoenig/coordinator.py
homeassistant/components/tankerkoenig/entity.py
homeassistant/components/tankerkoenig/sensor.py
homeassistant/components/tapsaff/binary_sensor.py
homeassistant/components/tautulli/__init__.py

View file

@ -1,26 +1,18 @@
"""Ask tankerkoenig.de for petrol price information."""
from __future__ import annotations
from datetime import timedelta
import logging
from math import ceil
import pytankerkoenig
from requests.exceptions import RequestException
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ATTR_ID, CONF_API_KEY, CONF_SHOW_ON_MAP, Platform
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.exceptions import ConfigEntryNotReady
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
UpdateFailed,
)
from .const import CONF_FUEL_TYPES, CONF_STATIONS, DEFAULT_SCAN_INTERVAL, DOMAIN
from .const import DEFAULT_SCAN_INTERVAL, DOMAIN
from .coordinator import TankerkoenigDataUpdateCoordinator
_LOGGER = logging.getLogger(__name__)
@ -70,117 +62,3 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Handle options update."""
await hass.config_entries.async_reload(entry.entry_id)
class TankerkoenigDataUpdateCoordinator(DataUpdateCoordinator):
"""Get the latest data from the API."""
def __init__(
self,
hass: HomeAssistant,
entry: ConfigEntry,
logger: logging.Logger,
name: str,
update_interval: int,
) -> None:
"""Initialize the data object."""
super().__init__(
hass=hass,
logger=logger,
name=name,
update_interval=timedelta(minutes=update_interval),
)
self._api_key: str = entry.data[CONF_API_KEY]
self._selected_stations: list[str] = entry.data[CONF_STATIONS]
self.stations: dict[str, dict] = {}
self.fuel_types: list[str] = entry.data[CONF_FUEL_TYPES]
self.show_on_map: bool = entry.options[CONF_SHOW_ON_MAP]
def setup(self) -> bool:
"""Set up the tankerkoenig API."""
for station_id in self._selected_stations:
try:
station_data = pytankerkoenig.getStationData(self._api_key, station_id)
except pytankerkoenig.customException as err:
if any(x in str(err).lower() for x in ("api-key", "apikey")):
raise ConfigEntryAuthFailed(err) from err
station_data = {
"ok": False,
"message": err,
"exception": True,
}
if not station_data["ok"]:
_LOGGER.error(
"Error when adding station %s:\n %s",
station_id,
station_data["message"],
)
continue
self.add_station(station_data["station"])
if len(self.stations) > 10:
_LOGGER.warning(
"Found more than 10 stations to check. "
"This might invalidate your api-key on the long run. "
"Try using a smaller radius"
)
return True
async def _async_update_data(self) -> dict:
"""Get the latest data from tankerkoenig.de."""
_LOGGER.debug("Fetching new data from tankerkoenig.de")
station_ids = list(self.stations)
prices = {}
# The API seems to only return at most 10 results, so split the list in chunks of 10
# and merge it together.
for index in range(ceil(len(station_ids) / 10)):
data = await self.hass.async_add_executor_job(
pytankerkoenig.getPriceList,
self._api_key,
station_ids[index * 10 : (index + 1) * 10],
)
_LOGGER.debug("Received data: %s", data)
if not data["ok"]:
raise UpdateFailed(data["message"])
if "prices" not in data:
raise UpdateFailed(
"Did not receive price information from tankerkoenig.de"
)
prices.update(data["prices"])
return prices
def add_station(self, station: dict):
"""Add fuel station to the entity list."""
station_id = station["id"]
if station_id in self.stations:
_LOGGER.warning(
"Sensor for station with id %s was already created", station_id
)
return
self.stations[station_id] = station
_LOGGER.debug("add_station called for station: %s", station)
class TankerkoenigCoordinatorEntity(CoordinatorEntity):
"""Tankerkoenig base entity."""
_attr_has_entity_name = True
def __init__(
self, coordinator: TankerkoenigDataUpdateCoordinator, station: dict
) -> None:
"""Initialize the Tankerkoenig base entity."""
super().__init__(coordinator)
self._attr_device_info = DeviceInfo(
identifiers={(ATTR_ID, station["id"])},
name=f"{station['brand']} {station['street']} {station['houseNumber']}",
model=station["brand"],
configuration_url="https://www.tankerkoenig.de",
entry_type=DeviceEntryType.SERVICE,
)

View file

@ -12,8 +12,9 @@ from homeassistant.const import ATTR_LATITUDE, ATTR_LONGITUDE
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import TankerkoenigCoordinatorEntity, TankerkoenigDataUpdateCoordinator
from .const import DOMAIN
from .coordinator import TankerkoenigDataUpdateCoordinator
from .entity import TankerkoenigCoordinatorEntity
_LOGGER = logging.getLogger(__name__)

View file

@ -0,0 +1,113 @@
"""The Tankerkoenig update coordinator."""
from __future__ import annotations
from datetime import timedelta
import logging
from math import ceil
import pytankerkoenig
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_API_KEY, CONF_SHOW_ON_MAP
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import CONF_FUEL_TYPES, CONF_STATIONS
_LOGGER = logging.getLogger(__name__)
class TankerkoenigDataUpdateCoordinator(DataUpdateCoordinator):
"""Get the latest data from the API."""
def __init__(
self,
hass: HomeAssistant,
entry: ConfigEntry,
logger: logging.Logger,
name: str,
update_interval: int,
) -> None:
"""Initialize the data object."""
super().__init__(
hass=hass,
logger=logger,
name=name,
update_interval=timedelta(minutes=update_interval),
)
self._api_key: str = entry.data[CONF_API_KEY]
self._selected_stations: list[str] = entry.data[CONF_STATIONS]
self.stations: dict[str, dict] = {}
self.fuel_types: list[str] = entry.data[CONF_FUEL_TYPES]
self.show_on_map: bool = entry.options[CONF_SHOW_ON_MAP]
def setup(self) -> bool:
"""Set up the tankerkoenig API."""
for station_id in self._selected_stations:
try:
station_data = pytankerkoenig.getStationData(self._api_key, station_id)
except pytankerkoenig.customException as err:
if any(x in str(err).lower() for x in ("api-key", "apikey")):
raise ConfigEntryAuthFailed(err) from err
station_data = {
"ok": False,
"message": err,
"exception": True,
}
if not station_data["ok"]:
_LOGGER.error(
"Error when adding station %s:\n %s",
station_id,
station_data["message"],
)
continue
self.add_station(station_data["station"])
if len(self.stations) > 10:
_LOGGER.warning(
"Found more than 10 stations to check. "
"This might invalidate your api-key on the long run. "
"Try using a smaller radius"
)
return True
async def _async_update_data(self) -> dict:
"""Get the latest data from tankerkoenig.de."""
_LOGGER.debug("Fetching new data from tankerkoenig.de")
station_ids = list(self.stations)
prices = {}
# The API seems to only return at most 10 results, so split the list in chunks of 10
# and merge it together.
for index in range(ceil(len(station_ids) / 10)):
data = await self.hass.async_add_executor_job(
pytankerkoenig.getPriceList,
self._api_key,
station_ids[index * 10 : (index + 1) * 10],
)
_LOGGER.debug("Received data: %s", data)
if not data["ok"]:
raise UpdateFailed(data["message"])
if "prices" not in data:
raise UpdateFailed(
"Did not receive price information from tankerkoenig.de"
)
prices.update(data["prices"])
return prices
def add_station(self, station: dict):
"""Add fuel station to the entity list."""
station_id = station["id"]
if station_id in self.stations:
_LOGGER.warning(
"Sensor for station with id %s was already created", station_id
)
return
self.stations[station_id] = station
_LOGGER.debug("add_station called for station: %s", station)

View file

@ -0,0 +1,25 @@
"""The tankerkoenig base entity."""
from homeassistant.const import ATTR_ID
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .coordinator import TankerkoenigDataUpdateCoordinator
class TankerkoenigCoordinatorEntity(CoordinatorEntity):
"""Tankerkoenig base entity."""
_attr_has_entity_name = True
def __init__(
self, coordinator: TankerkoenigDataUpdateCoordinator, station: dict
) -> None:
"""Initialize the Tankerkoenig base entity."""
super().__init__(coordinator)
self._attr_device_info = DeviceInfo(
identifiers={(ATTR_ID, station["id"])},
name=f"{station['brand']} {station['street']} {station['houseNumber']}",
model=station["brand"],
configuration_url="https://www.tankerkoenig.de",
entry_type=DeviceEntryType.SERVICE,
)

View file

@ -9,7 +9,6 @@ from homeassistant.const import ATTR_LATITUDE, ATTR_LONGITUDE, CURRENCY_EURO
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import TankerkoenigCoordinatorEntity, TankerkoenigDataUpdateCoordinator
from .const import (
ATTR_BRAND,
ATTR_CITY,
@ -21,6 +20,8 @@ from .const import (
ATTRIBUTION,
DOMAIN,
)
from .coordinator import TankerkoenigDataUpdateCoordinator
from .entity import TankerkoenigCoordinatorEntity
_LOGGER = logging.getLogger(__name__)