Tibber data coordinator (#53619)

* Tibber data coordinator

Signed-off-by: Daniel Hjelseth Høyer <github@dahoiv.net>

* Fix comments

Signed-off-by: Daniel Hjelseth Høyer <github@dahoiv.net>

* Fix comments

Signed-off-by: Daniel Hjelseth Høyer <github@dahoiv.net>

* Fix comments

Signed-off-by: Daniel Hjelseth Høyer <github@dahoiv.net>

* Remove whitespace

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
Daniel Hjelseth Høyer 2021-08-18 11:21:39 +02:00 committed by GitHub
parent 73d03bdf1d
commit 102af02d8a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -26,17 +26,15 @@ from homeassistant.const import (
ELECTRIC_CURRENT_AMPERE,
ELECTRIC_POTENTIAL_VOLT,
ENERGY_KILO_WATT_HOUR,
EVENT_HOMEASSISTANT_STOP,
PERCENTAGE,
POWER_WATT,
SIGNAL_STRENGTH_DECIBELS,
)
from homeassistant.core import callback
from homeassistant.exceptions import PlatformNotReady
from homeassistant.helpers import update_coordinator
from homeassistant.helpers.device_registry import async_get as async_get_dev_reg
from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.entity_registry import async_get as async_get_entity_reg
from homeassistant.util import Throttle, dt as dt_util
@ -66,40 +64,40 @@ class TibberSensorEntityDescription(SensorEntityDescription):
reset_type: ResetType | None = None
RT_SENSOR_MAP: dict[str, TibberSensorEntityDescription] = {
"averagePower": TibberSensorEntityDescription(
RT_SENSORS: tuple[TibberSensorEntityDescription, ...] = (
TibberSensorEntityDescription(
key="averagePower",
name="average power",
device_class=DEVICE_CLASS_POWER,
native_unit_of_measurement=POWER_WATT,
),
"power": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="power",
name="power",
device_class=DEVICE_CLASS_POWER,
state_class=STATE_CLASS_MEASUREMENT,
native_unit_of_measurement=POWER_WATT,
),
"powerProduction": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="powerProduction",
name="power production",
device_class=DEVICE_CLASS_POWER,
state_class=STATE_CLASS_MEASUREMENT,
native_unit_of_measurement=POWER_WATT,
),
"minPower": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="minPower",
name="min power",
device_class=DEVICE_CLASS_POWER,
native_unit_of_measurement=POWER_WATT,
),
"maxPower": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="maxPower",
name="max power",
device_class=DEVICE_CLASS_POWER,
native_unit_of_measurement=POWER_WATT,
),
"accumulatedConsumption": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="accumulatedConsumption",
name="accumulated consumption",
device_class=DEVICE_CLASS_ENERGY,
@ -107,7 +105,7 @@ RT_SENSOR_MAP: dict[str, TibberSensorEntityDescription] = {
state_class=STATE_CLASS_MEASUREMENT,
reset_type=ResetType.DAILY,
),
"accumulatedConsumptionLastHour": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="accumulatedConsumptionLastHour",
name="accumulated consumption current hour",
device_class=DEVICE_CLASS_ENERGY,
@ -115,7 +113,7 @@ RT_SENSOR_MAP: dict[str, TibberSensorEntityDescription] = {
state_class=STATE_CLASS_MEASUREMENT,
reset_type=ResetType.HOURLY,
),
"accumulatedProduction": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="accumulatedProduction",
name="accumulated production",
device_class=DEVICE_CLASS_ENERGY,
@ -123,7 +121,7 @@ RT_SENSOR_MAP: dict[str, TibberSensorEntityDescription] = {
state_class=STATE_CLASS_MEASUREMENT,
reset_type=ResetType.DAILY,
),
"accumulatedProductionLastHour": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="accumulatedProductionLastHour",
name="accumulated production current hour",
device_class=DEVICE_CLASS_ENERGY,
@ -131,7 +129,7 @@ RT_SENSOR_MAP: dict[str, TibberSensorEntityDescription] = {
state_class=STATE_CLASS_MEASUREMENT,
reset_type=ResetType.HOURLY,
),
"lastMeterConsumption": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="lastMeterConsumption",
name="last meter consumption",
device_class=DEVICE_CLASS_ENERGY,
@ -139,7 +137,7 @@ RT_SENSOR_MAP: dict[str, TibberSensorEntityDescription] = {
state_class=STATE_CLASS_MEASUREMENT,
reset_type=ResetType.NEVER,
),
"lastMeterProduction": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="lastMeterProduction",
name="last meter production",
device_class=DEVICE_CLASS_ENERGY,
@ -147,77 +145,77 @@ RT_SENSOR_MAP: dict[str, TibberSensorEntityDescription] = {
state_class=STATE_CLASS_MEASUREMENT,
reset_type=ResetType.NEVER,
),
"voltagePhase1": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="voltagePhase1",
name="voltage phase1",
device_class=DEVICE_CLASS_VOLTAGE,
native_unit_of_measurement=ELECTRIC_POTENTIAL_VOLT,
state_class=STATE_CLASS_MEASUREMENT,
),
"voltagePhase2": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="voltagePhase2",
name="voltage phase2",
device_class=DEVICE_CLASS_VOLTAGE,
native_unit_of_measurement=ELECTRIC_POTENTIAL_VOLT,
state_class=STATE_CLASS_MEASUREMENT,
),
"voltagePhase3": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="voltagePhase3",
name="voltage phase3",
device_class=DEVICE_CLASS_VOLTAGE,
native_unit_of_measurement=ELECTRIC_POTENTIAL_VOLT,
state_class=STATE_CLASS_MEASUREMENT,
),
"currentL1": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="currentL1",
name="current L1",
device_class=DEVICE_CLASS_CURRENT,
native_unit_of_measurement=ELECTRIC_CURRENT_AMPERE,
state_class=STATE_CLASS_MEASUREMENT,
),
"currentL2": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="currentL2",
name="current L2",
device_class=DEVICE_CLASS_CURRENT,
native_unit_of_measurement=ELECTRIC_CURRENT_AMPERE,
state_class=STATE_CLASS_MEASUREMENT,
),
"currentL3": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="currentL3",
name="current L3",
device_class=DEVICE_CLASS_CURRENT,
native_unit_of_measurement=ELECTRIC_CURRENT_AMPERE,
state_class=STATE_CLASS_MEASUREMENT,
),
"signalStrength": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="signalStrength",
name="signal strength",
device_class=DEVICE_CLASS_SIGNAL_STRENGTH,
native_unit_of_measurement=SIGNAL_STRENGTH_DECIBELS,
state_class=STATE_CLASS_MEASUREMENT,
),
"accumulatedReward": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="accumulatedReward",
name="accumulated reward",
device_class=DEVICE_CLASS_MONETARY,
state_class=STATE_CLASS_MEASUREMENT,
reset_type=ResetType.DAILY,
),
"accumulatedCost": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="accumulatedCost",
name="accumulated cost",
device_class=DEVICE_CLASS_MONETARY,
state_class=STATE_CLASS_MEASUREMENT,
reset_type=ResetType.DAILY,
),
"powerFactor": TibberSensorEntityDescription(
TibberSensorEntityDescription(
key="powerFactor",
name="power factor",
device_class=DEVICE_CLASS_POWER_FACTOR,
native_unit_of_measurement=PERCENTAGE,
state_class=STATE_CLASS_MEASUREMENT,
),
}
)
async def async_setup_entry(hass, entry, async_add_entities):
@ -243,7 +241,9 @@ async def async_setup_entry(hass, entry, async_add_entities):
entities.append(TibberSensorElPrice(home))
if home.has_real_time_consumption:
await home.rt_subscribe(
TibberRtDataHandler(async_add_entities, home, hass).async_callback
TibberRtDataCoordinator(
async_add_entities, home, hass
).async_set_updated_data
)
# migrate
@ -273,27 +273,23 @@ async def async_setup_entry(hass, entry, async_add_entities):
class TibberSensor(SensorEntity):
"""Representation of a generic Tibber sensor."""
def __init__(self, tibber_home):
def __init__(self, *args, tibber_home, **kwargs):
"""Initialize the sensor."""
super().__init__(*args, **kwargs)
self._tibber_home = tibber_home
self._home_name = tibber_home.info["viewer"]["home"]["appNickname"]
self._device_name = None
if self._home_name is None:
self._home_name = tibber_home.info["viewer"]["home"]["address"].get(
"address1", ""
)
self._device_name = None
self._model = None
@property
def device_id(self):
"""Return the ID of the physical device this sensor is part of."""
return self._tibber_home.home_id
@property
def device_info(self):
"""Return the device_info of the device."""
device_info = {
"identifiers": {(TIBBER_DOMAIN, self.device_id)},
"identifiers": {(TIBBER_DOMAIN, self._tibber_home.home_id)},
"name": self._device_name,
"manufacturer": MANUFACTURER,
}
@ -307,7 +303,7 @@ class TibberSensorElPrice(TibberSensor):
def __init__(self, tibber_home):
"""Initialize the sensor."""
super().__init__(tibber_home)
super().__init__(tibber_home=tibber_home)
self._last_updated = None
self._spread_load_constant = randrange(5000)
@ -377,10 +373,9 @@ class TibberSensorElPrice(TibberSensor):
]["estimatedAnnualConsumption"]
class TibberSensorRT(TibberSensor):
class TibberSensorRT(TibberSensor, update_coordinator.CoordinatorEntity):
"""Representation of a Tibber sensor for real time consumption."""
_attr_should_poll = False
entity_description: TibberSensorEntityDescription
def __init__(
@ -388,9 +383,10 @@ class TibberSensorRT(TibberSensor):
tibber_home,
description: TibberSensorEntityDescription,
initial_state,
coordinator: TibberRtDataCoordinator,
):
"""Initialize the sensor."""
super().__init__(tibber_home)
super().__init__(coordinator=coordinator, tibber_home=tibber_home)
self.entity_description = description
self._model = "Tibber Pulse"
self._device_name = f"{self._model} {self._home_name}"
@ -399,7 +395,7 @@ class TibberSensorRT(TibberSensor):
self._attr_native_value = initial_state
self._attr_unique_id = f"{self._tibber_home.home_id}_rt_{description.name}"
if description.name in ("accumulated cost", "accumulated reward"):
if description.key in ("accumulatedCost", "accumulatedReward"):
self._attr_native_unit_of_measurement = tibber_home.currency
if description.reset_type == ResetType.NEVER:
self._attr_last_reset = dt_util.utc_from_timestamp(0)
@ -414,43 +410,35 @@ class TibberSensorRT(TibberSensor):
else:
self._attr_last_reset = None
async def async_added_to_hass(self):
"""Start listen for real time data."""
self.async_on_remove(
async_dispatcher_connect(
self.hass,
SIGNAL_UPDATE_ENTITY.format(self.unique_id),
self._set_state,
)
)
@property
def available(self):
"""Return True if entity is available."""
return self._tibber_home.rt_subscription_running
@callback
def _set_state(self, state, timestamp):
"""Set sensor state."""
if (
state < self._attr_native_value
and self.entity_description.reset_type == ResetType.DAILY
):
self._attr_last_reset = dt_util.as_utc(
timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
)
if (
state < self._attr_native_value
and self.entity_description.reset_type == ResetType.HOURLY
):
self._attr_last_reset = dt_util.as_utc(
timestamp.replace(minute=0, second=0, microsecond=0)
)
def _handle_coordinator_update(self) -> None:
if not (live_measurement := self.coordinator.get_live_measurement()): # type: ignore[attr-defined]
return
state = live_measurement.get(self.entity_description.key)
if state is None:
return
timestamp = dt_util.parse_datetime(live_measurement["timestamp"])
if timestamp is not None and state < self.state:
if self.entity_description.reset_type == ResetType.DAILY:
self._attr_last_reset = dt_util.as_utc(
timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
)
elif self.entity_description.reset_type == ResetType.HOURLY:
self._attr_last_reset = dt_util.as_utc(
timestamp.replace(minute=0, second=0, microsecond=0)
)
if self.entity_description.key == "powerFactor":
state *= 100.0
self._attr_native_value = state
self.async_write_ha_state()
class TibberRtDataHandler:
class TibberRtDataCoordinator(update_coordinator.DataUpdateCoordinator):
"""Handle Tibber realtime data."""
def __init__(self, async_add_entities, tibber_home, hass):
@ -458,42 +446,53 @@ class TibberRtDataHandler:
self._async_add_entities = async_add_entities
self._tibber_home = tibber_home
self.hass = hass
self._entities = {}
self._added_sensors = set()
super().__init__(
hass,
_LOGGER,
name=tibber_home.info["viewer"]["home"]["address"].get(
"address1", "Tibber"
),
)
async def async_callback(self, payload):
"""Handle received data."""
errors = payload.get("errors")
if errors:
_LOGGER.error(errors[0])
return
data = payload.get("data")
if data is None:
return
live_measurement = data.get("liveMeasurement")
if live_measurement is None:
self._async_remove_device_updates_handler = self.async_add_listener(
self._add_sensors
)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self._handle_ha_stop)
@callback
def _handle_ha_stop(self, _event) -> None:
"""Handle Home Assistant stopping."""
self._async_remove_device_updates_handler()
@callback
def _add_sensors(self):
"""Add sensor."""
if not (live_measurement := self.get_live_measurement()):
return
timestamp = dt_util.parse_datetime(live_measurement.pop("timestamp"))
new_entities = []
for sensor_type, state in live_measurement.items():
if state is None or sensor_type not in RT_SENSOR_MAP:
for sensor_description in RT_SENSORS:
if sensor_description.key in self._added_sensors:
continue
if sensor_type == "powerFactor":
state *= 100.0
if sensor_type in self._entities:
async_dispatcher_send(
self.hass,
SIGNAL_UPDATE_ENTITY.format(self._entities[sensor_type]),
state,
timestamp,
)
else:
entity = TibberSensorRT(
self._tibber_home,
RT_SENSOR_MAP[sensor_type],
state,
)
new_entities.append(entity)
self._entities[sensor_type] = entity.unique_id
state = live_measurement.get(sensor_description.key)
if state is None:
continue
entity = TibberSensorRT(
self._tibber_home,
sensor_description,
state,
self,
)
new_entities.append(entity)
self._added_sensors.add(sensor_description.key)
if new_entities:
self._async_add_entities(new_entities)
def get_live_measurement(self):
"""Get live measurement data."""
errors = self.data.get("errors")
if errors:
_LOGGER.error(errors[0])
return None
return self.data.get("data", {}).get("liveMeasurement")