Add statistics meta data table (#52331)
* Add statistics meta data table
* Tweak meta data generation
This commit is contained in:
parent d8337cf98f
commit 0ab999738b
7 changed files with 126 additions and 25 deletions
@@ -173,12 +173,12 @@ async def ws_get_list_statistic_ids(
     hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
 ) -> None:
     """Fetch a list of available statistic_id."""
-    statistics = await hass.async_add_executor_job(
+    statistic_ids = await hass.async_add_executor_job(
         list_statistic_ids,
         hass,
         msg.get("statistic_type"),
     )
-    connection.send_result(msg["id"], {"statistic_ids": statistics})
+    connection.send_result(msg["id"], statistic_ids)


 class HistoryPeriodView(HomeAssistantView):
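Note: with this change the history/list_statistic_ids websocket command returns the list directly instead of wrapping it in a "statistic_ids" key, and each entry now carries its unit. An illustrative before/after of the result payload (values match the updated test at the end of this diff):

# Before
{"statistic_ids": ["sensor.test"]}
# After
[{"statistic_id": "sensor.test", "unit_of_measurement": "°C"}]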
@@ -452,6 +452,11 @@ def _apply_update(engine, session, new_version, old_version):
         _drop_foreign_key_constraints(
             connection, engine, TABLE_STATES, ["old_state_id"]
         )
+    elif new_version == 17:
+        if sqlalchemy.inspect(engine).has_table(Statistics.__tablename__):
+            # Recreate the statistics table
+            Statistics.__table__.drop(engine)
+            Statistics.__table__.create(engine)
     else:
         raise ValueError(f"No schema migration defined for version {new_version}")
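Note: migration to schema version 17 does not alter the statistics table in place; statistics are derived data, so the table is simply dropped and recreated with the new layout. A minimal standalone sketch of the same guarded drop-and-recreate pattern (the Demo model and in-memory SQLite engine are illustrative, not part of this commit):

import sqlalchemy
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Demo(Base):
    """Stand-in table; the real migration targets Statistics."""

    __tablename__ = "demo"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


engine = sqlalchemy.create_engine("sqlite://")
Demo.__table__.create(engine)

# Guarded drop-and-recreate, as in _apply_update for new_version == 17
if sqlalchemy.inspect(engine).has_table(Demo.__tablename__):
    Demo.__table__.drop(engine)
    Demo.__table__.create(engine)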
@@ -36,7 +36,7 @@ import homeassistant.util.dt as dt_util
 # pylint: disable=invalid-name
 Base = declarative_base()

-SCHEMA_VERSION = 16
+SCHEMA_VERSION = 17

 _LOGGER = logging.getLogger(__name__)

@@ -47,6 +47,7 @@ TABLE_STATES = "states"
 TABLE_RECORDER_RUNS = "recorder_runs"
 TABLE_SCHEMA_CHANGES = "schema_changes"
 TABLE_STATISTICS = "statistics"
+TABLE_STATISTICS_META = "statistics_meta"

 ALL_TABLES = [
     TABLE_STATES,
@@ -54,6 +55,7 @@ ALL_TABLES = [
     TABLE_RECORDER_RUNS,
     TABLE_SCHEMA_CHANGES,
     TABLE_STATISTICS,
+    TABLE_STATISTICS_META,
 ]

 DATETIME_TYPE = DateTime(timezone=True).with_variant(
@@ -248,6 +250,28 @@ class Statistics(Base):  # type: ignore
     )


+class StatisticsMeta(Base):  # type: ignore
+    """Statistics meta data."""
+
+    __table_args__ = {
+        "mysql_default_charset": "utf8mb4",
+        "mysql_collate": "utf8mb4_unicode_ci",
+    }
+    __tablename__ = TABLE_STATISTICS_META
+    statistic_id = Column(String(255), primary_key=True)
+    source = Column(String(32))
+    unit_of_measurement = Column(String(255))
+
+    @staticmethod
+    def from_meta(source, statistic_id, unit_of_measurement):
+        """Create object from meta data."""
+        return StatisticsMeta(
+            source=source,
+            statistic_id=statistic_id,
+            unit_of_measurement=unit_of_measurement,
+        )
+
+
 class RecorderRuns(Base):  # type: ignore
     """Representation of recorder run."""

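Note: StatisticsMeta stores one row per statistic_id, so the unit and source no longer have to be repeated on every statistics row. A minimal sketch of writing and reading the table outside Home Assistant (the engine/session setup and example ids are illustrative; Base and StatisticsMeta as declared above):

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(StatisticsMeta.from_meta("recorder", "sensor.outdoor_temp", "°C"))
    session.commit()
    row = (
        session.query(StatisticsMeta)
        .filter_by(statistic_id="sensor.outdoor_temp")
        .one()
    )
    print(row.source, row.unit_of_measurement)  # recorder °C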
@@ -13,7 +13,7 @@ from sqlalchemy.ext import baked
 import homeassistant.util.dt as dt_util

 from .const import DOMAIN
-from .models import Statistics, process_timestamp_to_utc_isoformat
+from .models import Statistics, StatisticsMeta, process_timestamp_to_utc_isoformat
 from .util import execute, retryable_database_job, session_scope

 if TYPE_CHECKING:
@@ -34,7 +34,13 @@ QUERY_STATISTIC_IDS = [
     Statistics.statistic_id,
 ]

+QUERY_STATISTIC_META = [
+    StatisticsMeta.statistic_id,
+    StatisticsMeta.unit_of_measurement,
+]
+
 STATISTICS_BAKERY = "recorder_statistics_bakery"
+STATISTICS_META_BAKERY = "recorder_statistics_meta_bakery"

 _LOGGER = logging.getLogger(__name__)

@@ -42,6 +48,7 @@ _LOGGER = logging.getLogger(__name__)
 def async_setup(hass):
     """Set up the history hooks."""
     hass.data[STATISTICS_BAKERY] = baked.bakery()
+    hass.data[STATISTICS_META_BAKERY] = baked.bakery()


 def get_start_time() -> datetime.datetime:
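Note: like the existing statistics bakery, meta data lookups go through SQLAlchemy's sqlalchemy.ext.baked extension, which caches the compiled query between calls. A self-contained sketch of the bakery pattern (the Item model is illustrative):

from sqlalchemy import Column, Integer, String, bindparam, create_engine
from sqlalchemy.ext import baked
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True)
    name = Column(String(32))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
bakery = baked.bakery()

with Session(engine) as session:
    session.add(Item(name="a"))
    session.commit()
    # The lambdas are compiled once, then cached by the bakery
    baked_query = bakery(lambda s: s.query(Item.name))
    baked_query += lambda q: q.filter(Item.name == bindparam("name"))
    print(baked_query(session).params(name="a").all())  # [('a',)]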
@@ -73,11 +80,47 @@ def compile_statistics(instance: Recorder, start: datetime.datetime) -> bool:
     with session_scope(session=instance.get_session()) as session:  # type: ignore
         for stats in platform_stats:
             for entity_id, stat in stats.items():
-                session.add(Statistics.from_stats(DOMAIN, entity_id, start, stat))
+                session.add(
+                    Statistics.from_stats(DOMAIN, entity_id, start, stat["stat"])
+                )
+                exists = session.query(
+                    session.query(StatisticsMeta)
+                    .filter_by(statistic_id=entity_id)
+                    .exists()
+                ).scalar()
+                if not exists:
+                    unit = stat["meta"]["unit_of_measurement"]
+                    session.add(StatisticsMeta.from_meta(DOMAIN, entity_id, unit))

     return True


+def _get_meta_data(hass, session, statistic_ids):
+    """Fetch meta data."""
+
+    def _meta(metas, wanted_statistic_id):
+        meta = {"statistic_id": wanted_statistic_id, "unit_of_measurement": None}
+        for statistic_id, unit in metas:
+            if statistic_id == wanted_statistic_id:
+                meta["unit_of_measurement"] = unit
+        return meta
+
+    baked_query = hass.data[STATISTICS_META_BAKERY](
+        lambda session: session.query(*QUERY_STATISTIC_META)
+    )
+    if statistic_ids is not None:
+        baked_query += lambda q: q.filter(
+            StatisticsMeta.statistic_id.in_(bindparam("statistic_ids"))
+        )
+
+    result = execute(baked_query(session).params(statistic_ids=statistic_ids))
+
+    if statistic_ids is None:
+        statistic_ids = [statistic_id[0] for statistic_id in result]
+
+    return {id: _meta(result, id) for id in statistic_ids}
+
+
 def list_statistic_ids(hass, statistic_type=None):
     """Return statistic_ids."""
     with session_scope(hass=hass) as session:
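Note: _get_meta_data returns a dict keyed by statistic_id, and an id without a meta row still gets an entry whose unit is None. An illustrative return value (ids and units are examples):

{
    "sensor.outdoor_temp": {
        "statistic_id": "sensor.outdoor_temp",
        "unit_of_measurement": "°C",
    },
    "sensor.unknown": {
        "statistic_id": "sensor.unknown",
        "unit_of_measurement": None,
    },
}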
@@ -92,10 +135,10 @@ def list_statistic_ids(hass, statistic_type=None):
         baked_query += lambda q: q.order_by(Statistics.statistic_id)

-        statistic_ids = []
-
         result = execute(baked_query(session))
-        statistic_ids = [statistic_id[0] for statistic_id in result]
-        return statistic_ids
+        statistic_ids_list = [statistic_id[0] for statistic_id in result]
+
+        return list(_get_meta_data(hass, session, statistic_ids_list).values())


 def statistics_during_period(hass, start_time, end_time=None, statistic_ids=None):
@@ -22,6 +22,7 @@ from .models import (
     TABLE_RECORDER_RUNS,
     TABLE_SCHEMA_CHANGES,
     TABLE_STATISTICS,
+    TABLE_STATISTICS_META,
     RecorderRuns,
     process_timestamp,
 )
@@ -179,7 +180,7 @@ def basic_sanity_check(cursor):
     """Check tables to make sure select does not fail."""

     for table in ALL_TABLES:
-        if table == TABLE_STATISTICS:
+        if table in [TABLE_STATISTICS, TABLE_STATISTICS_META]:
             continue
         if table in (TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES):
             cursor.execute(f"SELECT * FROM {table};")  # nosec # not injection
@@ -30,6 +30,7 @@ from homeassistant.const import (
     PRESSURE_MBAR,
     PRESSURE_PA,
     PRESSURE_PSI,
+    TEMP_CELSIUS,
 )
 from homeassistant.core import HomeAssistant, State
 import homeassistant.util.dt as dt_util
@@ -49,6 +50,13 @@ DEVICE_CLASS_STATISTICS = {
     DEVICE_CLASS_TEMPERATURE: {"mean", "min", "max"},
 }

+DEVICE_CLASS_UNITS = {
+    DEVICE_CLASS_ENERGY: ENERGY_KILO_WATT_HOUR,
+    DEVICE_CLASS_POWER: POWER_WATT,
+    DEVICE_CLASS_PRESSURE: PRESSURE_PA,
+    DEVICE_CLASS_TEMPERATURE: TEMP_CELSIUS,
+}
+
 UNIT_CONVERSIONS = {
     DEVICE_CLASS_ENERGY: {
         ENERGY_KILO_WATT_HOUR: lambda x: x,
@@ -134,12 +142,16 @@ def _time_weighted_average(

 def _normalize_states(
     entity_history: list[State], device_class: str, entity_id: str
-) -> list[tuple[float, State]]:
+) -> tuple[str | None, list[tuple[float, State]]]:
     """Normalize units."""

     if device_class not in UNIT_CONVERSIONS:
-        # We're not normalizing this device class, return the states as they are
-        return [(float(el.state), el) for el in entity_history if _is_number(el.state)]
+        # We're not normalizing this device class, return the states as they are
+        fstates = [
+            (float(el.state), el) for el in entity_history if _is_number(el.state)
+        ]
+        unit = fstates[0][1].attributes.get(ATTR_UNIT_OF_MEASUREMENT)
+        return unit, fstates

     fstates = []

@@ -159,7 +171,7 @@ def _normalize_states(

         fstates.append((UNIT_CONVERSIONS[device_class][unit](fstate), state))  # type: ignore

-    return fstates
+    return DEVICE_CLASS_UNITS[device_class], fstates


 def compile_statistics(
@@ -185,21 +197,25 @@ def compile_statistics(
             continue

         entity_history = history_list[entity_id]
-        fstates = _normalize_states(entity_history, device_class, entity_id)
+        unit, fstates = _normalize_states(entity_history, device_class, entity_id)

         if not fstates:
             continue

         result[entity_id] = {}

+        # Set meta data
+        result[entity_id]["meta"] = {"unit_of_measurement": unit}
+
         # Make calculations
+        stat: dict = {}
         if "max" in wanted_statistics:
-            result[entity_id]["max"] = max(*itertools.islice(zip(*fstates), 1))
+            stat["max"] = max(*itertools.islice(zip(*fstates), 1))
         if "min" in wanted_statistics:
-            result[entity_id]["min"] = min(*itertools.islice(zip(*fstates), 1))
+            stat["min"] = min(*itertools.islice(zip(*fstates), 1))

         if "mean" in wanted_statistics:
-            result[entity_id]["mean"] = _time_weighted_average(fstates, start, end)
+            stat["mean"] = _time_weighted_average(fstates, start, end)

         if "sum" in wanted_statistics:
             last_reset = old_last_reset = None
@@ -233,8 +249,10 @@ def compile_statistics(

             # Update the sum with the last state
             _sum += new_state - old_state
-            result[entity_id]["last_reset"] = dt_util.parse_datetime(last_reset)
-            result[entity_id]["sum"] = _sum
-            result[entity_id]["state"] = new_state
+            stat["last_reset"] = dt_util.parse_datetime(last_reset)
+            stat["sum"] = _sum
+            stat["state"] = new_state
+
+        result[entity_id]["stat"] = stat

     return result

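Note: each entity's compiled entry now separates meta data from the computed values, matching what the recorder's compile_statistics reads above as stat["meta"] and stat["stat"]. An illustrative per-entity result for a temperature sensor (values are examples):

{
    "sensor.test": {
        "meta": {"unit_of_measurement": "°C"},
        "stat": {"max": 10.0, "min": 10.0, "mean": 10.0},
    }
}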
@@ -951,7 +951,11 @@ async def test_list_statistic_ids(hass, hass_ws_client):
     hass.states.async_set(
         "sensor.test",
         10,
-        attributes={"device_class": "temperature", "state_class": "measurement"},
+        attributes={
+            "device_class": "temperature",
+            "state_class": "measurement",
+            "unit_of_measurement": "°C",
+        },
     )
     await hass.async_block_till_done()

@@ -962,7 +966,7 @@ async def test_list_statistic_ids(hass, hass_ws_client):
     await client.send_json({"id": 1, "type": "history/list_statistic_ids"})
     response = await client.receive_json()
     assert response["success"]
-    assert response["result"] == {"statistic_ids": []}
+    assert response["result"] == []

     hass.data[recorder.DATA_INSTANCE].do_adhoc_statistics(period="hourly", start=now)
     await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
@@ -970,25 +974,31 @@ async def test_list_statistic_ids(hass, hass_ws_client):
     await client.send_json({"id": 2, "type": "history/list_statistic_ids"})
     response = await client.receive_json()
     assert response["success"]
-    assert response["result"] == {"statistic_ids": ["sensor.test"]}
+    assert response["result"] == [
+        {"statistic_id": "sensor.test", "unit_of_measurement": "°C"}
+    ]

     await client.send_json(
         {"id": 3, "type": "history/list_statistic_ids", "statistic_type": "dogs"}
     )
     response = await client.receive_json()
     assert response["success"]
-    assert response["result"] == {"statistic_ids": ["sensor.test"]}
+    assert response["result"] == [
+        {"statistic_id": "sensor.test", "unit_of_measurement": "°C"}
+    ]

     await client.send_json(
         {"id": 4, "type": "history/list_statistic_ids", "statistic_type": "mean"}
     )
     response = await client.receive_json()
     assert response["success"]
-    assert response["result"] == {"statistic_ids": ["sensor.test"]}
+    assert response["result"] == [
+        {"statistic_id": "sensor.test", "unit_of_measurement": "°C"}
+    ]

     await client.send_json(
         {"id": 5, "type": "history/list_statistic_ids", "statistic_type": "sum"}
     )
     response = await client.receive_json()
     assert response["success"]
-    assert response["result"] == {"statistic_ids": []}
+    assert response["result"] == []