Implement coordinator in Trafikverket Train (#96916)

* Implement coordinator TVT

* Review comments

* Review changes
This commit is contained in:
G Johansson 2023-07-20 10:40:34 +02:00 committed by GitHub
parent 4e460f71f8
commit db76bf3a9f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 189 additions and 127 deletions

View file

@ -1317,6 +1317,7 @@ omit =
homeassistant/components/tradfri/sensor.py
homeassistant/components/tradfri/switch.py
homeassistant/components/trafikverket_train/__init__.py
homeassistant/components/trafikverket_train/coordinator.py
homeassistant/components/trafikverket_train/sensor.py
homeassistant/components/trafikverket_weatherstation/__init__.py
homeassistant/components/trafikverket_weatherstation/coordinator.py

View file

@ -15,6 +15,7 @@ from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import CONF_FROM, CONF_TO, DOMAIN, PLATFORMS
from .coordinator import TVDataUpdateCoordinator
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
@ -34,11 +35,9 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
f" {entry.data[CONF_TO]}. Error: {error} "
) from error
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = {
CONF_TO: to_station,
CONF_FROM: from_station,
"train_api": train_api,
}
coordinator = TVDataUpdateCoordinator(hass, entry, to_station, from_station)
await coordinator.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

View file

@ -0,0 +1,149 @@
"""DataUpdateCoordinator for the Trafikverket Train integration."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import date, datetime, time, timedelta
import logging
from pytrafikverket import TrafikverketTrain
from pytrafikverket.trafikverket_train import StationInfo, TrainStop
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_API_KEY, CONF_WEEKDAY, WEEKDAYS
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util import dt as dt_util
from .const import CONF_TIME, DOMAIN
@dataclass
class TrainData:
"""Dataclass for Trafikverket Train data."""
departure_time: datetime | None
departure_state: str
cancelled: bool
delayed_time: int | None
planned_time: datetime | None
estimated_time: datetime | None
actual_time: datetime | None
other_info: str | None
deviation: str | None
_LOGGER = logging.getLogger(__name__)
TIME_BETWEEN_UPDATES = timedelta(minutes=5)
def _next_weekday(fromdate: date, weekday: int) -> date:
"""Return the date of the next time a specific weekday happen."""
days_ahead = weekday - fromdate.weekday()
if days_ahead <= 0:
days_ahead += 7
return fromdate + timedelta(days_ahead)
def _next_departuredate(departure: list[str]) -> date:
"""Calculate the next departuredate from an array input of short days."""
today_date = date.today()
today_weekday = date.weekday(today_date)
if WEEKDAYS[today_weekday] in departure:
return today_date
for day in departure:
next_departure = WEEKDAYS.index(day)
if next_departure > today_weekday:
return _next_weekday(today_date, next_departure)
return _next_weekday(today_date, WEEKDAYS.index(departure[0]))
def _get_as_utc(date_value: datetime | None) -> datetime | None:
"""Return utc datetime or None."""
if date_value:
return dt_util.as_utc(date_value)
return None
def _get_as_joined(information: list[str] | None) -> str | None:
"""Return joined information or None."""
if information:
return ", ".join(information)
return None
class TVDataUpdateCoordinator(DataUpdateCoordinator[TrainData]):
"""A Trafikverket Data Update Coordinator."""
def __init__(
self,
hass: HomeAssistant,
entry: ConfigEntry,
to_station: StationInfo,
from_station: StationInfo,
) -> None:
"""Initialize the Trafikverket coordinator."""
super().__init__(
hass,
_LOGGER,
name=DOMAIN,
update_interval=TIME_BETWEEN_UPDATES,
)
self._train_api = TrafikverketTrain(
async_get_clientsession(hass), entry.data[CONF_API_KEY]
)
self.from_station: StationInfo = from_station
self.to_station: StationInfo = to_station
self._time: time | None = dt_util.parse_time(entry.data[CONF_TIME])
self._weekdays: list[str] = entry.data[CONF_WEEKDAY]
async def _async_update_data(self) -> TrainData:
"""Fetch data from Trafikverket."""
when = dt_util.now()
state: TrainStop | None = None
if self._time:
departure_day = _next_departuredate(self._weekdays)
when = datetime.combine(
departure_day,
self._time,
dt_util.get_time_zone(self.hass.config.time_zone),
)
try:
if self._time:
state = await self._train_api.async_get_train_stop(
self.from_station, self.to_station, when
)
else:
state = await self._train_api.async_get_next_train_stop(
self.from_station, self.to_station, when
)
except ValueError as error:
if "Invalid authentication" in error.args[0]:
raise ConfigEntryAuthFailed from error
raise UpdateFailed(
f"Train departure {when} encountered a problem: {error}"
) from error
departure_time = state.advertised_time_at_location
if state.estimated_time_at_location:
departure_time = state.estimated_time_at_location
elif state.time_at_location:
departure_time = state.time_at_location
delay_time = state.get_delay_time()
states = TrainData(
departure_time=_get_as_utc(departure_time),
departure_state=state.get_state().value,
cancelled=state.canceled,
delayed_time=delay_time.seconds if delay_time else None,
planned_time=_get_as_utc(state.advertised_time_at_location),
estimated_time=_get_as_utc(state.estimated_time_at_location),
actual_time=_get_as_utc(state.time_at_location),
other_info=_get_as_joined(state.other_information),
deviation=_get_as_joined(state.deviations),
)
return states

View file

@ -1,31 +1,25 @@
"""Train information for departures and delays, provided by Trafikverket."""
from __future__ import annotations
from datetime import date, datetime, time, timedelta
import logging
from typing import TYPE_CHECKING, Any
from datetime import time, timedelta
from typing import TYPE_CHECKING
from pytrafikverket import TrafikverketTrain
from pytrafikverket.exceptions import (
MultipleTrainAnnouncementFound,
NoTrainAnnouncementFound,
)
from pytrafikverket.trafikverket_train import StationInfo, TrainStop
from pytrafikverket.trafikverket_train import StationInfo
from homeassistant.components.sensor import SensorDeviceClass, SensorEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_NAME, CONF_WEEKDAY, WEEKDAYS
from homeassistant.core import HomeAssistant
from homeassistant.const import CONF_NAME, CONF_WEEKDAY
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import DeviceEntryType
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.util import dt as dt_util
from .const import CONF_FROM, CONF_TIME, CONF_TO, DOMAIN
from .const import CONF_TIME, DOMAIN
from .coordinator import TVDataUpdateCoordinator
from .util import create_unique_id
_LOGGER = logging.getLogger(__name__)
ATTR_DEPARTURE_STATE = "departure_state"
ATTR_CANCELED = "canceled"
ATTR_DELAY_TIME = "number_of_minutes_delayed"
@ -44,16 +38,17 @@ async def async_setup_entry(
) -> None:
"""Set up the Trafikverket sensor entry."""
train_api = hass.data[DOMAIN][entry.entry_id]["train_api"]
to_station = hass.data[DOMAIN][entry.entry_id][CONF_TO]
from_station = hass.data[DOMAIN][entry.entry_id][CONF_FROM]
coordinator: TVDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id]
to_station = coordinator.to_station
from_station = coordinator.from_station
get_time: str | None = entry.data.get(CONF_TIME)
train_time = dt_util.parse_time(get_time) if get_time else None
async_add_entities(
[
TrainSensor(
train_api,
coordinator,
entry.data[CONF_NAME],
from_station,
to_station,
@ -66,33 +61,7 @@ async def async_setup_entry(
)
def next_weekday(fromdate: date, weekday: int) -> date:
"""Return the date of the next time a specific weekday happen."""
days_ahead = weekday - fromdate.weekday()
if days_ahead <= 0:
days_ahead += 7
return fromdate + timedelta(days_ahead)
def next_departuredate(departure: list[str]) -> date:
"""Calculate the next departuredate from an array input of short days."""
today_date = date.today()
today_weekday = date.weekday(today_date)
if WEEKDAYS[today_weekday] in departure:
return today_date
for day in departure:
next_departure = WEEKDAYS.index(day)
if next_departure > today_weekday:
return next_weekday(today_date, next_departure)
return next_weekday(today_date, WEEKDAYS.index(departure[0]))
def _to_iso_format(traintime: datetime) -> str:
"""Return isoformatted utc time."""
return dt_util.as_utc(traintime).isoformat()
class TrainSensor(SensorEntity):
class TrainSensor(CoordinatorEntity[TVDataUpdateCoordinator], SensorEntity):
"""Contains data about a train depature."""
_attr_icon = ICON
@ -102,7 +71,7 @@ class TrainSensor(SensorEntity):
def __init__(
self,
train_api: TrafikverketTrain,
coordinator: TVDataUpdateCoordinator,
name: str,
from_station: StationInfo,
to_station: StationInfo,
@ -111,11 +80,7 @@ class TrainSensor(SensorEntity):
entry_id: str,
) -> None:
"""Initialize the sensor."""
self._train_api = train_api
self._from_station = from_station
self._to_station = to_station
self._weekday = weekday
self._time = departuretime
super().__init__(coordinator)
self._attr_device_info = DeviceInfo(
entry_type=DeviceEntryType.SERVICE,
identifiers={(DOMAIN, entry_id)},
@ -129,80 +94,28 @@ class TrainSensor(SensorEntity):
self._attr_unique_id = create_unique_id(
from_station.name, to_station.name, departuretime, weekday
)
self._update_attr()
async def async_update(self) -> None:
@callback
def _handle_coordinator_update(self) -> None:
self._update_attr()
return super()._handle_coordinator_update()
@callback
def _update_attr(self) -> None:
"""Retrieve latest state."""
when = dt_util.now()
_state: TrainStop | None = None
if self._time:
departure_day = next_departuredate(self._weekday)
when = datetime.combine(
departure_day,
self._time,
dt_util.get_time_zone(self.hass.config.time_zone),
)
try:
if self._time:
_LOGGER.debug("%s, %s, %s", self._from_station, self._to_station, when)
_state = await self._train_api.async_get_train_stop(
self._from_station, self._to_station, when
)
else:
_state = await self._train_api.async_get_next_train_stop(
self._from_station, self._to_station, when
)
except (NoTrainAnnouncementFound, MultipleTrainAnnouncementFound) as error:
_LOGGER.error("Departure %s encountered a problem: %s", when, error)
if not _state:
self._attr_available = False
self._attr_native_value = None
self._attr_extra_state_attributes = {}
return
data = self.coordinator.data
self._attr_available = True
self._attr_native_value = data.departure_time
# The original datetime doesn't provide a timezone so therefore attaching it here.
if TYPE_CHECKING:
assert _state.advertised_time_at_location
self._attr_native_value = dt_util.as_utc(_state.advertised_time_at_location)
if _state.time_at_location:
self._attr_native_value = dt_util.as_utc(_state.time_at_location)
if _state.estimated_time_at_location:
self._attr_native_value = dt_util.as_utc(_state.estimated_time_at_location)
self._update_attributes(_state)
def _update_attributes(self, state: TrainStop) -> None:
"""Return extra state attributes."""
attributes: dict[str, Any] = {
ATTR_DEPARTURE_STATE: state.get_state().value,
ATTR_CANCELED: state.canceled,
ATTR_DELAY_TIME: None,
ATTR_PLANNED_TIME: None,
ATTR_ESTIMATED_TIME: None,
ATTR_ACTUAL_TIME: None,
ATTR_OTHER_INFORMATION: None,
ATTR_DEVIATIONS: None,
self._attr_extra_state_attributes = {
ATTR_DEPARTURE_STATE: data.departure_state,
ATTR_CANCELED: data.cancelled,
ATTR_DELAY_TIME: data.delayed_time,
ATTR_PLANNED_TIME: data.planned_time,
ATTR_ESTIMATED_TIME: data.estimated_time,
ATTR_ACTUAL_TIME: data.actual_time,
ATTR_OTHER_INFORMATION: data.other_info,
ATTR_DEVIATIONS: data.deviation,
}
if delay_in_minutes := state.get_delay_time():
attributes[ATTR_DELAY_TIME] = delay_in_minutes.total_seconds() / 60
if advert_time := state.advertised_time_at_location:
attributes[ATTR_PLANNED_TIME] = _to_iso_format(advert_time)
if est_time := state.estimated_time_at_location:
attributes[ATTR_ESTIMATED_TIME] = _to_iso_format(est_time)
if time_location := state.time_at_location:
attributes[ATTR_ACTUAL_TIME] = _to_iso_format(time_location)
if other_info := state.other_information:
attributes[ATTR_OTHER_INFORMATION] = ", ".join(other_info)
if deviation := state.deviations:
attributes[ATTR_DEVIATIONS] = ", ".join(deviation)
self._attr_extra_state_attributes = attributes