Add data update coordinator to Whois (#64846)

Co-authored-by: Joakim Sørensen <joasoe@gmail.com>
This commit is contained in:
Franck Nijhof 2022-01-24 18:50:07 +01:00 committed by GitHub
parent b07f4ba398
commit d15d081646
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 80 additions and 57 deletions

View file

@ -1,13 +1,48 @@
"""The Whois integration."""
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from __future__ import annotations
from .const import PLATFORMS
from whois import Domain, query as whois_query
from whois.exceptions import (
FailedParsingWhoisOutput,
UnknownDateFormat,
UnknownTld,
WhoisCommandFailed,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_DOMAIN
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DOMAIN, LOGGER, PLATFORMS, SCAN_INTERVAL
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up from a config entry."""
async def _async_query_domain() -> Domain | None:
"""Query WHOIS for domain information."""
try:
return await hass.async_add_executor_job(
whois_query, entry.data[CONF_DOMAIN]
)
except UnknownTld as ex:
raise UpdateFailed("Could not set up whois, TLD is unknown") from ex
except (FailedParsingWhoisOutput, WhoisCommandFailed, UnknownDateFormat) as ex:
raise UpdateFailed("An error occurred during WHOIS lookup") from ex
coordinator: DataUpdateCoordinator[Domain | None] = DataUpdateCoordinator(
hass,
LOGGER,
name=f"{DOMAIN}_APK",
update_interval=SCAN_INTERVAL,
update_method=_async_query_domain,
)
await coordinator.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
return True

View file

@ -1,6 +1,7 @@
"""Constants for the Whois integration."""
from __future__ import annotations
from datetime import timedelta
import logging
from typing import Final
@ -11,6 +12,8 @@ PLATFORMS = [Platform.SENSOR]
LOGGER = logging.getLogger(__package__)
SCAN_INTERVAL = timedelta(hours=24)
DEFAULT_NAME = "Whois"
ATTR_EXPIRES = "expires"

View file

@ -3,18 +3,10 @@ from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from datetime import timedelta
from typing import cast
import voluptuous as vol
import whois
from whois import Domain
from whois.exceptions import (
FailedParsingWhoisOutput,
UnknownDateFormat,
UnknownTld,
WhoisCommandFailed,
)
from homeassistant.components.sensor import (
PLATFORM_SCHEMA,
@ -29,6 +21,10 @@ from homeassistant.helpers.device_registry import DeviceEntryType
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
)
from .const import (
ATTR_EXPIRES,
@ -40,8 +36,6 @@ from .const import (
LOGGER,
)
SCAN_INTERVAL = timedelta(hours=24)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_DOMAIN): cv.string,
@ -111,21 +105,13 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the platform from config_entry."""
domain = entry.data[CONF_DOMAIN]
try:
await hass.async_add_executor_job(whois.query, domain)
except UnknownTld:
LOGGER.error("Could not set up whois for %s, TLD is unknown", domain)
return
except (FailedParsingWhoisOutput, WhoisCommandFailed, UnknownDateFormat) as ex:
LOGGER.error("Exception %s occurred during WHOIS lookup for %s", ex, domain)
return
coordinator = hass.data[DOMAIN][entry.entry_id]
async_add_entities(
[
WhoisSensorEntity(
domain=domain,
coordinator=coordinator,
description=description,
domain=entry.data[CONF_DOMAIN],
)
for description in SENSORS
],
@ -133,13 +119,19 @@ async def async_setup_entry(
)
class WhoisSensorEntity(SensorEntity):
class WhoisSensorEntity(CoordinatorEntity, SensorEntity):
"""Implementation of a WHOIS sensor."""
entity_description: WhoisSensorEntityDescription
def __init__(self, description: WhoisSensorEntityDescription, domain: str) -> None:
def __init__(
self,
coordinator: DataUpdateCoordinator,
description: WhoisSensorEntityDescription,
domain: str,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator=coordinator)
self.entity_description = description
self._attr_name = domain
self._attr_unique_id = f"{domain}_{description.key}"
@ -149,42 +141,35 @@ class WhoisSensorEntity(SensorEntity):
)
self._domain = domain
def _empty_value_and_attributes(self) -> None:
"""Empty the state and attributes on an error."""
self._attr_native_value = None
self._attr_extra_state_attributes = {}
@property
def native_value(self) -> int | None:
"""Return the state of the sensor."""
if self.coordinator.data is None:
return None
return self.entity_description.value_fn(self.coordinator.data)
def update(self) -> None:
"""Get the current WHOIS data for the domain."""
try:
response: Domain | None = whois.query(self._domain)
except (FailedParsingWhoisOutput, WhoisCommandFailed, UnknownDateFormat) as ex:
LOGGER.error("Exception %s occurred during WHOIS lookup", ex)
self._empty_value_and_attributes()
return
@property
def extra_state_attributes(self) -> dict[str, int | float | None] | None:
"""Return the state attributes of the monitored installation."""
if response:
if not response.expiration_date:
LOGGER.error("Failed to find expiration_date in whois lookup response")
self._empty_value_and_attributes()
return
# Only add attributes to the original sensor
if self.entity_description.key != "days_until_expiration":
return None
self._attr_native_value = self.entity_description.value_fn(response)
if self.coordinator.data is None:
return None
# Only add attributes to the original sensor
if self.entity_description.key != "days_until_expiration":
return None
attrs = {
ATTR_EXPIRES: self.coordinator.data.expiration_date.isoformat(),
}
attrs = {}
attrs[ATTR_EXPIRES] = response.expiration_date.isoformat()
if self.coordinator.data.name_servers:
attrs[ATTR_NAME_SERVERS] = " ".join(self.coordinator.data.name_servers)
if response.name_servers:
attrs[ATTR_NAME_SERVERS] = " ".join(response.name_servers)
if self.coordinator.data.last_updated:
attrs[ATTR_UPDATED] = self.coordinator.data.last_updated.isoformat()
if response.last_updated:
attrs[ATTR_UPDATED] = response.last_updated.isoformat()
if self.coordinator.data.registrar:
attrs[ATTR_REGISTRAR] = self.coordinator.data.registrar
if response.registrar:
attrs[ATTR_REGISTRAR] = response.registrar
self._attr_extra_state_attributes = attrs
return attrs