Tibber use dataclass (#53233)
* Tibber, use dataclass Signed-off-by: Daniel Hjelseth Høyer <github@dahoiv.net> * Tibber, use dataclass Signed-off-by: Daniel Hjelseth Høyer <github@dahoiv.net>
This commit is contained in:
parent
2b9b346a28
commit
0cc4231ac2
1 changed files with 94 additions and 80 deletions
|
@ -1,6 +1,10 @@
|
|||
"""Support for Tibber sensors."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from dataclasses import dataclass
|
||||
from datetime import timedelta
|
||||
from enum import Enum
|
||||
import logging
|
||||
from random import randrange
|
||||
|
||||
|
@ -45,108 +49,142 @@ MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=5)
|
|||
PARALLEL_UPDATES = 0
|
||||
SIGNAL_UPDATE_ENTITY = "tibber_rt_update_{}"
|
||||
|
||||
RT_SENSOR_MAP = {
|
||||
"averagePower": ["average power", DEVICE_CLASS_POWER, POWER_WATT, None],
|
||||
"power": ["power", DEVICE_CLASS_POWER, POWER_WATT, None],
|
||||
"powerProduction": ["power production", DEVICE_CLASS_POWER, POWER_WATT, None],
|
||||
"minPower": ["min power", DEVICE_CLASS_POWER, POWER_WATT, None],
|
||||
"maxPower": ["max power", DEVICE_CLASS_POWER, POWER_WATT, None],
|
||||
"accumulatedConsumption": [
|
||||
|
||||
class ResetType(Enum):
|
||||
"""Data reset type."""
|
||||
|
||||
HOURLY = "hourly"
|
||||
DAILY = "daily"
|
||||
NEVER = "never"
|
||||
|
||||
|
||||
@dataclass
|
||||
class TibberSensorMetadata:
|
||||
"""Metadata for an individual Tibber sensor."""
|
||||
|
||||
name: str
|
||||
device_class: str
|
||||
unit: str | None = None
|
||||
state_class: str | None = None
|
||||
reset_type: ResetType | None = None
|
||||
|
||||
|
||||
RT_SENSOR_MAP: dict[str, TibberSensorMetadata] = {
|
||||
"averagePower": TibberSensorMetadata(
|
||||
"average power", DEVICE_CLASS_POWER, POWER_WATT
|
||||
),
|
||||
"power": TibberSensorMetadata(
|
||||
"power",
|
||||
DEVICE_CLASS_POWER,
|
||||
POWER_WATT,
|
||||
),
|
||||
"powerProduction": TibberSensorMetadata(
|
||||
"power production", DEVICE_CLASS_POWER, POWER_WATT
|
||||
),
|
||||
"minPower": TibberSensorMetadata("min power", DEVICE_CLASS_POWER, POWER_WATT),
|
||||
"maxPower": TibberSensorMetadata("max power", DEVICE_CLASS_POWER, POWER_WATT),
|
||||
"accumulatedConsumption": TibberSensorMetadata(
|
||||
"accumulated consumption",
|
||||
DEVICE_CLASS_ENERGY,
|
||||
ENERGY_KILO_WATT_HOUR,
|
||||
STATE_CLASS_MEASUREMENT,
|
||||
],
|
||||
"accumulatedConsumptionLastHour": [
|
||||
ResetType.DAILY,
|
||||
),
|
||||
"accumulatedConsumptionLastHour": TibberSensorMetadata(
|
||||
"accumulated consumption current hour",
|
||||
DEVICE_CLASS_ENERGY,
|
||||
ENERGY_KILO_WATT_HOUR,
|
||||
STATE_CLASS_MEASUREMENT,
|
||||
],
|
||||
"accumulatedProduction": [
|
||||
ResetType.HOURLY,
|
||||
),
|
||||
"accumulatedProduction": TibberSensorMetadata(
|
||||
"accumulated production",
|
||||
DEVICE_CLASS_ENERGY,
|
||||
ENERGY_KILO_WATT_HOUR,
|
||||
STATE_CLASS_MEASUREMENT,
|
||||
],
|
||||
"accumulatedProductionLastHour": [
|
||||
ResetType.DAILY,
|
||||
),
|
||||
"accumulatedProductionLastHour": TibberSensorMetadata(
|
||||
"accumulated production current hour",
|
||||
DEVICE_CLASS_ENERGY,
|
||||
ENERGY_KILO_WATT_HOUR,
|
||||
STATE_CLASS_MEASUREMENT,
|
||||
],
|
||||
"lastMeterConsumption": [
|
||||
ResetType.HOURLY,
|
||||
),
|
||||
"lastMeterConsumption": TibberSensorMetadata(
|
||||
"last meter consumption",
|
||||
DEVICE_CLASS_ENERGY,
|
||||
ENERGY_KILO_WATT_HOUR,
|
||||
STATE_CLASS_MEASUREMENT,
|
||||
],
|
||||
"lastMeterProduction": [
|
||||
),
|
||||
"lastMeterProduction": TibberSensorMetadata(
|
||||
"last meter production",
|
||||
DEVICE_CLASS_ENERGY,
|
||||
ENERGY_KILO_WATT_HOUR,
|
||||
STATE_CLASS_MEASUREMENT,
|
||||
],
|
||||
"voltagePhase1": [
|
||||
),
|
||||
"voltagePhase1": TibberSensorMetadata(
|
||||
"voltage phase1",
|
||||
DEVICE_CLASS_VOLTAGE,
|
||||
VOLT,
|
||||
STATE_CLASS_MEASUREMENT,
|
||||
],
|
||||
"voltagePhase2": [
|
||||
),
|
||||
"voltagePhase2": TibberSensorMetadata(
|
||||
"voltage phase2",
|
||||
DEVICE_CLASS_VOLTAGE,
|
||||
VOLT,
|
||||
STATE_CLASS_MEASUREMENT,
|
||||
],
|
||||
"voltagePhase3": [
|
||||
),
|
||||
"voltagePhase3": TibberSensorMetadata(
|
||||
"voltage phase3",
|
||||
DEVICE_CLASS_VOLTAGE,
|
||||
VOLT,
|
||||
STATE_CLASS_MEASUREMENT,
|
||||
],
|
||||
"currentL1": [
|
||||
),
|
||||
"currentL1": TibberSensorMetadata(
|
||||
"current L1",
|
||||
DEVICE_CLASS_CURRENT,
|
||||
ELECTRICAL_CURRENT_AMPERE,
|
||||
STATE_CLASS_MEASUREMENT,
|
||||
],
|
||||
"currentL2": [
|
||||
),
|
||||
"currentL2": TibberSensorMetadata(
|
||||
"current L2",
|
||||
DEVICE_CLASS_CURRENT,
|
||||
ELECTRICAL_CURRENT_AMPERE,
|
||||
STATE_CLASS_MEASUREMENT,
|
||||
],
|
||||
"currentL3": [
|
||||
),
|
||||
"currentL3": TibberSensorMetadata(
|
||||
"current L3",
|
||||
DEVICE_CLASS_CURRENT,
|
||||
ELECTRICAL_CURRENT_AMPERE,
|
||||
STATE_CLASS_MEASUREMENT,
|
||||
],
|
||||
"signalStrength": [
|
||||
),
|
||||
"signalStrength": TibberSensorMetadata(
|
||||
"signal strength",
|
||||
DEVICE_CLASS_SIGNAL_STRENGTH,
|
||||
SIGNAL_STRENGTH_DECIBELS,
|
||||
STATE_CLASS_MEASUREMENT,
|
||||
],
|
||||
"accumulatedReward": [
|
||||
),
|
||||
"accumulatedReward": TibberSensorMetadata(
|
||||
"accumulated reward",
|
||||
DEVICE_CLASS_MONETARY,
|
||||
None,
|
||||
STATE_CLASS_MEASUREMENT,
|
||||
],
|
||||
"accumulatedCost": [
|
||||
ResetType.DAILY,
|
||||
),
|
||||
"accumulatedCost": TibberSensorMetadata(
|
||||
"accumulated cost",
|
||||
DEVICE_CLASS_MONETARY,
|
||||
None,
|
||||
STATE_CLASS_MEASUREMENT,
|
||||
],
|
||||
"powerFactor": [
|
||||
ResetType.DAILY,
|
||||
),
|
||||
"powerFactor": TibberSensorMetadata(
|
||||
"power factor",
|
||||
DEVICE_CLASS_POWER_FACTOR,
|
||||
PERCENTAGE,
|
||||
STATE_CLASS_MEASUREMENT,
|
||||
],
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
|
@ -312,39 +350,30 @@ class TibberSensorRT(TibberSensor):
|
|||
|
||||
_attr_should_poll = False
|
||||
|
||||
def __init__(
|
||||
self, tibber_home, sensor_name, device_class, unit, initial_state, state_class
|
||||
):
|
||||
def __init__(self, tibber_home, metadata: TibberSensorMetadata, initial_state):
|
||||
"""Initialize the sensor."""
|
||||
super().__init__(tibber_home)
|
||||
self._sensor_name = sensor_name
|
||||
self._model = "Tibber Pulse"
|
||||
self._device_name = f"{self._model} {self._home_name}"
|
||||
self._metadata = metadata
|
||||
|
||||
self._attr_device_class = device_class
|
||||
self._attr_name = f"{self._sensor_name} {self._home_name}"
|
||||
self._attr_device_class = metadata.device_class
|
||||
self._attr_name = f"{metadata.name} {self._home_name}"
|
||||
self._attr_state = initial_state
|
||||
self._attr_unique_id = f"{self._tibber_home.home_id}_rt_{self._sensor_name}"
|
||||
self._attr_unit_of_measurement = unit
|
||||
self._attr_state_class = state_class
|
||||
if sensor_name in [
|
||||
"last meter consumption",
|
||||
"last meter production",
|
||||
]:
|
||||
self._attr_unique_id = f"{self._tibber_home.home_id}_rt_{metadata.name}"
|
||||
|
||||
if metadata.name in ["accumulated cost", "accumulated reward"]:
|
||||
self._attr_unit_of_measurement = tibber_home.currency
|
||||
else:
|
||||
self._attr_unit_of_measurement = metadata.unit
|
||||
self._attr_state_class = metadata.state_class
|
||||
if metadata.reset_type == ResetType.NEVER:
|
||||
self._attr_last_reset = dt_util.utc_from_timestamp(0)
|
||||
elif self._sensor_name in [
|
||||
"accumulated consumption",
|
||||
"accumulated production",
|
||||
"accumulated cost",
|
||||
"accumulated reward",
|
||||
]:
|
||||
elif metadata.reset_type == ResetType.DAILY:
|
||||
self._attr_last_reset = dt_util.as_utc(
|
||||
dt_util.now().replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
)
|
||||
elif self._sensor_name in [
|
||||
"accumulated consumption current hour",
|
||||
"accumulated production current hour",
|
||||
]:
|
||||
elif metadata.reset_type == ResetType.HOURLY:
|
||||
self._attr_last_reset = dt_util.as_utc(
|
||||
dt_util.now().replace(minute=0, second=0, microsecond=0)
|
||||
)
|
||||
|
@ -369,19 +398,11 @@ class TibberSensorRT(TibberSensor):
|
|||
@callback
|
||||
def _set_state(self, state, timestamp):
|
||||
"""Set sensor state."""
|
||||
if state < self._attr_state and self._sensor_name in [
|
||||
"accumulated consumption",
|
||||
"accumulated production",
|
||||
"accumulated cost",
|
||||
"accumulated reward",
|
||||
]:
|
||||
if state < self._attr_state and self._metadata.reset_type == ResetType.DAILY:
|
||||
self._attr_last_reset = dt_util.as_utc(
|
||||
timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
)
|
||||
if state < self._attr_state and self._sensor_name in [
|
||||
"accumulated consumption current hour",
|
||||
"accumulated production current hour",
|
||||
]:
|
||||
if state < self._attr_state and self._metadata.reset_type == ResetType.HOURLY:
|
||||
self._attr_last_reset = dt_util.as_utc(
|
||||
timestamp.replace(minute=0, second=0, microsecond=0)
|
||||
)
|
||||
|
@ -427,18 +448,11 @@ class TibberRtDataHandler:
|
|||
timestamp,
|
||||
)
|
||||
else:
|
||||
sensor_name, device_class, unit, state_class = RT_SENSOR_MAP[
|
||||
sensor_type
|
||||
]
|
||||
if sensor_type in ["accumulatedCost", "accumulatedReward"]:
|
||||
unit = self._tibber_home.currency
|
||||
sensor_meta = RT_SENSOR_MAP[sensor_type]
|
||||
entity = TibberSensorRT(
|
||||
self._tibber_home,
|
||||
sensor_name,
|
||||
device_class,
|
||||
unit,
|
||||
sensor_meta,
|
||||
state,
|
||||
state_class,
|
||||
)
|
||||
new_entities.append(entity)
|
||||
self._entities[sensor_type] = entity.unique_id
|
||||
|
|
Loading…
Add table
Reference in a new issue