Fire events when long term statistics is updated (#82492)

* Fire events when long term statistics is updated

* Allow the new events to be subscribed to by anyone

* Address review comments

* Finish renaming events

* Finish renaming events

* Fix do_adhoc_statistics

* Adjust tests

* Adjust tests
This commit is contained in:
Erik Montnemery 2022-11-24 22:01:36 +01:00 committed by GitHub
parent 9f9114cb4a
commit b94e1e9ef8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 108 additions and 23 deletions

View file

@ -21,10 +21,12 @@ from homeassistant.helpers.typing import ConfigType
from homeassistant.loader import bind_hass from homeassistant.loader import bind_hass
from . import statistics, websocket_api from . import statistics, websocket_api
from .const import ( from .const import ( # noqa: F401
CONF_DB_INTEGRITY_CHECK, CONF_DB_INTEGRITY_CHECK,
DATA_INSTANCE, DATA_INSTANCE,
DOMAIN, DOMAIN,
EVENT_RECORDER_5MIN_STATISTICS_GENERATED,
EVENT_RECORDER_HOURLY_STATISTICS_GENERATED,
EXCLUDE_ATTRIBUTES, EXCLUDE_ATTRIBUTES,
SQLITE_URL_PREFIX, SQLITE_URL_PREFIX,
) )

View file

@ -14,6 +14,9 @@ MYSQLDB_URL_PREFIX = "mysql://"
MYSQLDB_PYMYSQL_URL_PREFIX = "mysql+pymysql://" MYSQLDB_PYMYSQL_URL_PREFIX = "mysql+pymysql://"
DOMAIN = "recorder" DOMAIN = "recorder"
EVENT_RECORDER_5MIN_STATISTICS_GENERATED = "recorder_5min_statistics_generated"
EVENT_RECORDER_HOURLY_STATISTICS_GENERATED = "recorder_hourly_statistics_generated"
CONF_DB_INTEGRITY_CHECK = "db_integrity_check" CONF_DB_INTEGRITY_CHECK = "db_integrity_check"
MAX_QUEUE_BACKLOG = 40000 MAX_QUEUE_BACKLOG = 40000

View file

@ -375,12 +375,6 @@ class Recorder(threading.Thread):
# Unknown what it is. # Unknown what it is.
return True return True
def do_adhoc_statistics(self, **kwargs: Any) -> None:
"""Trigger an adhoc statistics run."""
if not (start := kwargs.get("start")):
start = statistics.get_start_time()
self.queue_task(StatisticsTask(start))
def _empty_queue(self, event: Event) -> None: def _empty_queue(self, event: Event) -> None:
"""Empty the queue if its still present at final write.""" """Empty the queue if its still present at final write."""
@ -479,7 +473,7 @@ class Recorder(threading.Thread):
Short term statistics run every 5 minutes Short term statistics run every 5 minutes
""" """
start = statistics.get_start_time() start = statistics.get_start_time()
self.queue_task(StatisticsTask(start)) self.queue_task(StatisticsTask(start, True))
@callback @callback
def async_adjust_statistics( def async_adjust_statistics(
@ -1193,7 +1187,7 @@ class Recorder(threading.Thread):
while start < last_period: while start < last_period:
end = start + timedelta(minutes=5) end = start + timedelta(minutes=5)
_LOGGER.debug("Compiling missing statistics for %s-%s", start, end) _LOGGER.debug("Compiling missing statistics for %s-%s", start, end)
self.queue_task(StatisticsTask(start)) self.queue_task(StatisticsTask(start, end >= last_period))
start = end start = end
def _end_session(self) -> None: def _end_session(self) -> None:

View file

@ -45,7 +45,13 @@ from homeassistant.util.unit_conversion import (
VolumeConverter, VolumeConverter,
) )
from .const import DOMAIN, MAX_ROWS_TO_PURGE, SupportedDialect from .const import (
DOMAIN,
EVENT_RECORDER_5MIN_STATISTICS_GENERATED,
EVENT_RECORDER_HOURLY_STATISTICS_GENERATED,
MAX_ROWS_TO_PURGE,
SupportedDialect,
)
from .db_schema import ( from .db_schema import (
Statistics, Statistics,
StatisticsBase, StatisticsBase,
@ -640,7 +646,7 @@ def _compile_hourly_statistics(session: Session, start: datetime) -> None:
@retryable_database_job("statistics") @retryable_database_job("statistics")
def compile_statistics(instance: Recorder, start: datetime) -> bool: def compile_statistics(instance: Recorder, start: datetime, fire_events: bool) -> bool:
"""Compile 5-minute statistics for all integrations with a recorder platform. """Compile 5-minute statistics for all integrations with a recorder platform.
The actual calculation is delegated to the platforms. The actual calculation is delegated to the platforms.
@ -696,6 +702,11 @@ def compile_statistics(instance: Recorder, start: datetime) -> bool:
session.add(StatisticsRuns(start=start)) session.add(StatisticsRuns(start=start))
if fire_events:
instance.hass.bus.fire(EVENT_RECORDER_5MIN_STATISTICS_GENERATED)
if start.minute == 55:
instance.hass.bus.fire(EVENT_RECORDER_HOURLY_STATISTICS_GENERATED)
return True return True

View file

@ -133,13 +133,14 @@ class StatisticsTask(RecorderTask):
"""An object to insert into the recorder queue to run a statistics task.""" """An object to insert into the recorder queue to run a statistics task."""
start: datetime start: datetime
fire_events: bool
def run(self, instance: Recorder) -> None: def run(self, instance: Recorder) -> None:
"""Run statistics task.""" """Run statistics task."""
if statistics.compile_statistics(instance, self.start): if statistics.compile_statistics(instance, self.start, self.fire_events):
return return
# Schedule a new statistics task if this one didn't finish # Schedule a new statistics task if this one didn't finish
instance.queue_task(StatisticsTask(self.start)) instance.queue_task(StatisticsTask(self.start, self.fire_events))
@dataclass @dataclass

View file

@ -5,5 +5,6 @@
"dependencies": ["http"], "dependencies": ["http"],
"codeowners": ["@home-assistant/core"], "codeowners": ["@home-assistant/core"],
"quality_scale": "internal", "quality_scale": "internal",
"integration_type": "system" "integration_type": "system",
"after_dependencies": ["recorder"]
} }

View file

@ -11,6 +11,10 @@ from homeassistant.components.lovelace import EVENT_LOVELACE_UPDATED
from homeassistant.components.persistent_notification import ( from homeassistant.components.persistent_notification import (
EVENT_PERSISTENT_NOTIFICATIONS_UPDATED, EVENT_PERSISTENT_NOTIFICATIONS_UPDATED,
) )
from homeassistant.components.recorder import (
EVENT_RECORDER_5MIN_STATISTICS_GENERATED,
EVENT_RECORDER_HOURLY_STATISTICS_GENERATED,
)
from homeassistant.components.shopping_list import EVENT_SHOPPING_LIST_UPDATED from homeassistant.components.shopping_list import EVENT_SHOPPING_LIST_UPDATED
from homeassistant.const import ( from homeassistant.const import (
EVENT_COMPONENT_LOADED, EVENT_COMPONENT_LOADED,
@ -35,6 +39,8 @@ SUBSCRIBE_ALLOWLIST: Final[set[str]] = {
EVENT_LOVELACE_UPDATED, EVENT_LOVELACE_UPDATED,
EVENT_PANELS_UPDATED, EVENT_PANELS_UPDATED,
EVENT_PERSISTENT_NOTIFICATIONS_UPDATED, EVENT_PERSISTENT_NOTIFICATIONS_UPDATED,
EVENT_RECORDER_5MIN_STATISTICS_GENERATED,
EVENT_RECORDER_HOURLY_STATISTICS_GENERATED,
EVENT_SERVICE_REGISTERED, EVENT_SERVICE_REGISTERED,
EVENT_SERVICE_REMOVED, EVENT_SERVICE_REMOVED,
EVENT_SHOPPING_LIST_UPDATED, EVENT_SHOPPING_LIST_UPDATED,

View file

@ -53,7 +53,7 @@ def do_adhoc_statistics(hass: HomeAssistant, **kwargs: Any) -> None:
"""Trigger an adhoc statistics run.""" """Trigger an adhoc statistics run."""
if not (start := kwargs.get("start")): if not (start := kwargs.get("start")):
start = statistics.get_start_time() start = statistics.get_start_time()
get_instance(hass).queue_task(StatisticsTask(start)) get_instance(hass).queue_task(StatisticsTask(start, False))
def wait_recording_done(hass: HomeAssistant) -> None: def wait_recording_done(hass: HomeAssistant) -> None:

View file

@ -26,8 +26,13 @@ from homeassistant.components.recorder import (
Recorder, Recorder,
get_instance, get_instance,
pool, pool,
statistics,
)
from homeassistant.components.recorder.const import (
EVENT_RECORDER_5MIN_STATISTICS_GENERATED,
EVENT_RECORDER_HOURLY_STATISTICS_GENERATED,
KEEPALIVE_TIME,
) )
from homeassistant.components.recorder.const import KEEPALIVE_TIME
from homeassistant.components.recorder.db_schema import ( from homeassistant.components.recorder.db_schema import (
SCHEMA_VERSION, SCHEMA_VERSION,
EventData, EventData,
@ -933,7 +938,7 @@ def test_auto_purge_disabled(hass_recorder):
@pytest.mark.parametrize("enable_statistics", [True]) @pytest.mark.parametrize("enable_statistics", [True])
def test_auto_statistics(hass_recorder): def test_auto_statistics(hass_recorder, freezer):
"""Test periodic statistics scheduling.""" """Test periodic statistics scheduling."""
hass = hass_recorder() hass = hass_recorder()
@ -942,43 +947,82 @@ def test_auto_statistics(hass_recorder):
tz = dt_util.get_time_zone("Europe/Copenhagen") tz = dt_util.get_time_zone("Europe/Copenhagen")
dt_util.set_default_time_zone(tz) dt_util.set_default_time_zone(tz)
stats_5min = []
stats_hourly = []
@callback
def async_5min_stats_updated_listener(event: Event) -> None:
"""Handle recorder 5 min stat updated."""
stats_5min.append(event)
def async_hourly_stats_updated_listener(event: Event) -> None:
"""Handle recorder 5 min stat updated."""
stats_hourly.append(event)
# Statistics is scheduled to happen every 5 minutes. Exercise this behavior by # Statistics is scheduled to happen every 5 minutes. Exercise this behavior by
# firing time changed events and advancing the clock around this time. Pick an # firing time changed events and advancing the clock around this time. Pick an
# arbitrary year in the future to avoid boundary conditions relative to the current # arbitrary year in the future to avoid boundary conditions relative to the current
# date. # date.
# #
# The clock is started at 4:16am then advanced forward below # The clock is started at 4:51am then advanced forward below
now = dt_util.utcnow() now = dt_util.utcnow()
test_time = datetime(now.year + 2, 1, 1, 4, 16, 0, tzinfo=tz) test_time = datetime(now.year + 2, 1, 1, 4, 51, 0, tzinfo=tz)
freezer.move_to(test_time.isoformat())
run_tasks_at_time(hass, test_time) run_tasks_at_time(hass, test_time)
hass.block_till_done()
hass.bus.listen(
EVENT_RECORDER_5MIN_STATISTICS_GENERATED, async_5min_stats_updated_listener
)
hass.bus.listen(
EVENT_RECORDER_HOURLY_STATISTICS_GENERATED, async_hourly_stats_updated_listener
)
real_compile_statistics = statistics.compile_statistics
with patch( with patch(
"homeassistant.components.recorder.statistics.compile_statistics", "homeassistant.components.recorder.statistics.compile_statistics",
return_value=True, side_effect=real_compile_statistics,
autospec=True,
) as compile_statistics: ) as compile_statistics:
# Advance 5 minutes, and the statistics task should run # Advance 5 minutes, and the statistics task should run
test_time = test_time + timedelta(minutes=5) test_time = test_time + timedelta(minutes=5)
freezer.move_to(test_time.isoformat())
run_tasks_at_time(hass, test_time) run_tasks_at_time(hass, test_time)
assert len(compile_statistics.mock_calls) == 1 assert len(compile_statistics.mock_calls) == 1
hass.block_till_done()
assert len(stats_5min) == 1
assert len(stats_hourly) == 0
compile_statistics.reset_mock() compile_statistics.reset_mock()
# Advance 5 minutes, and the statistics task should run again # Advance 5 minutes, and the statistics task should run again
test_time = test_time + timedelta(minutes=5) test_time = test_time + timedelta(minutes=5)
freezer.move_to(test_time.isoformat())
run_tasks_at_time(hass, test_time) run_tasks_at_time(hass, test_time)
assert len(compile_statistics.mock_calls) == 1 assert len(compile_statistics.mock_calls) == 1
hass.block_till_done()
assert len(stats_5min) == 2
assert len(stats_hourly) == 1
compile_statistics.reset_mock() compile_statistics.reset_mock()
# Advance less than 5 minutes. The task should not run. # Advance less than 5 minutes. The task should not run.
test_time = test_time + timedelta(minutes=3) test_time = test_time + timedelta(minutes=3)
freezer.move_to(test_time.isoformat())
run_tasks_at_time(hass, test_time) run_tasks_at_time(hass, test_time)
assert len(compile_statistics.mock_calls) == 0 assert len(compile_statistics.mock_calls) == 0
hass.block_till_done()
assert len(stats_5min) == 2
assert len(stats_hourly) == 1
# Advance 5 minutes, and the statistics task should run again # Advance 5 minutes, and the statistics task should run again
test_time = test_time + timedelta(minutes=5) test_time = test_time + timedelta(minutes=5)
freezer.move_to(test_time.isoformat())
run_tasks_at_time(hass, test_time) run_tasks_at_time(hass, test_time)
assert len(compile_statistics.mock_calls) == 1 assert len(compile_statistics.mock_calls) == 1
hass.block_till_done()
assert len(stats_5min) == 3
assert len(stats_hourly) == 1
dt_util.set_default_time_zone(original_tz) dt_util.set_default_time_zone(original_tz)
@ -1027,8 +1071,27 @@ def test_compile_missing_statistics(tmpdir, freezer):
hass.stop() hass.stop()
# Start Home Assistant one hour later # Start Home Assistant one hour later
stats_5min = []
stats_hourly = []
@callback
def async_5min_stats_updated_listener(event: Event) -> None:
"""Handle recorder 5 min stat updated."""
stats_5min.append(event)
def async_hourly_stats_updated_listener(event: Event) -> None:
"""Handle recorder 5 min stat updated."""
stats_hourly.append(event)
freezer.tick(timedelta(hours=1)) freezer.tick(timedelta(hours=1))
hass = get_test_home_assistant() hass = get_test_home_assistant()
hass.bus.listen(
EVENT_RECORDER_5MIN_STATISTICS_GENERATED, async_5min_stats_updated_listener
)
hass.bus.listen(
EVENT_RECORDER_HOURLY_STATISTICS_GENERATED, async_hourly_stats_updated_listener
)
recorder_helper.async_initialize_recorder(hass) recorder_helper.async_initialize_recorder(hass)
setup_component(hass, DOMAIN, {DOMAIN: {CONF_DB_URL: dburl}}) setup_component(hass, DOMAIN, {DOMAIN: {CONF_DB_URL: dburl}})
hass.start() hass.start()
@ -1041,6 +1104,9 @@ def test_compile_missing_statistics(tmpdir, freezer):
last_run = process_timestamp(statistics_runs[1].start) last_run = process_timestamp(statistics_runs[1].start)
assert last_run == now assert last_run == now
assert len(stats_5min) == 1
assert len(stats_hourly) == 1
wait_recording_done(hass) wait_recording_done(hass)
wait_recording_done(hass) wait_recording_done(hass)
hass.stop() hass.stop()

View file

@ -418,14 +418,15 @@ async def test_discovery_requirements_ssdp(hass):
) as mock_process: ) as mock_process:
await async_get_integration_with_requirements(hass, "ssdp_comp") await async_get_integration_with_requirements(hass, "ssdp_comp")
assert len(mock_process.mock_calls) == 4 assert len(mock_process.mock_calls) == 5
assert mock_process.mock_calls[0][1][1] == ssdp.requirements assert mock_process.mock_calls[0][1][1] == ssdp.requirements
# Ensure zeroconf is a dep for ssdp # Ensure zeroconf is a dep for ssdp
assert { assert {
mock_process.mock_calls[1][1][0], mock_process.mock_calls[1][1][0],
mock_process.mock_calls[2][1][0], mock_process.mock_calls[2][1][0],
mock_process.mock_calls[3][1][0], mock_process.mock_calls[3][1][0],
} == {"network", "zeroconf", "http"} mock_process.mock_calls[4][1][0],
} == {"http", "network", "recorder", "zeroconf"}
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -447,7 +448,7 @@ async def test_discovery_requirements_zeroconf(hass, partial_manifest):
) as mock_process: ) as mock_process:
await async_get_integration_with_requirements(hass, "comp") await async_get_integration_with_requirements(hass, "comp")
assert len(mock_process.mock_calls) == 3 # zeroconf also depends on http assert len(mock_process.mock_calls) == 4 # zeroconf also depends on http
assert mock_process.mock_calls[0][1][1] == zeroconf.requirements assert mock_process.mock_calls[0][1][1] == zeroconf.requirements