Reduce the number of queries needed to compile statistics (#69731)
Co-authored-by: Erik Montnemery <erik@montnemery.com>
parent 012ef6bb7b
commit 89807f0d2d
3 changed files with 89 additions and 13 deletions
homeassistant/components/recorder/statistics.py

@@ -236,9 +236,9 @@ def get_start_time() -> datetime:
 def _update_or_add_metadata(
-    hass: HomeAssistant,
     session: Session,
     new_metadata: StatisticMetaData,
+    old_metadata_dict: dict[str, tuple[int, StatisticMetaData]],
 ) -> int:
     """Get metadata_id for a statistic_id.
@@ -248,10 +248,7 @@ def _update_or_add_metadata(
     Updating metadata source is not possible.
     """
     statistic_id = new_metadata["statistic_id"]
-    old_metadata_dict = get_metadata_with_session(
-        hass, session, statistic_ids=[statistic_id]
-    )
-    if not old_metadata_dict:
+    if statistic_id not in old_metadata_dict:
         meta = StatisticsMeta.from_meta(new_metadata)
         session.add(meta)
         session.flush()  # Flush to get the metadata id assigned
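Taken together, the two hunks above change the contract of _update_or_add_metadata: instead of querying for the statistic's existing metadata on every call, the helper now receives a pre-fetched old_metadata_dict and only touches the database when a new metadata row has to be inserted. Below is a minimal sketch of the resulting lookup-or-insert flow; the name _update_or_add_metadata_sketch and the elided update path are illustrative, not the exact Home Assistant code.

def _update_or_add_metadata_sketch(session, new_metadata, old_metadata_dict):
    """Illustrative sketch only; the real helper also updates changed metadata."""
    statistic_id = new_metadata["statistic_id"]
    if statistic_id not in old_metadata_dict:
        # Unknown statistic: insert a metadata row and flush so the
        # autoincrement id is assigned and can be returned immediately.
        meta = StatisticsMeta.from_meta(new_metadata)
        session.add(meta)
        session.flush()
        return meta.id
    # Known statistic: reuse the id that was fetched up front, no extra query.
    metadata_id, _old_metadata = old_metadata_dict[statistic_id]
    return metadata_id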
@@ -568,8 +565,14 @@ def compile_statistics(instance: Recorder, start: datetime) -> bool:
         session=instance.get_session(),  # type: ignore[misc]
         exception_filter=_filter_unique_constraint_integrity_error(instance),
     ) as session:
+        statistic_ids = [stats["meta"]["statistic_id"] for stats in platform_stats]
+        old_metadata_dict = get_metadata_with_session(
+            instance.hass, session, statistic_ids=statistic_ids
+        )
         for stats in platform_stats:
-            metadata_id = _update_or_add_metadata(instance.hass, session, stats["meta"])
+            metadata_id = _update_or_add_metadata(
+                session, stats["meta"], old_metadata_dict
+            )
             _insert_statistics(
                 session,
                 StatisticsShortTerm,
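The effect on query volume is easiest to see as a before/after comparison. Previously, metadata was looked up once per platform statistic inside the loop; after this change a single get_metadata_with_session call covers every statistic_id in the batch. A rough sketch of the pattern, with names taken from the diff and the surrounding code omitted:

# Before: one metadata SELECT per compiled statistic
for stats in platform_stats:
    metadata_id = _update_or_add_metadata(instance.hass, session, stats["meta"])
    _insert_statistics(session, StatisticsShortTerm, ...)  # unchanged insert path

# After: one metadata SELECT for the whole batch
statistic_ids = [stats["meta"]["statistic_id"] for stats in platform_stats]
old_metadata_dict = get_metadata_with_session(
    instance.hass, session, statistic_ids=statistic_ids
)
for stats in platform_stats:
    metadata_id = _update_or_add_metadata(session, stats["meta"], old_metadata_dict)
    _insert_statistics(session, StatisticsShortTerm, ...)  # unchanged insert path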
@@ -1098,6 +1101,52 @@ def get_last_short_term_statistics(
     )
 
 
+def get_latest_short_term_statistics(
+    hass: HomeAssistant, statistic_ids: list[str]
+) -> dict[str, list[dict]]:
+    """Return the latest short term statistics for a list of statistic_ids."""
+    # This function doesn't use a baked query, we instead rely on the
+    # "Transparent SQL Compilation Caching" feature introduced in SQLAlchemy 1.4
+    with session_scope(hass=hass) as session:
+        # Fetch metadata for the given statistic_ids
+        metadata = get_metadata_with_session(hass, session, statistic_ids=statistic_ids)
+        if not metadata:
+            return {}
+        metadata_ids = [
+            metadata[statistic_id][0]
+            for statistic_id in statistic_ids
+            if statistic_id in metadata
+        ]
+        most_recent_statistic_row = (
+            session.query(
+                StatisticsShortTerm.id,
+                func.max(StatisticsShortTerm.start),
+            )
+            .group_by(StatisticsShortTerm.metadata_id)
+            .having(StatisticsShortTerm.metadata_id.in_(metadata_ids))
+        ).subquery()
+        stats = execute(
+            session.query(*QUERY_STATISTICS_SHORT_TERM).join(
+                most_recent_statistic_row,
+                StatisticsShortTerm.id == most_recent_statistic_row.c.id,
+            )
+        )
+        if not stats:
+            return {}
+
+        # Return statistics combined with metadata
+        return _sorted_statistics_to_dict(
+            hass,
+            session,
+            stats,
+            statistic_ids,
+            metadata,
+            False,
+            StatisticsShortTerm,
+            None,
+        )
+
+
 def _statistics_at_time(
     session: Session,
     metadata_ids: set[int],
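The new get_latest_short_term_statistics collapses what previously required one get_last_short_term_statistics call per entity into a single round trip: a grouped subquery selects the newest short-term row per metadata_id, and the outer query joins back on the row id to fetch the full records. A hedged usage sketch, with the return shape inferred from the tests added further down (the exact keys of each returned row are not shown in this diff):

# One query regardless of how many statistic_ids are passed.  Ids with no
# compiled statistics are simply missing from the result, and an empty
# statistics table yields {} (both cases are covered by the new tests).
latest = get_latest_short_term_statistics(hass, ["sensor.test1", "sensor.test2"])
# e.g. {"sensor.test1": [{"statistic_id": "sensor.test1", ...}]}
for statistic_id, rows in latest.items():
    most_recent = rows[0]  # the latest 5-minute statistics row for this id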
@@ -1309,7 +1358,10 @@ def add_external_statistics(
         session=instance.get_session(),  # type: ignore[misc]
         exception_filter=_filter_unique_constraint_integrity_error(instance),
     ) as session:
-        metadata_id = _update_or_add_metadata(instance.hass, session, metadata)
+        old_metadata_dict = get_metadata_with_session(
+            instance.hass, session, statistic_ids=[metadata["statistic_id"]]
+        )
+        metadata_id = _update_or_add_metadata(session, metadata, old_metadata_dict)
         for stat in statistics:
             if stat_id := _statistics_exists(
                 session, Statistics, metadata_id, stat["start"]
homeassistant/components/sensor/recorder.py

@@ -446,12 +446,13 @@ def _compile_statistics(  # noqa: C901
         if _state.entity_id not in history_list:
             history_list[_state.entity_id] = [_state]
 
-    for _state in sensor_states:  # pylint: disable=too-many-nested-blocks
+    to_process = []
+    to_query = []
+    for _state in sensor_states:
         entity_id = _state.entity_id
         if entity_id not in history_list:
             continue
 
-        state_class = _state.attributes[ATTR_STATE_CLASS]
         device_class = _state.attributes.get(ATTR_DEVICE_CLASS)
         entity_history = history_list[entity_id]
         unit, fstates = _normalize_states(
@@ -466,6 +467,19 @@ def _compile_statistics(  # noqa: C901
         if not fstates:
             continue
 
+        state_class = _state.attributes[ATTR_STATE_CLASS]
+
+        to_process.append((entity_id, unit, state_class, fstates))
+        if "sum" in wanted_statistics[entity_id]:
+            to_query.append(entity_id)
+
+    last_stats = statistics.get_latest_short_term_statistics(hass, to_query)
+    for (  # pylint: disable=too-many-nested-blocks
+        entity_id,
+        unit,
+        state_class,
+        fstates,
+    ) in to_process:
         # Check metadata
         if old_metadata := old_metadatas.get(entity_id):
             if old_metadata[1]["unit_of_measurement"] != unit:
@@ -511,9 +525,6 @@ def _compile_statistics(  # noqa: C901
             last_reset = old_last_reset = None
             new_state = old_state = None
             _sum = 0.0
-            last_stats = statistics.get_last_short_term_statistics(
-                hass, 1, entity_id, False
-            )
             if entity_id in last_stats:
                 # We have compiled history for this sensor before, use that as a starting point
                 last_reset = old_last_reset = last_stats[entity_id][0]["last_reset"]
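The three sensor hunks restructure _compile_statistics into two passes: a first pass normalizes states and collects work items, remembering which entities need a sum (and therefore need their previously compiled statistics), and a second pass consumes one batched get_latest_short_term_statistics result instead of calling get_last_short_term_statistics once per entity. A condensed sketch of the new control flow; details of the surrounding function are omitted:

to_process = []  # (entity_id, unit, state_class, fstates) work items
to_query = []    # entities whose previous sum/state must be looked up

for _state in sensor_states:
    entity_id = _state.entity_id
    unit, fstates = _normalize_states(...)  # unchanged normalization step
    if not fstates:
        continue
    state_class = _state.attributes[ATTR_STATE_CLASS]
    to_process.append((entity_id, unit, state_class, fstates))
    if "sum" in wanted_statistics[entity_id]:
        to_query.append(entity_id)

# One database round trip for every "sum" sensor instead of one per sensor.
last_stats = statistics.get_latest_short_term_statistics(hass, to_query)

for entity_id, unit, state_class, fstates in to_process:
    if entity_id in last_stats:
        # Previously compiled statistics seed last_reset, old_state and _sum.
        ...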
tests/components/recorder/test_statistics.py

@@ -23,6 +23,7 @@ from homeassistant.components.recorder.statistics import (
     delete_duplicates,
     get_last_short_term_statistics,
     get_last_statistics,
+    get_latest_short_term_statistics,
     get_metadata,
     list_statistic_ids,
     statistics_during_period,
@@ -55,6 +56,10 @@ def test_compile_hourly_statistics(hass_recorder):
     hist = history.get_significant_states(hass, zero, four)
     assert dict(states) == dict(hist)
 
+    # Should not fail if there is nothing there yet
+    stats = get_latest_short_term_statistics(hass, ["sensor.test1"])
+    assert stats == {}
+
     for kwargs in ({}, {"statistic_ids": ["sensor.test1"]}):
         stats = statistics_during_period(hass, zero, period="5minute", **kwargs)
         assert stats == {}
@@ -109,13 +114,16 @@ def test_compile_hourly_statistics(hass_recorder):
     )
     assert stats == {}
 
-    # Test get_last_short_term_statistics
+    # Test get_last_short_term_statistics and get_latest_short_term_statistics
     stats = get_last_short_term_statistics(hass, 0, "sensor.test1", True)
     assert stats == {}
 
     stats = get_last_short_term_statistics(hass, 1, "sensor.test1", True)
     assert stats == {"sensor.test1": [{**expected_2, "statistic_id": "sensor.test1"}]}
 
+    stats = get_latest_short_term_statistics(hass, ["sensor.test1"])
+    assert stats == {"sensor.test1": [{**expected_2, "statistic_id": "sensor.test1"}]}
+
     stats = get_last_short_term_statistics(hass, 2, "sensor.test1", True)
     assert stats == {"sensor.test1": expected_stats1[::-1]}
@@ -125,6 +133,11 @@ def test_compile_hourly_statistics(hass_recorder):
     stats = get_last_short_term_statistics(hass, 1, "sensor.test3", True)
     assert stats == {}
 
+    recorder.get_session().query(StatisticsShortTerm).delete()
+    # Should not fail if there is nothing in the table
+    stats = get_latest_short_term_statistics(hass, ["sensor.test1"])
+    assert stats == {}
+
 
 @pytest.fixture
 def mock_sensor_statistics():