Improve IQVIA data/API management based on enabled entities (#32291)

* Improve IQVIA data/API management based on enabled entities

* Code review comments

* Code review

* Cleanup

* Linting

* Code review

* Code review
This commit is contained in:
Aaron Bach 2020-03-16 23:58:50 -06:00 committed by GitHub
parent 2cda7bf1e7
commit a278cf3db2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 140 additions and 70 deletions

View file

@ -4,7 +4,7 @@ from datetime import timedelta
import logging
from pyiqvia import Client
from pyiqvia.errors import InvalidZipError
from pyiqvia.errors import InvalidZipError, IQVIAError
import voluptuous as vol
from homeassistant.config_entries import SOURCE_IMPORT
@ -17,7 +17,6 @@ from homeassistant.helpers.dispatcher import (
)
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.util.decorator import Registry
from .config_flow import configured_instances
from .const import (
@ -43,20 +42,20 @@ from .const import (
_LOGGER = logging.getLogger(__name__)
API_CATEGORY_MAPPING = {
TYPE_ALLERGY_TODAY: TYPE_ALLERGY_INDEX,
TYPE_ALLERGY_TOMORROW: TYPE_ALLERGY_INDEX,
TYPE_ALLERGY_TOMORROW: TYPE_ALLERGY_INDEX,
TYPE_ASTHMA_TODAY: TYPE_ASTHMA_INDEX,
TYPE_ASTHMA_TOMORROW: TYPE_ALLERGY_INDEX,
TYPE_DISEASE_TODAY: TYPE_DISEASE_INDEX,
}
DATA_CONFIG = "config"
DEFAULT_ATTRIBUTION = "Data provided by IQVIA™"
DEFAULT_SCAN_INTERVAL = timedelta(minutes=30)
FETCHER_MAPPING = {
(TYPE_ALLERGY_FORECAST,): (TYPE_ALLERGY_FORECAST, TYPE_ALLERGY_OUTLOOK),
(TYPE_ALLERGY_TODAY, TYPE_ALLERGY_TOMORROW): (TYPE_ALLERGY_INDEX,),
(TYPE_ASTHMA_FORECAST,): (TYPE_ASTHMA_FORECAST,),
(TYPE_ASTHMA_TODAY, TYPE_ASTHMA_TOMORROW): (TYPE_ASTHMA_INDEX,),
(TYPE_DISEASE_FORECAST,): (TYPE_DISEASE_FORECAST,),
(TYPE_DISEASE_TODAY,): (TYPE_DISEASE_INDEX,),
}
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.All(
@ -75,6 +74,12 @@ CONFIG_SCHEMA = vol.Schema(
)
@callback
def async_get_api_category(sensor_type):
"""Return the API category that a particular sensor type should use."""
return API_CATEGORY_MAPPING.get(sensor_type, sensor_type)
async def async_setup(hass, config):
"""Set up the IQVIA component."""
hass.data[DOMAIN] = {}
@ -102,8 +107,9 @@ async def async_setup_entry(hass, config_entry):
"""Set up IQVIA as config entry."""
websession = aiohttp_client.async_get_clientsession(hass)
iqvia = IQVIAData(hass, Client(config_entry.data[CONF_ZIP_CODE], websession))
try:
iqvia = IQVIAData(Client(config_entry.data[CONF_ZIP_CODE], websession))
await iqvia.async_update()
except InvalidZipError:
_LOGGER.error("Invalid ZIP code provided: %s", config_entry.data[CONF_ZIP_CODE])
@ -115,16 +121,6 @@ async def async_setup_entry(hass, config_entry):
hass.config_entries.async_forward_entry_setup(config_entry, "sensor")
)
async def refresh(event_time):
"""Refresh IQVIA data."""
_LOGGER.debug("Updating IQVIA data")
await iqvia.async_update()
async_dispatcher_send(hass, TOPIC_DATA_UPDATE)
hass.data[DOMAIN][DATA_LISTENER][config_entry.entry_id] = async_track_time_interval(
hass, refresh, DEFAULT_SCAN_INTERVAL
)
return True
@ -143,42 +139,99 @@ async def async_unload_entry(hass, config_entry):
class IQVIAData:
"""Define a data object to retrieve info from IQVIA."""
def __init__(self, client):
def __init__(self, hass, client):
"""Initialize."""
self._async_cancel_time_interval_listener = None
self._client = client
self._hass = hass
self.data = {}
self.zip_code = client.zip_code
self.fetchers = Registry()
self.fetchers.register(TYPE_ALLERGY_FORECAST)(self._client.allergens.extended)
self.fetchers.register(TYPE_ALLERGY_OUTLOOK)(self._client.allergens.outlook)
self.fetchers.register(TYPE_ALLERGY_INDEX)(self._client.allergens.current)
self.fetchers.register(TYPE_ASTHMA_FORECAST)(self._client.asthma.extended)
self.fetchers.register(TYPE_ASTHMA_INDEX)(self._client.asthma.current)
self.fetchers.register(TYPE_DISEASE_FORECAST)(self._client.disease.extended)
self.fetchers.register(TYPE_DISEASE_INDEX)(self._client.disease.current)
self._api_coros = {
TYPE_ALLERGY_FORECAST: client.allergens.extended,
TYPE_ALLERGY_INDEX: client.allergens.current,
TYPE_ALLERGY_OUTLOOK: client.allergens.outlook,
TYPE_ASTHMA_FORECAST: client.asthma.extended,
TYPE_ASTHMA_INDEX: client.asthma.current,
TYPE_DISEASE_FORECAST: client.disease.extended,
TYPE_DISEASE_INDEX: client.disease.current,
}
self._api_category_count = {
TYPE_ALLERGY_FORECAST: 0,
TYPE_ALLERGY_INDEX: 0,
TYPE_ALLERGY_OUTLOOK: 0,
TYPE_ASTHMA_FORECAST: 0,
TYPE_ASTHMA_INDEX: 0,
TYPE_DISEASE_FORECAST: 0,
TYPE_DISEASE_INDEX: 0,
}
self._api_category_locks = {
TYPE_ALLERGY_FORECAST: asyncio.Lock(),
TYPE_ALLERGY_INDEX: asyncio.Lock(),
TYPE_ALLERGY_OUTLOOK: asyncio.Lock(),
TYPE_ASTHMA_FORECAST: asyncio.Lock(),
TYPE_ASTHMA_INDEX: asyncio.Lock(),
TYPE_DISEASE_FORECAST: asyncio.Lock(),
TYPE_DISEASE_INDEX: asyncio.Lock(),
}
async def _async_get_data_from_api(self, api_category):
"""Update and save data for a particular API category."""
if self._api_category_count[api_category] == 0:
return
try:
self.data[api_category] = await self._api_coros[api_category]()
except IQVIAError as err:
_LOGGER.error("Unable to get %s data: %s", api_category, err)
self.data[api_category] = None
async def _async_update_listener_action(self, now):
"""Define an async_track_time_interval action to update data."""
await self.async_update()
@callback
def async_deregister_api_interest(self, sensor_type):
"""Decrement the number of entities with data needs from an API category."""
# If this deregistration should leave us with no registration at all, remove the
# time interval:
if sum(self._api_category_count.values()) == 0:
if self._async_cancel_time_interval_listener:
self._async_cancel_time_interval_listener()
self._async_cancel_time_interval_listener = None
return
api_category = async_get_api_category(sensor_type)
self._api_category_count[api_category] -= 1
async def async_register_api_interest(self, sensor_type):
"""Increment the number of entities with data needs from an API category."""
# If this is the first registration we have, start a time interval:
if not self._async_cancel_time_interval_listener:
self._async_cancel_time_interval_listener = async_track_time_interval(
self._hass, self._async_update_listener_action, DEFAULT_SCAN_INTERVAL,
)
api_category = async_get_api_category(sensor_type)
self._api_category_count[api_category] += 1
# If a sensor registers interest in a particular API call and the data doesn't
# exist for it yet, make the API call and grab the data:
async with self._api_category_locks[api_category]:
if api_category not in self.data:
await self._async_get_data_from_api(api_category)
async def async_update(self):
"""Update IQVIA data."""
tasks = {}
tasks = [
self._async_get_data_from_api(api_category)
for api_category in self._api_coros
]
for conditions, fetcher_types in FETCHER_MAPPING.items():
if not any(c in SENSORS for c in conditions):
continue
await asyncio.gather(*tasks)
for fetcher_type in fetcher_types:
tasks[fetcher_type] = self.fetchers[fetcher_type]()
results = await asyncio.gather(*tasks.values(), return_exceptions=True)
for key, result in zip(tasks, results):
if isinstance(result, Exception):
_LOGGER.error("Unable to get %s data: %s", key, result)
self.data[key] = {}
continue
_LOGGER.debug("Loaded new %s data", key)
self.data[key] = result
_LOGGER.debug("Received new data")
async_dispatcher_send(self._hass, TOPIC_DATA_UPDATE)
class IQVIAEntity(Entity):
@ -245,13 +298,34 @@ class IQVIAEntity(Entity):
@callback
def update():
"""Update the state."""
self.async_schedule_update_ha_state(True)
self.update_from_latest_data()
self.async_write_ha_state()
self._async_unsub_dispatcher_connect = async_dispatcher_connect(
self.hass, TOPIC_DATA_UPDATE, update
)
await self._iqvia.async_register_api_interest(self._type)
if self._type == TYPE_ALLERGY_FORECAST:
# Entities that express interest in allergy forecast data should also
# express interest in allergy outlook data:
await self._iqvia.async_register_api_interest(TYPE_ALLERGY_OUTLOOK)
self.update_from_latest_data()
async def async_will_remove_from_hass(self):
"""Disconnect dispatcher listener when removed."""
if self._async_unsub_dispatcher_connect:
self._async_unsub_dispatcher_connect()
self._async_unsub_dispatcher_connect = None
self._iqvia.async_deregister_api_interest(self._type)
if self._type == TYPE_ALLERGY_FORECAST:
# Entities that lose interest in allergy forecast data should also lose
# interest in allergy outlook data:
self._iqvia.async_deregister_api_interest(TYPE_ALLERGY_OUTLOOK)
@callback
def update_from_latest_data(self):
"""Update the entity's state."""
raise NotImplementedError()

View file

@ -25,9 +25,9 @@ SENSORS = {
TYPE_ALLERGY_FORECAST: ("Allergy Index: Forecasted Average", "mdi:flower"),
TYPE_ALLERGY_TODAY: ("Allergy Index: Today", "mdi:flower"),
TYPE_ALLERGY_TOMORROW: ("Allergy Index: Tomorrow", "mdi:flower"),
TYPE_ASTHMA_FORECAST: ("Asthma Index: Forecasted Average", "mdi:flower"),
TYPE_ASTHMA_TODAY: ("Asthma Index: Today", "mdi:flower"),
TYPE_ASTHMA_TOMORROW: ("Asthma Index: Tomorrow", "mdi:flower"),
TYPE_ASTHMA_FORECAST: ("Asthma Index: Forecasted Average", "mdi:flower"),
TYPE_DISEASE_FORECAST: ("Cold & Flu: Forecasted Average", "mdi:snowflake"),
TYPE_DISEASE_TODAY: ("Cold & Flu Index: Today", "mdi:pill"),
}

View file

@ -7,7 +7,6 @@ import numpy as np
from homeassistant.components.iqvia import (
DATA_CLIENT,
DOMAIN,
SENSORS,
TYPE_ALLERGY_FORECAST,
TYPE_ALLERGY_INDEX,
TYPE_ALLERGY_OUTLOOK,
@ -23,6 +22,9 @@ from homeassistant.components.iqvia import (
IQVIAEntity,
)
from homeassistant.const import ATTR_STATE
from homeassistant.core import callback
from .const import SENSORS
_LOGGER = logging.getLogger(__name__)
@ -65,13 +67,14 @@ async def async_setup_entry(hass, entry, async_add_entities):
TYPE_DISEASE_TODAY: IndexSensor,
}
sensors = []
for sensor_type in SENSORS:
klass = sensor_class_mapping[sensor_type]
name, icon = SENSORS[sensor_type]
sensors.append(klass(iqvia, sensor_type, name, icon, iqvia.zip_code))
async_add_entities(sensors, True)
async_add_entities(
[
sensor_class_mapping[sensor_type](
iqvia, sensor_type, name, icon, iqvia.zip_code
)
for sensor_type, (name, icon) in SENSORS.items()
]
)
def calculate_trend(indices):
@ -93,9 +96,10 @@ def calculate_trend(indices):
class ForecastSensor(IQVIAEntity):
"""Define sensor related to forecast data."""
async def async_update(self):
@callback
def update_from_latest_data(self):
"""Update the sensor."""
if not self._iqvia.data:
if not self._iqvia.data.get(self._type):
return
data = self._iqvia.data[self._type].get("Location")
@ -131,12 +135,10 @@ class ForecastSensor(IQVIAEntity):
class IndexSensor(IQVIAEntity):
"""Define sensor related to indices."""
async def async_update(self):
@callback
def update_from_latest_data(self):
"""Update the sensor."""
if not self._iqvia.data:
_LOGGER.warning(
"IQVIA didn't return data for %s; trying again later", self.name
)
return
try:
@ -147,9 +149,6 @@ class IndexSensor(IQVIAEntity):
elif self._type == TYPE_DISEASE_TODAY:
data = self._iqvia.data[TYPE_DISEASE_INDEX].get("Location")
except KeyError:
_LOGGER.warning(
"IQVIA didn't return data for %s; trying again later", self.name
)
return
key = self._type.split("_")[-1].title()
@ -157,9 +156,6 @@ class IndexSensor(IQVIAEntity):
try:
[period] = [p for p in data["periods"] if p["Type"] == key]
except ValueError:
_LOGGER.warning(
"IQVIA didn't return data for %s; trying again later", self.name
)
return
[rating] = [