Improve gdacs typing (#108040)
This commit is contained in:
parent
84038fb119
commit
f968b43f6a
4 changed files with 70 additions and 49 deletions
|
@ -1,8 +1,13 @@
|
||||||
"""The Global Disaster Alert and Coordination System (GDACS) integration."""
|
"""The Global Disaster Alert and Coordination System (GDACS) integration."""
|
||||||
from datetime import timedelta
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from collections.abc import Callable
|
||||||
|
from datetime import datetime, timedelta
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
from aio_georss_client.status_update import StatusUpdate
|
||||||
from aio_georss_gdacs import GdacsFeedManager
|
from aio_georss_gdacs import GdacsFeedManager
|
||||||
|
from aio_georss_gdacs.feed_entry import FeedEntry
|
||||||
|
|
||||||
from homeassistant.config_entries import ConfigEntry
|
from homeassistant.config_entries import ConfigEntry
|
||||||
from homeassistant.const import (
|
from homeassistant.const import (
|
||||||
|
@ -50,7 +55,7 @@ async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> b
|
||||||
|
|
||||||
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||||
"""Unload an GDACS component config entry."""
|
"""Unload an GDACS component config entry."""
|
||||||
manager = hass.data[DOMAIN][FEED].pop(entry.entry_id)
|
manager: GdacsFeedEntityManager = hass.data[DOMAIN][FEED].pop(entry.entry_id)
|
||||||
await manager.async_stop()
|
await manager.async_stop()
|
||||||
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
|
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
|
||||||
|
|
||||||
|
@ -58,7 +63,9 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||||
class GdacsFeedEntityManager:
|
class GdacsFeedEntityManager:
|
||||||
"""Feed Entity Manager for GDACS feed."""
|
"""Feed Entity Manager for GDACS feed."""
|
||||||
|
|
||||||
def __init__(self, hass, config_entry, radius_in_km):
|
def __init__(
|
||||||
|
self, hass: HomeAssistant, config_entry: ConfigEntry, radius_in_km: float
|
||||||
|
) -> None:
|
||||||
"""Initialize the Feed Entity Manager."""
|
"""Initialize the Feed Entity Manager."""
|
||||||
self._hass = hass
|
self._hass = hass
|
||||||
self._config_entry = config_entry
|
self._config_entry = config_entry
|
||||||
|
@ -80,18 +87,18 @@ class GdacsFeedEntityManager:
|
||||||
)
|
)
|
||||||
self._config_entry_id = config_entry.entry_id
|
self._config_entry_id = config_entry.entry_id
|
||||||
self._scan_interval = timedelta(seconds=config_entry.data[CONF_SCAN_INTERVAL])
|
self._scan_interval = timedelta(seconds=config_entry.data[CONF_SCAN_INTERVAL])
|
||||||
self._track_time_remove_callback = None
|
self._track_time_remove_callback: Callable[[], None] | None = None
|
||||||
self._status_info = None
|
self._status_info: StatusUpdate | None = None
|
||||||
self.listeners = []
|
self.listeners: list[Callable[[], None]] = []
|
||||||
|
|
||||||
async def async_init(self):
|
async def async_init(self) -> None:
|
||||||
"""Schedule initial and regular updates based on configured time interval."""
|
"""Schedule initial and regular updates based on configured time interval."""
|
||||||
|
|
||||||
await self._hass.config_entries.async_forward_entry_setups(
|
await self._hass.config_entries.async_forward_entry_setups(
|
||||||
self._config_entry, PLATFORMS
|
self._config_entry, PLATFORMS
|
||||||
)
|
)
|
||||||
|
|
||||||
async def update(event_time):
|
async def update(event_time: datetime) -> None:
|
||||||
"""Update."""
|
"""Update."""
|
||||||
await self.async_update()
|
await self.async_update()
|
||||||
|
|
||||||
|
@ -102,12 +109,12 @@ class GdacsFeedEntityManager:
|
||||||
|
|
||||||
_LOGGER.debug("Feed entity manager initialized")
|
_LOGGER.debug("Feed entity manager initialized")
|
||||||
|
|
||||||
async def async_update(self):
|
async def async_update(self) -> None:
|
||||||
"""Refresh data."""
|
"""Refresh data."""
|
||||||
await self._feed_manager.update()
|
await self._feed_manager.update()
|
||||||
_LOGGER.debug("Feed entity manager updated")
|
_LOGGER.debug("Feed entity manager updated")
|
||||||
|
|
||||||
async def async_stop(self):
|
async def async_stop(self) -> None:
|
||||||
"""Stop this feed entity manager from refreshing."""
|
"""Stop this feed entity manager from refreshing."""
|
||||||
for unsub_dispatcher in self.listeners:
|
for unsub_dispatcher in self.listeners:
|
||||||
unsub_dispatcher()
|
unsub_dispatcher()
|
||||||
|
@ -117,19 +124,19 @@ class GdacsFeedEntityManager:
|
||||||
_LOGGER.debug("Feed entity manager stopped")
|
_LOGGER.debug("Feed entity manager stopped")
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def async_event_new_entity(self):
|
def async_event_new_entity(self) -> str:
|
||||||
"""Return manager specific event to signal new entity."""
|
"""Return manager specific event to signal new entity."""
|
||||||
return f"gdacs_new_geolocation_{self._config_entry_id}"
|
return f"gdacs_new_geolocation_{self._config_entry_id}"
|
||||||
|
|
||||||
def get_entry(self, external_id):
|
def get_entry(self, external_id: str) -> FeedEntry | None:
|
||||||
"""Get feed entry by external id."""
|
"""Get feed entry by external id."""
|
||||||
return self._feed_manager.feed_entries.get(external_id)
|
return self._feed_manager.feed_entries.get(external_id)
|
||||||
|
|
||||||
def status_info(self):
|
def status_info(self) -> StatusUpdate | None:
|
||||||
"""Return latest status update info received."""
|
"""Return latest status update info received."""
|
||||||
return self._status_info
|
return self._status_info
|
||||||
|
|
||||||
async def _generate_entity(self, external_id):
|
async def _generate_entity(self, external_id: str) -> None:
|
||||||
"""Generate new entity."""
|
"""Generate new entity."""
|
||||||
async_dispatcher_send(
|
async_dispatcher_send(
|
||||||
self._hass,
|
self._hass,
|
||||||
|
@ -139,15 +146,15 @@ class GdacsFeedEntityManager:
|
||||||
external_id,
|
external_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _update_entity(self, external_id):
|
async def _update_entity(self, external_id: str) -> None:
|
||||||
"""Update entity."""
|
"""Update entity."""
|
||||||
async_dispatcher_send(self._hass, f"gdacs_update_{external_id}")
|
async_dispatcher_send(self._hass, f"gdacs_update_{external_id}")
|
||||||
|
|
||||||
async def _remove_entity(self, external_id):
|
async def _remove_entity(self, external_id: str) -> None:
|
||||||
"""Remove entity."""
|
"""Remove entity."""
|
||||||
async_dispatcher_send(self._hass, f"gdacs_delete_{external_id}")
|
async_dispatcher_send(self._hass, f"gdacs_delete_{external_id}")
|
||||||
|
|
||||||
async def _status_update(self, status_info):
|
async def _status_update(self, status_info: StatusUpdate) -> None:
|
||||||
"""Propagate status update."""
|
"""Propagate status update."""
|
||||||
_LOGGER.debug("Status update received: %s", status_info)
|
_LOGGER.debug("Status update received: %s", status_info)
|
||||||
self._status_info = status_info
|
self._status_info = status_info
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
"""Config flow to configure the GDACS integration."""
|
"""Config flow to configure the GDACS integration."""
|
||||||
import logging
|
import logging
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
|
|
||||||
|
@ -10,6 +11,7 @@ from homeassistant.const import (
|
||||||
CONF_RADIUS,
|
CONF_RADIUS,
|
||||||
CONF_SCAN_INTERVAL,
|
CONF_SCAN_INTERVAL,
|
||||||
)
|
)
|
||||||
|
from homeassistant.data_entry_flow import FlowResult
|
||||||
from homeassistant.helpers import config_validation as cv
|
from homeassistant.helpers import config_validation as cv
|
||||||
|
|
||||||
from .const import CONF_CATEGORIES, DEFAULT_RADIUS, DEFAULT_SCAN_INTERVAL, DOMAIN
|
from .const import CONF_CATEGORIES, DEFAULT_RADIUS, DEFAULT_SCAN_INTERVAL, DOMAIN
|
||||||
|
@ -24,13 +26,15 @@ _LOGGER = logging.getLogger(__name__)
|
||||||
class GdacsFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
class GdacsFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||||
"""Handle a GDACS config flow."""
|
"""Handle a GDACS config flow."""
|
||||||
|
|
||||||
async def _show_form(self, errors=None):
|
async def _show_form(self, errors: dict[str, str] | None = None) -> FlowResult:
|
||||||
"""Show the form to the user."""
|
"""Show the form to the user."""
|
||||||
return self.async_show_form(
|
return self.async_show_form(
|
||||||
step_id="user", data_schema=DATA_SCHEMA, errors=errors or {}
|
step_id="user", data_schema=DATA_SCHEMA, errors=errors or {}
|
||||||
)
|
)
|
||||||
|
|
||||||
async def async_step_user(self, user_input=None):
|
async def async_step_user(
|
||||||
|
self, user_input: dict[str, Any] | None = None
|
||||||
|
) -> FlowResult:
|
||||||
"""Handle the start of the config flow."""
|
"""Handle the start of the config flow."""
|
||||||
_LOGGER.debug("User input: %s", user_input)
|
_LOGGER.debug("User input: %s", user_input)
|
||||||
if not user_input:
|
if not user_input:
|
||||||
|
|
|
@ -2,10 +2,10 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from collections.abc import Callable
|
from collections.abc import Callable
|
||||||
|
from datetime import datetime
|
||||||
import logging
|
import logging
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from aio_georss_gdacs import GdacsFeedManager
|
|
||||||
from aio_georss_gdacs.feed_entry import GdacsFeedEntry
|
from aio_georss_gdacs.feed_entry import GdacsFeedEntry
|
||||||
|
|
||||||
from homeassistant.components.geo_location import GeolocationEvent
|
from homeassistant.components.geo_location import GeolocationEvent
|
||||||
|
@ -58,7 +58,7 @@ async def async_setup_entry(
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def async_add_geolocation(
|
def async_add_geolocation(
|
||||||
feed_manager: GdacsFeedManager, integration_id: str, external_id: str
|
feed_manager: GdacsFeedEntityManager, integration_id: str, external_id: str
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Add geolocation entity from feed."""
|
"""Add geolocation entity from feed."""
|
||||||
new_entity = GdacsEvent(feed_manager, integration_id, external_id)
|
new_entity = GdacsEvent(feed_manager, integration_id, external_id)
|
||||||
|
@ -83,25 +83,28 @@ class GdacsEvent(GeolocationEvent):
|
||||||
_attr_source = SOURCE
|
_attr_source = SOURCE
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self, feed_manager: GdacsFeedManager, integration_id: str, external_id: str
|
self,
|
||||||
|
feed_manager: GdacsFeedEntityManager,
|
||||||
|
integration_id: str,
|
||||||
|
external_id: str,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Initialize entity with data from feed entry."""
|
"""Initialize entity with data from feed entry."""
|
||||||
self._feed_manager = feed_manager
|
self._feed_manager = feed_manager
|
||||||
self._external_id = external_id
|
self._external_id = external_id
|
||||||
self._attr_unique_id = f"{integration_id}_{external_id}"
|
self._attr_unique_id = f"{integration_id}_{external_id}"
|
||||||
self._attr_unit_of_measurement = UnitOfLength.KILOMETERS
|
self._attr_unit_of_measurement = UnitOfLength.KILOMETERS
|
||||||
self._alert_level = None
|
self._alert_level: str | None = None
|
||||||
self._country = None
|
self._country: str | None = None
|
||||||
self._description = None
|
self._description: str | None = None
|
||||||
self._duration_in_week = None
|
self._duration_in_week: int | None = None
|
||||||
self._event_type_short = None
|
self._event_type_short: str | None = None
|
||||||
self._event_type = None
|
self._event_type: str | None = None
|
||||||
self._from_date = None
|
self._from_date: datetime | None = None
|
||||||
self._to_date = None
|
self._to_date: datetime | None = None
|
||||||
self._population = None
|
self._population: str | None = None
|
||||||
self._severity = None
|
self._severity: str | None = None
|
||||||
self._vulnerability = None
|
self._vulnerability: str | float | None = None
|
||||||
self._version = None
|
self._version: int | None = None
|
||||||
self._remove_signal_delete: Callable[[], None]
|
self._remove_signal_delete: Callable[[], None]
|
||||||
self._remove_signal_update: Callable[[], None]
|
self._remove_signal_update: Callable[[], None]
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,11 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from collections.abc import Callable
|
from collections.abc import Callable
|
||||||
|
from datetime import datetime
|
||||||
import logging
|
import logging
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from aio_georss_client.status_update import StatusUpdate
|
||||||
|
|
||||||
from homeassistant.components.sensor import SensorEntity
|
from homeassistant.components.sensor import SensorEntity
|
||||||
from homeassistant.config_entries import ConfigEntry
|
from homeassistant.config_entries import ConfigEntry
|
||||||
|
@ -12,6 +16,7 @@ from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
||||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||||
from homeassistant.util import dt as dt_util
|
from homeassistant.util import dt as dt_util
|
||||||
|
|
||||||
|
from . import GdacsFeedEntityManager
|
||||||
from .const import DEFAULT_ICON, DOMAIN, FEED
|
from .const import DEFAULT_ICON, DOMAIN, FEED
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
@ -34,7 +39,7 @@ async def async_setup_entry(
|
||||||
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
|
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Set up the GDACS Feed platform."""
|
"""Set up the GDACS Feed platform."""
|
||||||
manager = hass.data[DOMAIN][FEED][entry.entry_id]
|
manager: GdacsFeedEntityManager = hass.data[DOMAIN][FEED][entry.entry_id]
|
||||||
sensor = GdacsSensor(entry, manager)
|
sensor = GdacsSensor(entry, manager)
|
||||||
async_add_entities([sensor])
|
async_add_entities([sensor])
|
||||||
|
|
||||||
|
@ -48,20 +53,22 @@ class GdacsSensor(SensorEntity):
|
||||||
_attr_has_entity_name = True
|
_attr_has_entity_name = True
|
||||||
_attr_name = None
|
_attr_name = None
|
||||||
|
|
||||||
def __init__(self, config_entry: ConfigEntry, manager) -> None:
|
def __init__(
|
||||||
|
self, config_entry: ConfigEntry, manager: GdacsFeedEntityManager
|
||||||
|
) -> None:
|
||||||
"""Initialize entity."""
|
"""Initialize entity."""
|
||||||
assert config_entry.unique_id
|
assert config_entry.unique_id
|
||||||
self._config_entry_id = config_entry.entry_id
|
self._config_entry_id = config_entry.entry_id
|
||||||
self._attr_unique_id = config_entry.unique_id
|
self._attr_unique_id = config_entry.unique_id
|
||||||
self._manager = manager
|
self._manager = manager
|
||||||
self._status = None
|
self._status: str | None = None
|
||||||
self._last_update = None
|
self._last_update: datetime | None = None
|
||||||
self._last_update_successful = None
|
self._last_update_successful: datetime | None = None
|
||||||
self._last_timestamp = None
|
self._last_timestamp: datetime | None = None
|
||||||
self._total = None
|
self._total: int | None = None
|
||||||
self._created = None
|
self._created: int | None = None
|
||||||
self._updated = None
|
self._updated: int | None = None
|
||||||
self._removed = None
|
self._removed: int | None = None
|
||||||
self._remove_signal_status: Callable[[], None] | None = None
|
self._remove_signal_status: Callable[[], None] | None = None
|
||||||
self._attr_device_info = DeviceInfo(
|
self._attr_device_info = DeviceInfo(
|
||||||
identifiers={(DOMAIN, config_entry.unique_id)},
|
identifiers={(DOMAIN, config_entry.unique_id)},
|
||||||
|
@ -86,7 +93,7 @@ class GdacsSensor(SensorEntity):
|
||||||
self._remove_signal_status()
|
self._remove_signal_status()
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def _update_status_callback(self):
|
def _update_status_callback(self) -> None:
|
||||||
"""Call status update method."""
|
"""Call status update method."""
|
||||||
_LOGGER.debug("Received status update for %s", self._config_entry_id)
|
_LOGGER.debug("Received status update for %s", self._config_entry_id)
|
||||||
self.async_schedule_update_ha_state(True)
|
self.async_schedule_update_ha_state(True)
|
||||||
|
@ -99,7 +106,7 @@ class GdacsSensor(SensorEntity):
|
||||||
if status_info:
|
if status_info:
|
||||||
self._update_from_status_info(status_info)
|
self._update_from_status_info(status_info)
|
||||||
|
|
||||||
def _update_from_status_info(self, status_info):
|
def _update_from_status_info(self, status_info: StatusUpdate) -> None:
|
||||||
"""Update the internal state from the provided information."""
|
"""Update the internal state from the provided information."""
|
||||||
self._status = status_info.status
|
self._status = status_info.status
|
||||||
self._last_update = (
|
self._last_update = (
|
||||||
|
@ -118,14 +125,14 @@ class GdacsSensor(SensorEntity):
|
||||||
self._removed = status_info.removed
|
self._removed = status_info.removed
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def native_value(self):
|
def native_value(self) -> int | None:
|
||||||
"""Return the state of the sensor."""
|
"""Return the state of the sensor."""
|
||||||
return self._total
|
return self._total
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def extra_state_attributes(self):
|
def extra_state_attributes(self) -> dict[str, Any]:
|
||||||
"""Return the device state attributes."""
|
"""Return the device state attributes."""
|
||||||
attributes = {}
|
attributes: dict[str, Any] = {}
|
||||||
for key, value in (
|
for key, value in (
|
||||||
(ATTR_STATUS, self._status),
|
(ATTR_STATUS, self._status),
|
||||||
(ATTR_LAST_UPDATE, self._last_update),
|
(ATTR_LAST_UPDATE, self._last_update),
|
||||||
|
|
Loading…
Add table
Reference in a new issue