Always include start point for statistics (#57182)
This commit is contained in:
parent
a4357fdb95
commit
ee98849360
2 changed files with 80 additions and 8 deletions
|
@ -5,7 +5,7 @@ from collections import defaultdict
|
||||||
from collections.abc import Callable, Iterable
|
from collections.abc import Callable, Iterable
|
||||||
import dataclasses
|
import dataclasses
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from itertools import groupby
|
from itertools import chain, groupby
|
||||||
import logging
|
import logging
|
||||||
from typing import TYPE_CHECKING, Any, Literal
|
from typing import TYPE_CHECKING, Any, Literal
|
||||||
|
|
||||||
|
@ -629,7 +629,7 @@ def statistics_during_period(
|
||||||
return {}
|
return {}
|
||||||
# Return statistics combined with metadata
|
# Return statistics combined with metadata
|
||||||
return _sorted_statistics_to_dict(
|
return _sorted_statistics_to_dict(
|
||||||
hass, stats, statistic_ids, metadata, True, table.duration
|
hass, session, stats, statistic_ids, metadata, True, table, start_time
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -668,25 +668,64 @@ def get_last_statistics(
|
||||||
# Return statistics combined with metadata
|
# Return statistics combined with metadata
|
||||||
return _sorted_statistics_to_dict(
|
return _sorted_statistics_to_dict(
|
||||||
hass,
|
hass,
|
||||||
|
session,
|
||||||
stats,
|
stats,
|
||||||
statistic_ids,
|
statistic_ids,
|
||||||
metadata,
|
metadata,
|
||||||
convert_units,
|
convert_units,
|
||||||
StatisticsShortTerm.duration,
|
StatisticsShortTerm,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _statistics_at_time(
|
||||||
|
session: scoped_session,
|
||||||
|
metadata_ids: set[int],
|
||||||
|
table: type[Statistics | StatisticsShortTerm],
|
||||||
|
start_time: datetime,
|
||||||
|
) -> list | None:
|
||||||
|
"""Return last known statics, earlier than start_time, for the metadata_ids."""
|
||||||
|
# Fetch metadata for the given (or all) statistic_ids
|
||||||
|
if table == StatisticsShortTerm:
|
||||||
|
base_query = QUERY_STATISTICS_SHORT_TERM
|
||||||
|
else:
|
||||||
|
base_query = QUERY_STATISTICS
|
||||||
|
|
||||||
|
query = session.query(*base_query)
|
||||||
|
|
||||||
|
most_recent_statistic_ids = (
|
||||||
|
session.query(
|
||||||
|
func.max(table.id).label("max_id"),
|
||||||
|
)
|
||||||
|
.filter(table.start < start_time)
|
||||||
|
.filter(table.metadata_id.in_(metadata_ids))
|
||||||
|
)
|
||||||
|
most_recent_statistic_ids = most_recent_statistic_ids.group_by(table.metadata_id)
|
||||||
|
most_recent_statistic_ids = most_recent_statistic_ids.subquery()
|
||||||
|
query = query.join(
|
||||||
|
most_recent_statistic_ids,
|
||||||
|
table.id == most_recent_statistic_ids.c.max_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
return execute(query)
|
||||||
|
|
||||||
|
|
||||||
def _sorted_statistics_to_dict(
|
def _sorted_statistics_to_dict(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
session: scoped_session,
|
||||||
stats: list,
|
stats: list,
|
||||||
statistic_ids: list[str] | None,
|
statistic_ids: list[str] | None,
|
||||||
_metadata: dict[str, tuple[int, StatisticMetaData]],
|
_metadata: dict[str, tuple[int, StatisticMetaData]],
|
||||||
convert_units: bool,
|
convert_units: bool,
|
||||||
duration: timedelta,
|
table: type[Statistics | StatisticsShortTerm],
|
||||||
|
start_time: datetime | None,
|
||||||
) -> dict[str, list[dict]]:
|
) -> dict[str, list[dict]]:
|
||||||
"""Convert SQL results into JSON friendly data structure."""
|
"""Convert SQL results into JSON friendly data structure."""
|
||||||
result: dict = defaultdict(list)
|
result: dict = defaultdict(list)
|
||||||
units = hass.config.units
|
units = hass.config.units
|
||||||
|
metadata = dict(_metadata.values())
|
||||||
|
need_stat_at_start_time = set()
|
||||||
|
stats_at_start_time = {}
|
||||||
|
|
||||||
def no_conversion(val: Any, _: Any) -> float | None:
|
def no_conversion(val: Any, _: Any) -> float | None:
|
||||||
"""Return x."""
|
"""Return x."""
|
||||||
|
@ -697,7 +736,19 @@ def _sorted_statistics_to_dict(
|
||||||
for stat_id in statistic_ids:
|
for stat_id in statistic_ids:
|
||||||
result[stat_id] = []
|
result[stat_id] = []
|
||||||
|
|
||||||
metadata = dict(_metadata.values())
|
# Identify metadata IDs for which no data was available at the requested start time
|
||||||
|
for meta_id, group in groupby(stats, lambda stat: stat.metadata_id): # type: ignore
|
||||||
|
first_start_time = process_timestamp(next(group).start)
|
||||||
|
if start_time and first_start_time > start_time:
|
||||||
|
need_stat_at_start_time.add(meta_id)
|
||||||
|
|
||||||
|
# Fetch last known statistics for the needed metadata IDs
|
||||||
|
if need_stat_at_start_time:
|
||||||
|
assert start_time # Can not be None if need_stat_at_start_time is not empty
|
||||||
|
tmp = _statistics_at_time(session, need_stat_at_start_time, table, start_time)
|
||||||
|
if tmp:
|
||||||
|
for stat in tmp:
|
||||||
|
stats_at_start_time[stat.metadata_id] = (stat,)
|
||||||
|
|
||||||
# Append all statistic entries, and optionally do unit conversion
|
# Append all statistic entries, and optionally do unit conversion
|
||||||
for meta_id, group in groupby(stats, lambda stat: stat.metadata_id): # type: ignore
|
for meta_id, group in groupby(stats, lambda stat: stat.metadata_id): # type: ignore
|
||||||
|
@ -709,9 +760,9 @@ def _sorted_statistics_to_dict(
|
||||||
else:
|
else:
|
||||||
convert = no_conversion
|
convert = no_conversion
|
||||||
ent_results = result[meta_id]
|
ent_results = result[meta_id]
|
||||||
for db_state in group:
|
for db_state in chain(stats_at_start_time.get(meta_id, ()), group):
|
||||||
start = process_timestamp(db_state.start)
|
start = process_timestamp(db_state.start)
|
||||||
end = start + duration
|
end = start + table.duration
|
||||||
ent_results.append(
|
ent_results.append(
|
||||||
{
|
{
|
||||||
"statistic_id": statistic_id,
|
"statistic_id": statistic_id,
|
||||||
|
|
|
@ -337,7 +337,7 @@ def test_compile_hourly_sum_statistics_amount(
|
||||||
{"statistic_id": "sensor.test1", "unit_of_measurement": display_unit}
|
{"statistic_id": "sensor.test1", "unit_of_measurement": display_unit}
|
||||||
]
|
]
|
||||||
stats = statistics_during_period(hass, period0, period="5minute")
|
stats = statistics_during_period(hass, period0, period="5minute")
|
||||||
assert stats == {
|
expected_stats = {
|
||||||
"sensor.test1": [
|
"sensor.test1": [
|
||||||
{
|
{
|
||||||
"statistic_id": "sensor.test1",
|
"statistic_id": "sensor.test1",
|
||||||
|
@ -374,6 +374,27 @@ def test_compile_hourly_sum_statistics_amount(
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
assert stats == expected_stats
|
||||||
|
|
||||||
|
# With an offset of 1 minute, we expect to get all periods
|
||||||
|
stats = statistics_during_period(
|
||||||
|
hass, period0 + timedelta(minutes=1), period="5minute"
|
||||||
|
)
|
||||||
|
assert stats == expected_stats
|
||||||
|
|
||||||
|
# With an offset of 5 minutes, we expect to get the 2nd and 3rd periods
|
||||||
|
stats = statistics_during_period(
|
||||||
|
hass, period0 + timedelta(minutes=5), period="5minute"
|
||||||
|
)
|
||||||
|
expected_stats["sensor.test1"] = expected_stats["sensor.test1"][1:3]
|
||||||
|
assert stats == expected_stats
|
||||||
|
|
||||||
|
# With an offset of 6 minutes, we expect to get the 2nd and 3rd periods
|
||||||
|
stats = statistics_during_period(
|
||||||
|
hass, period0 + timedelta(minutes=6), period="5minute"
|
||||||
|
)
|
||||||
|
assert stats == expected_stats
|
||||||
|
|
||||||
assert "Error while processing event StatisticsTask" not in caplog.text
|
assert "Error while processing event StatisticsTask" not in caplog.text
|
||||||
assert "Detected new cycle for sensor.test1, last_reset set to" in caplog.text
|
assert "Detected new cycle for sensor.test1, last_reset set to" in caplog.text
|
||||||
assert "Compiling initial sum statistics for sensor.test1" in caplog.text
|
assert "Compiling initial sum statistics for sensor.test1" in caplog.text
|
||||||
|
|
Loading…
Add table
Reference in a new issue