"""Statistics helper."""
from __future__ import annotations

from collections import defaultdict
from datetime import datetime, timedelta
from itertools import groupby
import logging
from typing import TYPE_CHECKING

from sqlalchemy import bindparam
from sqlalchemy.ext import baked

from homeassistant.const import PRESSURE_PA, TEMP_CELSIUS
import homeassistant.util.dt as dt_util
import homeassistant.util.pressure as pressure_util
import homeassistant.util.temperature as temperature_util
from .const import DOMAIN
from .models import Statistics, StatisticsMeta, process_timestamp_to_utc_isoformat
from .util import execute, retryable_database_job, session_scope

if TYPE_CHECKING:
    from . import Recorder

QUERY_STATISTICS = [
    Statistics.statistic_id,
    Statistics.start,
    Statistics.mean,
    Statistics.min,
    Statistics.max,
    Statistics.last_reset,
    Statistics.state,
    Statistics.sum,
]

QUERY_STATISTIC_IDS = [
    Statistics.statistic_id,
]

QUERY_STATISTIC_META = [
    StatisticsMeta.statistic_id,
    StatisticsMeta.unit_of_measurement,
]

STATISTICS_BAKERY = "recorder_statistics_bakery"
STATISTICS_META_BAKERY = "recorder_statistics_meta_bakery"

# Convert pressure and temperature statistics from the native unit used for statistics
# to the units configured by the user
UNIT_CONVERSIONS = {
    PRESSURE_PA: lambda x, units: pressure_util.convert(
        x, PRESSURE_PA, units.pressure_unit
    )
    if x is not None
    else None,
    TEMP_CELSIUS: lambda x, units: temperature_util.convert(
        x, TEMP_CELSIUS, units.temperature_unit
    )
    if x is not None
    else None,
}
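# Example: UNIT_CONVERSIONS[TEMP_CELSIUS](21.5, hass.config.units) converts the
# value to the configured temperature unit (a no-op when that unit is already °C).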

_LOGGER = logging.getLogger(__name__)


def async_setup(hass):
    """Set up the history hooks."""
    hass.data[STATISTICS_BAKERY] = baked.bakery()
    hass.data[STATISTICS_META_BAKERY] = baked.bakery()


def get_start_time() -> datetime:
    """Return start time."""
    last_hour = dt_util.utcnow() - timedelta(hours=1)
    start = last_hour.replace(minute=0, second=0, microsecond=0)
    return start


@retryable_database_job("statistics")
def compile_statistics(instance: Recorder, start: datetime) -> bool:
    """Compile statistics."""
    start = dt_util.as_utc(start)
    end = start + timedelta(hours=1)
    _LOGGER.debug(
        "Compiling statistics for %s-%s",
        start,
        end,
    )
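
    # Gather statistics from every integration platform that implements
    # compile_statistics.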
    platform_stats = []
    for domain, platform in instance.hass.data[DOMAIN].items():
        if not hasattr(platform, "compile_statistics"):
            continue
        platform_stats.append(platform.compile_statistics(instance.hass, start, end))
        _LOGGER.debug(
            "Statistics for %s during %s-%s: %s", domain, start, end, platform_stats[-1]
        )

    with session_scope(session=instance.get_session()) as session:  # type: ignore
        for stats in platform_stats:
            for entity_id, stat in stats.items():
                session.add(
                    Statistics.from_stats(DOMAIN, entity_id, start, stat["stat"])
                )
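                # Insert a metadata row the first time a statistic_id is seen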
                exists = session.query(
                    session.query(StatisticsMeta)
                    .filter_by(statistic_id=entity_id)
                    .exists()
                ).scalar()
                if not exists:
                    unit = stat["meta"]["unit_of_measurement"]
                    session.add(StatisticsMeta.from_meta(DOMAIN, entity_id, unit))

    return True


def _get_meta_data(hass, session, statistic_ids):
    """Fetch meta data."""

    def _meta(metas, wanted_statistic_id):
        meta = {"statistic_id": wanted_statistic_id, "unit_of_measurement": None}
        for statistic_id, unit in metas:
            if statistic_id == wanted_statistic_id:
                meta["unit_of_measurement"] = unit
        return meta
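
    # With statistic_ids=None the query is left unfiltered and metadata for
    # all known statistic_ids is returned.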
    baked_query = hass.data[STATISTICS_META_BAKERY](
        lambda session: session.query(*QUERY_STATISTIC_META)
    )
    if statistic_ids is not None:
        baked_query += lambda q: q.filter(
            StatisticsMeta.statistic_id.in_(bindparam("statistic_ids"))
        )

    result = execute(baked_query(session).params(statistic_ids=statistic_ids))

    if statistic_ids is None:
        statistic_ids = [statistic_id[0] for statistic_id in result]

    return {id: _meta(result, id) for id in statistic_ids}


def _configured_unit(unit: str, units) -> str:
    """Return the pressure and temperature units configured by the user."""
    if unit == PRESSURE_PA:
        return units.pressure_unit
    if unit == TEMP_CELSIUS:
        return units.temperature_unit
    return unit


def list_statistic_ids(hass, statistic_type=None):
    """Return statistic_ids and meta data."""
    units = hass.config.units
    with session_scope(hass=hass) as session:
        baked_query = hass.data[STATISTICS_BAKERY](
            lambda session: session.query(*QUERY_STATISTIC_IDS).distinct()
        )

        if statistic_type == "mean":
            baked_query += lambda q: q.filter(Statistics.mean.isnot(None))
        if statistic_type == "sum":
            baked_query += lambda q: q.filter(Statistics.sum.isnot(None))

        baked_query += lambda q: q.order_by(Statistics.statistic_id)

        result = execute(baked_query(session))
        statistic_ids = [statistic_id[0] for statistic_id in result]

        meta_data = _get_meta_data(hass, session, statistic_ids)
        for item in meta_data.values():
            unit = _configured_unit(item["unit_of_measurement"], units)
            item["unit_of_measurement"] = unit
        return list(meta_data.values())


def statistics_during_period(hass, start_time, end_time=None, statistic_ids=None):
    """Return statistics during UTC period start_time - end_time."""
    with session_scope(hass=hass) as session:
        baked_query = hass.data[STATISTICS_BAKERY](
            lambda session: session.query(*QUERY_STATISTICS)
        )

        baked_query += lambda q: q.filter(Statistics.start >= bindparam("start_time"))

        if end_time is not None:
            baked_query += lambda q: q.filter(Statistics.start < bindparam("end_time"))

        if statistic_ids is not None:
            baked_query += lambda q: q.filter(
                Statistics.statistic_id.in_(bindparam("statistic_ids"))
            )
            statistic_ids = [statistic_id.lower() for statistic_id in statistic_ids]
        baked_query += lambda q: q.order_by(Statistics.statistic_id, Statistics.start)

        stats = execute(
            baked_query(session).params(
                start_time=start_time, end_time=end_time, statistic_ids=statistic_ids
            )
        )

        meta_data = _get_meta_data(hass, session, statistic_ids)
        return _sorted_statistics_to_dict(hass, stats, statistic_ids, meta_data)


def get_last_statistics(hass, number_of_stats, statistic_id=None):
    """Return the last number_of_stats statistics."""
    with session_scope(hass=hass) as session:
        baked_query = hass.data[STATISTICS_BAKERY](
            lambda session: session.query(*QUERY_STATISTICS)
        )

        if statistic_id is not None:
            baked_query += lambda q: q.filter_by(statistic_id=bindparam("statistic_id"))

        baked_query += lambda q: q.order_by(
            Statistics.statistic_id, Statistics.start.desc()
        )

        baked_query += lambda q: q.limit(bindparam("number_of_stats"))

        stats = execute(
            baked_query(session).params(
                number_of_stats=number_of_stats, statistic_id=statistic_id
            )
        )

        statistic_ids = [statistic_id] if statistic_id is not None else None
        meta_data = _get_meta_data(hass, session, statistic_ids)
        return _sorted_statistics_to_dict(hass, stats, statistic_ids, meta_data)


def _sorted_statistics_to_dict(
    hass,
    stats,
    statistic_ids,
    meta_data,
):
"""Convert SQL results into JSON friendly data structure."""
result = defaultdict(list)
units = hass.config.units

    # Set all statistic IDs to empty lists in result set to maintain the order
    if statistic_ids is not None:
        for stat_id in statistic_ids:
            result[stat_id] = []

    # Called in a tight loop so cache the function here
    _process_timestamp_to_utc_isoformat = process_timestamp_to_utc_isoformat

    # Append all statistic entries, and do unit conversion
    for ent_id, group in groupby(stats, lambda state: state.statistic_id):
        unit = meta_data[ent_id]["unit_of_measurement"]
        convert = UNIT_CONVERSIONS.get(unit, lambda x, units: x)
        ent_results = result[ent_id]
        ent_results.extend(
            {
                "statistic_id": db_state.statistic_id,
                "start": _process_timestamp_to_utc_isoformat(db_state.start),
                "mean": convert(db_state.mean, units),
                "min": convert(db_state.min, units),
                "max": convert(db_state.max, units),
                "last_reset": _process_timestamp_to_utc_isoformat(db_state.last_reset),
                "state": convert(db_state.state, units),
                "sum": convert(db_state.sum, units),
            }
            for db_state in group
        )

    # Filter out the empty lists if some states had 0 results.
    return {key: val for key, val in result.items() if val}