Add 5-minute statistics for sensors (#56006)

* Add 5-minute statistics for sensors

* Address pylint issues

* Black

* Apply suggestion from code review

* Apply suggestions from code review

* Improve tests
Erik Montnemery 2021-09-16 10:57:15 +02:00 committed by GitHub
parent 0656407561
commit 8c5efafdd8
12 changed files with 781 additions and 297 deletions

View file

@@ -124,6 +124,7 @@ class LazyState(history_models.LazyState):
         vol.Required("start_time"): str,
         vol.Optional("end_time"): str,
         vol.Optional("statistic_ids"): [str],
+        vol.Required("period"): vol.Any("hour", "5minute"),
     }
 )
 @websocket_api.async_response
@@ -157,6 +158,7 @@ async def ws_get_statistics_during_period(
         start_time,
         end_time,
         msg.get("statistic_ids"),
+        msg.get("period"),
     )
     connection.send_result(msg["id"], statistics)
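
The new "period" field selects which statistics table the websocket command reads from. A minimal sketch of a request, mirroring the updated tests further down (the message id and entity id are illustrative):

    await client.send_json(
        {
            "id": 1,
            "type": "history/statistics_during_period",
            "start_time": now.isoformat(),
            "statistic_ids": ["sensor.test"],
            "period": "5minute",  # "hour" reads the long-term table instead
        }
    )
    response = await client.receive_json()  # result maps statistic_id -> list of period rows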

View file

@@ -565,7 +565,7 @@ class Recorder(threading.Thread):
         self.queue.put(PerodicCleanupTask())

     @callback
-    def async_hourly_statistics(self, now):
+    def async_periodic_statistics(self, now):
         """Trigger the hourly statistics run."""
         start = statistics.get_start_time()
         self.queue.put(StatisticsTask(start))
@@ -582,9 +582,9 @@
             self.hass, self.async_nightly_tasks, hour=4, minute=12, second=0
         )

-        # Compile hourly statistics every hour at *:12
+        # Compile short term statistics every 5 minutes
         async_track_time_change(
-            self.hass, self.async_hourly_statistics, minute=12, second=0
+            self.hass, self.async_periodic_statistics, minute=range(0, 60, 5), second=10
         )

     def run(self):
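
The new schedule uses a minute pattern rather than a fixed minute. A small illustration of when the task now fires (the reason for the 10-second offset is an assumption: it gives the just-closed period a moment to settle before it is compiled):

    # minute=range(0, 60, 5) expands to the pattern 0, 5, 10, ..., 55, so the
    # statistics task fires at HH:00:10, HH:05:10, ..., HH:55:10.
    assert list(range(0, 60, 5)) == [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55]
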
@@ -995,20 +995,21 @@
     def _schedule_compile_missing_statistics(self, session: Session) -> None:
         """Add tasks for missing statistics runs."""
         now = dt_util.utcnow()
-        last_hour = now.replace(minute=0, second=0, microsecond=0)
+        last_period_minutes = now.minute - now.minute % 5
+        last_period = now.replace(minute=last_period_minutes, second=0, microsecond=0)
         start = now - timedelta(days=self.keep_days)
         start = start.replace(minute=0, second=0, microsecond=0)

         # Find the newest statistics run, if any
         if last_run := session.query(func.max(StatisticsRuns.start)).scalar():
-            start = max(start, process_timestamp(last_run) + timedelta(hours=1))
+            start = max(start, process_timestamp(last_run) + timedelta(minutes=5))

         # Add tasks
-        while start < last_hour:
-            end = start + timedelta(hours=1)
+        while start < last_period:
+            end = start + timedelta(minutes=5)
             _LOGGER.debug("Compiling missing statistics for %s-%s", start, end)
             self.queue.put(StatisticsTask(start))
-            start = start + timedelta(hours=1)
+            start = end

     def _end_session(self):
         """End the recorder session."""

View file

@@ -1,6 +1,5 @@
 """Schema migration helpers."""
 import contextlib
-from datetime import timedelta
 import logging

 import sqlalchemy
@@ -13,8 +12,6 @@ from sqlalchemy.exc import (
 )
 from sqlalchemy.schema import AddConstraint, DropConstraint

-import homeassistant.util.dt as dt_util
-
 from .models import (
     SCHEMA_VERSION,
     TABLE_STATES,
@@ -24,6 +21,7 @@ from .models import (
     StatisticsMeta,
     StatisticsRuns,
 )
+from .statistics import get_start_time
 from .util import session_scope

 _LOGGER = logging.getLogger(__name__)
@@ -483,10 +481,7 @@ def _apply_update(engine, session, new_version, old_version):  # noqa: C901
     elif new_version == 19:
         # This adds the statistic runs table, insert a fake run to prevent duplicating
         # statistics.
-        now = dt_util.utcnow()
-        start = now.replace(minute=0, second=0, microsecond=0)
-        start = start - timedelta(hours=1)
-        session.add(StatisticsRuns(start=start))
+        session.add(StatisticsRuns(start=get_start_time()))
     elif new_version == 20:
         # This changed the precision of statistics from float to double
         if engine.dialect.name in ["mysql", "oracle", "postgresql"]:
@@ -537,10 +532,7 @@ def _inspect_schema_version(engine, session):
         for index in indexes:
             if index["column_names"] == ["time_fired"]:
                 # Schema addition from version 1 detected. New DB.
-                now = dt_util.utcnow()
-                start = now.replace(minute=0, second=0, microsecond=0)
-                start = start - timedelta(hours=1)
-                session.add(StatisticsRuns(start=start))
+                session.add(StatisticsRuns(start=get_start_time()))
                 session.add(SchemaChanges(schema_version=SCHEMA_VERSION))
                 return SCHEMA_VERSION

View file

@@ -1,7 +1,7 @@
 """Models for SQLAlchemy."""
 from __future__ import annotations

-from datetime import datetime
+from datetime import datetime, timedelta
 import json
 import logging
 from typing import TypedDict, overload
@@ -20,6 +20,7 @@ from sqlalchemy import (
     distinct,
 )
 from sqlalchemy.dialects import mysql, oracle, postgresql
+from sqlalchemy.ext.declarative import declared_attr
 from sqlalchemy.orm import declarative_base, relationship
 from sqlalchemy.orm.session import Session
@@ -52,6 +53,7 @@ TABLE_SCHEMA_CHANGES = "schema_changes"
 TABLE_STATISTICS = "statistics"
 TABLE_STATISTICS_META = "statistics_meta"
 TABLE_STATISTICS_RUNS = "statistics_runs"
+TABLE_STATISTICS_SHORT_TERM = "statistics_short_term"

 ALL_TABLES = [
     TABLE_STATES,
@@ -61,6 +63,7 @@ ALL_TABLES = [
     TABLE_STATISTICS,
     TABLE_STATISTICS_META,
     TABLE_STATISTICS_RUNS,
+    TABLE_STATISTICS_SHORT_TERM,
 ]

 DATETIME_TYPE = DateTime(timezone=True).with_variant(
@@ -232,21 +235,21 @@ class StatisticData(TypedDict, total=False):
     sum_increase: float


-class Statistics(Base):  # type: ignore
-    """Statistics."""
+class StatisticsBase:
+    """Statistics base class."""

-    __table_args__ = (
-        # Used for fetching statistics for a certain entity at a specific time
-        Index("ix_statistics_statistic_id_start", "metadata_id", "start"),
-    )
-    __tablename__ = TABLE_STATISTICS
     id = Column(Integer, primary_key=True)
     created = Column(DATETIME_TYPE, default=dt_util.utcnow)
-    metadata_id = Column(
-        Integer,
-        ForeignKey(f"{TABLE_STATISTICS_META}.id", ondelete="CASCADE"),
-        index=True,
-    )
+
+    @declared_attr
+    def metadata_id(self):
+        """Define the metadata_id column for sub classes."""
+        return Column(
+            Integer,
+            ForeignKey(f"{TABLE_STATISTICS_META}.id", ondelete="CASCADE"),
+            index=True,
+        )
+
     start = Column(DATETIME_TYPE, index=True)
     mean = Column(DOUBLE_TYPE)
     min = Column(DOUBLE_TYPE)
@@ -256,16 +259,40 @@ class Statistics(Base):  # type: ignore
     sum = Column(DOUBLE_TYPE)
     sum_increase = Column(DOUBLE_TYPE)

-    @staticmethod
-    def from_stats(metadata_id: str, start: datetime, stats: StatisticData):
+    @classmethod
+    def from_stats(cls, metadata_id: str, start: datetime, stats: StatisticData):
         """Create object from a statistics."""
-        return Statistics(
+        return cls(  # type: ignore
             metadata_id=metadata_id,
             start=start,
             **stats,
         )


+class Statistics(Base, StatisticsBase):  # type: ignore
+    """Long term statistics."""
+
+    duration = timedelta(hours=1)
+
+    __table_args__ = (
+        # Used for fetching statistics for a certain entity at a specific time
+        Index("ix_statistics_statistic_id_start", "metadata_id", "start"),
+    )
+    __tablename__ = TABLE_STATISTICS
+
+
+class StatisticsShortTerm(Base, StatisticsBase):  # type: ignore
+    """Short term statistics."""
+
+    duration = timedelta(minutes=5)
+
+    __table_args__ = (
+        # Used for fetching statistics for a certain entity at a specific time
+        Index("ix_statistics_short_term_statistic_id_start", "metadata_id", "start"),
+    )
+    __tablename__ = TABLE_STATISTICS_SHORT_TERM
+
+
 class StatisticMetaData(TypedDict, total=False):
     """Statistic meta data class."""

@@ -401,7 +428,7 @@ def process_timestamp(ts: datetime) -> datetime:
     ...


-def process_timestamp(ts):
+def process_timestamp(ts: datetime | None) -> datetime | None:
     """Process a timestamp into datetime object."""
     if ts is None:
         return None
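
Both tables now share their columns through the `StatisticsBase` mixin; `@declared_attr` is what gives each subclass its own `metadata_id` foreign-key column, and `from_stats` becomes a classmethod so the same call shape builds rows for either table. A hedged usage sketch (the metadata id and values are made up):

    from datetime import datetime, timezone

    from homeassistant.components.recorder.models import (
        StatisticData,
        Statistics,
        StatisticsShortTerm,
    )

    stat: StatisticData = {"mean": 20.0, "min": 10.0, "max": 30.0}
    start = datetime(2021, 9, 16, 10, 55, tzinfo=timezone.utc)
    short_term_row = StatisticsShortTerm.from_stats("42", start, stat)
    hourly_row = Statistics.from_stats("42", start.replace(minute=0), stat)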

View file

@@ -2,13 +2,14 @@
 from __future__ import annotations

 from collections import defaultdict
+from collections.abc import Iterable
 import dataclasses
 from datetime import datetime, timedelta
 from itertools import groupby
 import logging
 from typing import TYPE_CHECKING, Any, Callable

-from sqlalchemy import bindparam
+from sqlalchemy import bindparam, func
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.ext import baked
 from sqlalchemy.orm.scoping import scoped_session
@@ -33,6 +34,7 @@ from .models import (
     Statistics,
     StatisticsMeta,
     StatisticsRuns,
+    StatisticsShortTerm,
     process_timestamp,
     process_timestamp_to_utc_isoformat,
 )
@@ -53,6 +55,40 @@ QUERY_STATISTICS = [
     Statistics.sum_increase,
 ]

+QUERY_STATISTICS_SHORT_TERM = [
+    StatisticsShortTerm.metadata_id,
+    StatisticsShortTerm.start,
+    StatisticsShortTerm.mean,
+    StatisticsShortTerm.min,
+    StatisticsShortTerm.max,
+    StatisticsShortTerm.last_reset,
+    StatisticsShortTerm.state,
+    StatisticsShortTerm.sum,
+    StatisticsShortTerm.sum_increase,
+]
+
+QUERY_STATISTICS_SUMMARY_MEAN = [
+    StatisticsShortTerm.metadata_id,
+    func.avg(StatisticsShortTerm.mean),
+    func.min(StatisticsShortTerm.min),
+    func.max(StatisticsShortTerm.max),
+]
+
+QUERY_STATISTICS_SUMMARY_SUM = [
+    StatisticsShortTerm.metadata_id,
+    StatisticsShortTerm.start,
+    StatisticsShortTerm.last_reset,
+    StatisticsShortTerm.state,
+    StatisticsShortTerm.sum,
+    StatisticsShortTerm.sum_increase,
+    func.row_number()
+    .over(
+        partition_by=StatisticsShortTerm.metadata_id,
+        order_by=StatisticsShortTerm.start.desc(),
+    )
+    .label("rownum"),
+]
+
 QUERY_STATISTIC_META = [
     StatisticsMeta.id,
     StatisticsMeta.statistic_id,
@@ -67,7 +103,9 @@ QUERY_STATISTIC_META_ID = [
 ]

 STATISTICS_BAKERY = "recorder_statistics_bakery"
-STATISTICS_META_BAKERY = "recorder_statistics_bakery"
+STATISTICS_META_BAKERY = "recorder_statistics_meta_bakery"
+STATISTICS_SHORT_TERM_BAKERY = "recorder_statistics_short_term_bakery"
+

 # Convert pressure and temperature statistics from the native unit used for statistics
 # to the units configured by the user
@@ -108,6 +146,7 @@ def async_setup(hass: HomeAssistant) -> None:
     """Set up the history hooks."""
     hass.data[STATISTICS_BAKERY] = baked.bakery()
     hass.data[STATISTICS_META_BAKERY] = baked.bakery()
+    hass.data[STATISTICS_SHORT_TERM_BAKERY] = baked.bakery()

     def entity_id_changed(event: Event) -> None:
         """Handle entity_id changed."""
@@ -137,9 +176,11 @@ def async_setup(hass: HomeAssistant) -> None:
 def get_start_time() -> datetime:
     """Return start time."""
-    last_hour = dt_util.utcnow() - timedelta(hours=1)
-    start = last_hour.replace(minute=0, second=0, microsecond=0)
-    return start
+    now = dt_util.utcnow()
+    current_period_minutes = now.minute - now.minute % 5
+    current_period = now.replace(minute=current_period_minutes, second=0, microsecond=0)
+    last_period = current_period - timedelta(minutes=5)
+    return last_period


 def _get_metadata_ids(
@@ -204,11 +245,76 @@ def _update_or_add_metadata(
     return metadata_id


+def compile_hourly_statistics(
+    instance: Recorder, session: scoped_session, start: datetime
+) -> None:
+    """Compile hourly statistics."""
+    start_time = start.replace(minute=0)
+    end_time = start_time + timedelta(hours=1)
+    # Get last hour's average, min, max
+    summary = {}
+    baked_query = instance.hass.data[STATISTICS_SHORT_TERM_BAKERY](
+        lambda session: session.query(*QUERY_STATISTICS_SUMMARY_MEAN)
+    )
+
+    baked_query += lambda q: q.filter(
+        StatisticsShortTerm.start >= bindparam("start_time")
+    )
+    baked_query += lambda q: q.filter(StatisticsShortTerm.start < bindparam("end_time"))
+    baked_query += lambda q: q.group_by(StatisticsShortTerm.metadata_id)
+    baked_query += lambda q: q.order_by(StatisticsShortTerm.metadata_id)
+
+    stats = execute(
+        baked_query(session).params(start_time=start_time, end_time=end_time)
+    )
+
+    if stats:
+        for stat in stats:
+            metadata_id, _mean, _min, _max = stat
+            summary[metadata_id] = {
+                "metadata_id": metadata_id,
+                "mean": _mean,
+                "min": _min,
+                "max": _max,
+            }

+    # Get last hour's sum
+    subquery = (
+        session.query(*QUERY_STATISTICS_SUMMARY_SUM)
+        .filter(StatisticsShortTerm.start >= bindparam("start_time"))
+        .filter(StatisticsShortTerm.start < bindparam("end_time"))
+        .subquery()
+    )
+    query = (
+        session.query(subquery)
+        .filter(subquery.c.rownum == 1)
+        .order_by(subquery.c.metadata_id)
+    )
+    stats = execute(query.params(start_time=start_time, end_time=end_time))
+
+    if stats:
+        for stat in stats:
+            metadata_id, start, last_reset, state, _sum, sum_increase, _ = stat
+            summary[metadata_id] = {
+                **summary.get(metadata_id, {}),
+                **{
+                    "metadata_id": metadata_id,
+                    "last_reset": process_timestamp(last_reset),
+                    "state": state,
+                    "sum": _sum,
+                    "sum_increase": sum_increase,
+                },
+            }
+
+    for stat in summary.values():
+        session.add(Statistics.from_stats(stat.pop("metadata_id"), start_time, stat))
+
+
 @retryable_database_job("statistics")
 def compile_statistics(instance: Recorder, start: datetime) -> bool:
     """Compile statistics."""
     start = dt_util.as_utc(start)
-    end = start + timedelta(hours=1)
+    end = start + timedelta(minutes=5)

     with session_scope(session=instance.get_session()) as session:  # type: ignore
         if session.query(StatisticsRuns).filter_by(start=start).first():
@@ -232,13 +338,20 @@ def compile_statistics(instance: Recorder, start: datetime) -> bool:
                 instance.hass, session, entity_id, stat["meta"]
             )
             try:
-                session.add(Statistics.from_stats(metadata_id, start, stat["stat"]))
+                session.add(
+                    StatisticsShortTerm.from_stats(metadata_id, start, stat["stat"])
+                )
             except SQLAlchemyError:
                 _LOGGER.exception(
                     "Unexpected exception when inserting statistics %s:%s ",
                     metadata_id,
                     stat,
                 )
+
+        if start.minute == 55:
+            # A full hour is ready, summarize it
+            compile_hourly_statistics(instance, session, start)
+
         session.add(StatisticsRuns(start=start))

     return True
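
The hourly rollup is driven by the 5-minute runs: when the run that starts at minute 55 is compiled, the hour it closes gets summarized into the long-term table. A small sketch of the window `compile_hourly_statistics` works on (timestamps are illustrative):

    from datetime import datetime, timedelta, timezone

    start = datetime(2021, 9, 16, 9, 55, tzinfo=timezone.utc)  # 5-minute run being compiled
    if start.minute == 55:
        start_time = start.replace(minute=0)         # 09:00
        end_time = start_time + timedelta(hours=1)   # 10:00
        # the twelve short-term rows in [start_time, end_time) are aggregated
        # into one Statistics row per metadata_id
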
@@ -354,11 +467,36 @@
     ]


+def _statistics_during_period_query(
+    hass: HomeAssistant,
+    end_time: datetime | None,
+    statistic_ids: list[str] | None,
+    bakery: Any,
+    base_query: Iterable,
+    table: type[Statistics | StatisticsShortTerm],
+) -> Callable:
+    baked_query = hass.data[bakery](lambda session: session.query(*base_query))
+
+    baked_query += lambda q: q.filter(table.start >= bindparam("start_time"))
+
+    if end_time is not None:
+        baked_query += lambda q: q.filter(table.start < bindparam("end_time"))
+
+    if statistic_ids is not None:
+        baked_query += lambda q: q.filter(
+            table.metadata_id.in_(bindparam("metadata_ids"))
+        )
+
+    baked_query += lambda q: q.order_by(table.metadata_id, table.start)
+    return baked_query  # type: ignore[no-any-return]
+
+
 def statistics_during_period(
     hass: HomeAssistant,
     start_time: datetime,
     end_time: datetime | None = None,
     statistic_ids: list[str] | None = None,
+    period: str = "hour",
 ) -> dict[str, list[dict[str, str]]]:
     """Return states changes during UTC period start_time - end_time."""
     metadata = None
@@ -367,23 +505,22 @@ def statistics_during_period(
         if not metadata:
             return {}

-        baked_query = hass.data[STATISTICS_BAKERY](
-            lambda session: session.query(*QUERY_STATISTICS)
-        )
-
-        baked_query += lambda q: q.filter(Statistics.start >= bindparam("start_time"))
-
-        if end_time is not None:
-            baked_query += lambda q: q.filter(Statistics.start < bindparam("end_time"))
-
         metadata_ids = None
         if statistic_ids is not None:
-            baked_query += lambda q: q.filter(
-                Statistics.metadata_id.in_(bindparam("metadata_ids"))
-            )
             metadata_ids = list(metadata.keys())

-        baked_query += lambda q: q.order_by(Statistics.metadata_id, Statistics.start)
+        if period == "hour":
+            bakery = STATISTICS_BAKERY
+            base_query = QUERY_STATISTICS
+            table = Statistics
+        else:
+            bakery = STATISTICS_SHORT_TERM_BAKERY
+            base_query = QUERY_STATISTICS_SHORT_TERM
+            table = StatisticsShortTerm
+
+        baked_query = _statistics_during_period_query(
+            hass, end_time, statistic_ids, bakery, base_query, table
+        )

         stats = execute(
             baked_query(session).params(
@@ -392,7 +529,9 @@
         )
         if not stats:
             return {}
-        return _sorted_statistics_to_dict(hass, stats, statistic_ids, metadata, True)
+        return _sorted_statistics_to_dict(
+            hass, stats, statistic_ids, metadata, True, table.duration
+        )


 def get_last_statistics(
@@ -405,15 +544,15 @@ def get_last_statistics(
         if not metadata:
             return {}

-        baked_query = hass.data[STATISTICS_BAKERY](
-            lambda session: session.query(*QUERY_STATISTICS)
+        baked_query = hass.data[STATISTICS_SHORT_TERM_BAKERY](
+            lambda session: session.query(*QUERY_STATISTICS_SHORT_TERM)
         )

         baked_query += lambda q: q.filter_by(metadata_id=bindparam("metadata_id"))
         metadata_id = next(iter(metadata.keys()))

         baked_query += lambda q: q.order_by(
-            Statistics.metadata_id, Statistics.start.desc()
+            StatisticsShortTerm.metadata_id, StatisticsShortTerm.start.desc()
         )

         baked_query += lambda q: q.limit(bindparam("number_of_stats"))
@@ -427,7 +566,12 @@
             return {}

         return _sorted_statistics_to_dict(
-            hass, stats, statistic_ids, metadata, convert_units
+            hass,
+            stats,
+            statistic_ids,
+            metadata,
+            convert_units,
+            StatisticsShortTerm.duration,
         )
@@ -437,6 +581,7 @@ def _sorted_statistics_to_dict(
     statistic_ids: list[str] | None,
     metadata: dict[str, StatisticMetaData],
     convert_units: bool,
+    duration: timedelta,
 ) -> dict[str, list[dict]]:
     """Convert SQL results into JSON friendly data structure."""
     result: dict = defaultdict(list)
@@ -463,7 +608,7 @@
         ent_results = result[meta_id]
         for db_state in group:
             start = process_timestamp(db_state.start)
-            end = start + timedelta(hours=1)
+            end = start + duration
             ent_results.append(
                 {
                     "statistic_id": statistic_id,

View file

@@ -25,6 +25,7 @@ from .models import (
     TABLE_STATISTICS,
     TABLE_STATISTICS_META,
     TABLE_STATISTICS_RUNS,
+    TABLE_STATISTICS_SHORT_TERM,
     RecorderRuns,
     process_timestamp,
 )
@@ -185,7 +186,12 @@ def basic_sanity_check(cursor):
     for table in ALL_TABLES:
         # The statistics tables may not be present in old databases
-        if table in [TABLE_STATISTICS, TABLE_STATISTICS_META, TABLE_STATISTICS_RUNS]:
+        if table in [
+            TABLE_STATISTICS,
+            TABLE_STATISTICS_META,
+            TABLE_STATISTICS_RUNS,
+            TABLE_STATISTICS_SHORT_TERM,
+        ]:
             continue
         if table in (TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES):
             cursor.execute(f"SELECT * FROM {table};")  # nosec # not injection

View file

@@ -875,7 +875,7 @@
     await hass.async_add_executor_job(trigger_db_commit, hass)
     await hass.async_block_till_done()

-    hass.data[recorder.DATA_INSTANCE].do_adhoc_statistics(period="hourly", start=now)
+    hass.data[recorder.DATA_INSTANCE].do_adhoc_statistics(start=now)
     await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)

     client = await hass_ws_client()
@@ -886,19 +886,20 @@
             "start_time": now.isoformat(),
             "end_time": now.isoformat(),
             "statistic_ids": ["sensor.test"],
+            "period": "hour",
         }
     )
     response = await client.receive_json()
     assert response["success"]
     assert response["result"] == {}

-    client = await hass_ws_client()
     await client.send_json(
         {
-            "id": 1,
+            "id": 2,
             "type": "history/statistics_during_period",
             "start_time": now.isoformat(),
             "statistic_ids": ["sensor.test"],
+            "period": "5minute",
         }
     )
     response = await client.receive_json()
@@ -908,7 +909,7 @@
             {
                 "statistic_id": "sensor.test",
                 "start": now.isoformat(),
-                "end": (now + timedelta(hours=1)).isoformat(),
+                "end": (now + timedelta(minutes=5)).isoformat(),
                 "mean": approx(value),
                 "min": approx(value),
                 "max": approx(value),
@@ -938,6 +939,7 @@ async def test_statistics_during_period_bad_start_time(hass, hass_ws_client):
             "id": 1,
             "type": "history/statistics_during_period",
             "start_time": "cats",
+            "period": "5minute",
         }
     )
     response = await client.receive_json()
@@ -964,6 +966,7 @@ async def test_statistics_during_period_bad_end_time(hass, hass_ws_client):
             "type": "history/statistics_during_period",
             "start_time": now.isoformat(),
             "end_time": "dogs",
+            "period": "5minute",
         }
     )
     response = await client.receive_json()
@@ -1011,7 +1014,7 @@ async def test_list_statistic_ids(hass, hass_ws_client, units, attributes, unit)
         {"statistic_id": "sensor.test", "unit_of_measurement": unit}
     ]

-    hass.data[recorder.DATA_INSTANCE].do_adhoc_statistics(period="hourly", start=now)
+    hass.data[recorder.DATA_INSTANCE].do_adhoc_statistics(start=now)
     await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
     # Remove the state, statistics will now be fetched from the database
     hass.states.async_remove("sensor.test")

View file

@@ -30,9 +30,11 @@ async def async_setup_recorder_instance(
         hass: HomeAssistant, config: ConfigType | None = None
     ) -> Recorder:
         """Setup and return recorder instance."""  # noqa: D401
-        stats = recorder.Recorder.async_hourly_statistics if enable_statistics else None
+        stats = (
+            recorder.Recorder.async_periodic_statistics if enable_statistics else None
+        )
         with patch(
-            "homeassistant.components.recorder.Recorder.async_hourly_statistics",
+            "homeassistant.components.recorder.Recorder.async_periodic_statistics",
             side_effect=stats,
             autospec=True,
         ):

View file

@@ -700,41 +700,41 @@ def test_auto_statistics(hass_recorder):
     tz = dt_util.get_time_zone("Europe/Copenhagen")
     dt_util.set_default_time_zone(tz)

-    # Statistics is scheduled to happen at *:12am every hour. Exercise this behavior by
+    # Statistics is scheduled to happen every 5 minutes. Exercise this behavior by
     # firing time changed events and advancing the clock around this time. Pick an
     # arbitrary year in the future to avoid boundary conditions relative to the current
     # date.
     #
-    # The clock is started at 4:15am then advanced forward below
+    # The clock is started at 4:16am then advanced forward below
     now = dt_util.utcnow()
-    test_time = datetime(now.year + 2, 1, 1, 4, 15, 0, tzinfo=tz)
+    test_time = datetime(now.year + 2, 1, 1, 4, 16, 0, tzinfo=tz)
     run_tasks_at_time(hass, test_time)

     with patch(
         "homeassistant.components.recorder.statistics.compile_statistics",
         return_value=True,
     ) as compile_statistics:
-        # Advance one hour, and the statistics task should run
-        test_time = test_time + timedelta(hours=1)
+        # Advance 5 minutes, and the statistics task should run
+        test_time = test_time + timedelta(minutes=5)
         run_tasks_at_time(hass, test_time)
         assert len(compile_statistics.mock_calls) == 1

         compile_statistics.reset_mock()

-        # Advance one hour, and the statistics task should run again
-        test_time = test_time + timedelta(hours=1)
+        # Advance 5 minutes, and the statistics task should run again
+        test_time = test_time + timedelta(minutes=5)
         run_tasks_at_time(hass, test_time)
         assert len(compile_statistics.mock_calls) == 1

         compile_statistics.reset_mock()

-        # Advance less than one full hour. The task should not run.
-        test_time = test_time + timedelta(minutes=50)
+        # Advance less than 5 minutes. The task should not run.
+        test_time = test_time + timedelta(minutes=3)
         run_tasks_at_time(hass, test_time)
         assert len(compile_statistics.mock_calls) == 0

-        # Advance to the next hour, and the statistics task should run again
-        test_time = test_time + timedelta(hours=1)
+        # Advance 5 minutes, and the statistics task should run again
+        test_time = test_time + timedelta(minutes=5)
         run_tasks_at_time(hass, test_time)
         assert len(compile_statistics.mock_calls) == 1
@@ -754,8 +754,8 @@ def test_statistics_runs_initiated(hass_recorder):
     assert len(statistics_runs) == 1
     last_run = process_timestamp(statistics_runs[0].start)
     assert process_timestamp(last_run) == now.replace(
-        minute=0, second=0, microsecond=0
-    ) - timedelta(hours=1)
+        minute=now.minute - now.minute % 5, second=0, microsecond=0
+    ) - timedelta(minutes=5)


 def test_compile_missing_statistics(tmpdir):
@@ -776,7 +776,7 @@ def test_compile_missing_statistics(tmpdir):
         statistics_runs = list(session.query(StatisticsRuns))
         assert len(statistics_runs) == 1
         last_run = process_timestamp(statistics_runs[0].start)
-        assert last_run == now - timedelta(hours=1)
+        assert last_run == now - timedelta(minutes=5)

     wait_recording_done(hass)
     wait_recording_done(hass)
@@ -795,7 +795,7 @@
     with session_scope(hass=hass) as session:
         statistics_runs = list(session.query(StatisticsRuns))
-        assert len(statistics_runs) == 2
+        assert len(statistics_runs) == 13  # 12 5-minute runs
         last_run = process_timestamp(statistics_runs[1].start)
         assert last_run == now
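
The expected count of 13 follows from the shorter period: assuming the test still advances the clock roughly an hour across the restart (not shown in this hunk), compiling the missing statistics schedules twelve 5-minute runs on top of the single run that already existed:

    missed_runs = 60 // 5         # 12 five-minute periods in the missed hour
    total_runs = 1 + missed_runs  # plus the pre-existing run == 13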

View file

@@ -9,7 +9,7 @@ from pytest import approx
 from homeassistant.components.recorder import history
 from homeassistant.components.recorder.const import DATA_INSTANCE
 from homeassistant.components.recorder.models import (
-    Statistics,
+    StatisticsShortTerm,
     process_timestamp_to_utc_isoformat,
 )
 from homeassistant.components.recorder.statistics import (
@@ -34,18 +34,18 @@ def test_compile_hourly_statistics(hass_recorder):
     assert dict(states) == dict(hist)

     for kwargs in ({}, {"statistic_ids": ["sensor.test1"]}):
-        stats = statistics_during_period(hass, zero, **kwargs)
+        stats = statistics_during_period(hass, zero, period="5minute", **kwargs)
         assert stats == {}
     stats = get_last_statistics(hass, 0, "sensor.test1", True)
     assert stats == {}

-    recorder.do_adhoc_statistics(period="hourly", start=zero)
-    recorder.do_adhoc_statistics(period="hourly", start=four)
+    recorder.do_adhoc_statistics(start=zero)
+    recorder.do_adhoc_statistics(start=four)
     wait_recording_done(hass)
     expected_1 = {
         "statistic_id": "sensor.test1",
         "start": process_timestamp_to_utc_isoformat(zero),
-        "end": process_timestamp_to_utc_isoformat(zero + timedelta(hours=1)),
+        "end": process_timestamp_to_utc_isoformat(zero + timedelta(minutes=5)),
         "mean": approx(14.915254237288135),
         "min": approx(10.0),
         "max": approx(20.0),
@@ -58,7 +58,7 @@
     expected_2 = {
         "statistic_id": "sensor.test1",
         "start": process_timestamp_to_utc_isoformat(four),
-        "end": process_timestamp_to_utc_isoformat(four + timedelta(hours=1)),
+        "end": process_timestamp_to_utc_isoformat(four + timedelta(minutes=5)),
         "mean": approx(20.0),
         "min": approx(20.0),
         "max": approx(20.0),
@@ -78,13 +78,17 @@
     ]

     # Test statistics_during_period
-    stats = statistics_during_period(hass, zero)
+    stats = statistics_during_period(hass, zero, period="5minute")
     assert stats == {"sensor.test1": expected_stats1, "sensor.test2": expected_stats2}

-    stats = statistics_during_period(hass, zero, statistic_ids=["sensor.test2"])
+    stats = statistics_during_period(
+        hass, zero, statistic_ids=["sensor.test2"], period="5minute"
+    )
     assert stats == {"sensor.test2": expected_stats2}

-    stats = statistics_during_period(hass, zero, statistic_ids=["sensor.test3"])
+    stats = statistics_during_period(
+        hass, zero, statistic_ids=["sensor.test3"], period="5minute"
+    )
     assert stats == {}

     # Test get_last_statistics
@@ -130,7 +134,7 @@ def mock_sensor_statistics():
 def mock_from_stats():
     """Mock out Statistics.from_stats."""
     counter = 0
-    real_from_stats = Statistics.from_stats
+    real_from_stats = StatisticsShortTerm.from_stats

     def from_stats(metadata_id, start, stats):
         nonlocal counter
@@ -140,17 +144,17 @@ def mock_from_stats():
         return real_from_stats(metadata_id, start, stats)

     with patch(
-        "homeassistant.components.recorder.statistics.Statistics.from_stats",
+        "homeassistant.components.recorder.statistics.StatisticsShortTerm.from_stats",
         side_effect=from_stats,
         autospec=True,
     ):
         yield


-def test_compile_hourly_statistics_exception(
+def test_compile_periodic_statistics_exception(
     hass_recorder, mock_sensor_statistics, mock_from_stats
 ):
-    """Test exception handling when compiling hourly statistics."""
+    """Test exception handling when compiling periodic statistics."""

     def mock_from_stats():
         raise ValueError
@@ -160,13 +164,13 @@ def test_compile_hourly_statistics_exception(
     setup_component(hass, "sensor", {})

     now = dt_util.utcnow()
-    recorder.do_adhoc_statistics(period="hourly", start=now)
-    recorder.do_adhoc_statistics(period="hourly", start=now + timedelta(hours=1))
+    recorder.do_adhoc_statistics(start=now)
+    recorder.do_adhoc_statistics(start=now + timedelta(minutes=5))
     wait_recording_done(hass)
     expected_1 = {
         "statistic_id": "sensor.test1",
         "start": process_timestamp_to_utc_isoformat(now),
-        "end": process_timestamp_to_utc_isoformat(now + timedelta(hours=1)),
+        "end": process_timestamp_to_utc_isoformat(now + timedelta(minutes=5)),
         "mean": None,
         "min": None,
         "max": None,
@@ -178,8 +182,8 @@
     }
     expected_2 = {
         "statistic_id": "sensor.test1",
-        "start": process_timestamp_to_utc_isoformat(now + timedelta(hours=1)),
-        "end": process_timestamp_to_utc_isoformat(now + timedelta(hours=2)),
+        "start": process_timestamp_to_utc_isoformat(now + timedelta(minutes=5)),
+        "end": process_timestamp_to_utc_isoformat(now + timedelta(minutes=10)),
         "mean": None,
         "min": None,
         "max": None,
@@ -201,7 +205,7 @@
         {**expected_2, "statistic_id": "sensor.test3"},
     ]

-    stats = statistics_during_period(hass, now)
+    stats = statistics_during_period(hass, now, period="5minute")
     assert stats == {
         "sensor.test1": expected_stats1,
         "sensor.test2": expected_stats2,
@@ -229,17 +233,17 @@ def test_rename_entity(hass_recorder):
     assert dict(states) == dict(hist)

     for kwargs in ({}, {"statistic_ids": ["sensor.test1"]}):
-        stats = statistics_during_period(hass, zero, **kwargs)
+        stats = statistics_during_period(hass, zero, period="5minute", **kwargs)
         assert stats == {}
     stats = get_last_statistics(hass, 0, "sensor.test1", True)
     assert stats == {}

-    recorder.do_adhoc_statistics(period="hourly", start=zero)
+    recorder.do_adhoc_statistics(start=zero)
     wait_recording_done(hass)
     expected_1 = {
         "statistic_id": "sensor.test1",
         "start": process_timestamp_to_utc_isoformat(zero),
-        "end": process_timestamp_to_utc_isoformat(zero + timedelta(hours=1)),
+        "end": process_timestamp_to_utc_isoformat(zero + timedelta(minutes=5)),
         "mean": approx(14.915254237288135),
         "min": approx(10.0),
         "max": approx(20.0),
@@ -259,13 +263,13 @@ def test_rename_entity(hass_recorder):
         {**expected_1, "statistic_id": "sensor.test99"},
     ]

-    stats = statistics_during_period(hass, zero)
+    stats = statistics_during_period(hass, zero, period="5minute")
     assert stats == {"sensor.test1": expected_stats1, "sensor.test2": expected_stats2}

     entity_reg.async_update_entity(reg_entry.entity_id, new_entity_id="sensor.test99")
     hass.block_till_done()

-    stats = statistics_during_period(hass, zero)
+    stats = statistics_during_period(hass, zero, period="5minute")
     assert stats == {"sensor.test99": expected_stats99, "sensor.test2": expected_stats2}
@@ -285,7 +289,7 @@ def test_statistics_duplicated(hass_recorder, caplog):
     with patch(
         "homeassistant.components.sensor.recorder.compile_statistics"
     ) as compile_statistics:
-        recorder.do_adhoc_statistics(period="hourly", start=zero)
+        recorder.do_adhoc_statistics(start=zero)
         wait_recording_done(hass)
         assert compile_statistics.called
         compile_statistics.reset_mock()
@@ -293,7 +297,7 @@
         assert "Statistics already compiled" not in caplog.text
         caplog.clear()

-        recorder.do_adhoc_statistics(period="hourly", start=zero)
+        recorder.do_adhoc_statistics(start=zero)
         wait_recording_done(hass)
         assert not compile_statistics.called
         compile_statistics.reset_mock()
@@ -332,10 +336,10 @@ def record_states(hass):
         return hass.states.get(entity_id)

     zero = dt_util.utcnow()
-    one = zero + timedelta(minutes=1)
-    two = one + timedelta(minutes=15)
-    three = two + timedelta(minutes=30)
-    four = three + timedelta(minutes=15)
+    one = zero + timedelta(seconds=1 * 5)
+    two = one + timedelta(seconds=15 * 5)
+    three = two + timedelta(seconds=30 * 5)
+    four = three + timedelta(seconds=15 * 5)

     states = {mp: [], sns1: [], sns2: [], sns3: [], sns4: []}
     with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one):

File diff suppressed because it is too large

View file

@@ -636,9 +636,9 @@ def enable_statistics():
 def hass_recorder(enable_statistics, hass_storage):
     """Home Assistant fixture with in-memory recorder."""
     hass = get_test_home_assistant()
-    stats = recorder.Recorder.async_hourly_statistics if enable_statistics else None
+    stats = recorder.Recorder.async_periodic_statistics if enable_statistics else None
     with patch(
-        "homeassistant.components.recorder.Recorder.async_hourly_statistics",
+        "homeassistant.components.recorder.Recorder.async_periodic_statistics",
         side_effect=stats,
         autospec=True,
     ):