From 10848b9bdf62dcdd3867dc3d2c9627f07ceb20fc Mon Sep 17 00:00:00 2001 From: Marc Mueller <30130371+cdce8p@users.noreply.github.com> Date: Thu, 11 Mar 2021 18:52:07 +0100 Subject: [PATCH] Recorder improvements (#47739) --- homeassistant/components/recorder/__init__.py | 9 +- homeassistant/components/recorder/models.py | 39 +++++++ homeassistant/components/recorder/purge.py | 21 ++-- homeassistant/components/recorder/util.py | 9 +- tests/components/recorder/common.py | 38 ++++++- tests/components/recorder/conftest.py | 36 +++++- tests/components/recorder/test_init.py | 27 +++-- tests/components/recorder/test_purge.py | 105 ++++++++++-------- 8 files changed, 207 insertions(+), 77 deletions(-) diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 3935aa97eb8..9b84518b6d3 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -1,6 +1,5 @@ """Support for recording details.""" import asyncio -from collections import namedtuple import concurrent.futures from datetime import datetime import logging @@ -8,7 +7,7 @@ import queue import sqlite3 import threading import time -from typing import Any, Callable, List, Optional +from typing import Any, Callable, List, NamedTuple, Optional from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select from sqlalchemy.orm import scoped_session, sessionmaker @@ -223,7 +222,11 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: return await instance.async_db_ready -PurgeTask = namedtuple("PurgeTask", ["keep_days", "repack"]) +class PurgeTask(NamedTuple): + """Object to store information about purge task.""" + + keep_days: int + repack: bool class WaitTask: diff --git a/homeassistant/components/recorder/models.py b/homeassistant/components/recorder/models.py index 551abeac15a..6ed25e64eda 100644 --- a/homeassistant/components/recorder/models.py +++ 
b/homeassistant/components/recorder/models.py @@ -64,6 +64,15 @@ class Events(Base): # type: ignore Index("ix_events_event_type_time_fired", "event_type", "time_fired"), ) + def __repr__(self) -> str: + """Return string representation of instance for debugging.""" + return ( + f"" + ) + @staticmethod def from_event(event, event_data=None): """Create an event database object from a native event.""" @@ -129,6 +138,17 @@ class States(Base): # type: ignore Index("ix_states_entity_id_last_updated", "entity_id", "last_updated"), ) + def __repr__(self) -> str: + """Return string representation of instance for debugging.""" + return ( + f"" + ) + @staticmethod def from_event(event): """Create object from a state_changed event.""" @@ -185,6 +205,16 @@ class RecorderRuns(Base): # type: ignore __table_args__ = (Index("ix_recorder_runs_start_end", "start", "end"),) + def __repr__(self) -> str: + """Return string representation of instance for debugging.""" + return ( + f"" + ) + def entity_ids(self, point_in_time=None): """Return the entity ids that existed in this run. @@ -219,6 +249,15 @@ class SchemaChanges(Base): # type: ignore schema_version = Column(Integer) changed = Column(DateTime(timezone=True), default=dt_util.utcnow) + def __repr__(self) -> str: + """Return string representation of instance for debugging.""" + return ( + f"" + ) + def process_timestamp(ts): """Process a timestamp into datetime object.""" diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py index ac10dadc227..3717ed49f30 100644 --- a/homeassistant/components/recorder/purge.py +++ b/homeassistant/components/recorder/purge.py @@ -28,14 +28,16 @@ def purge_old_data(instance: Recorder, purge_days: int, repack: bool) -> bool: Cleans up an timeframe of an hour, based on the oldest record. 
""" purge_before = dt_util.utcnow() - timedelta(days=purge_days) - _LOGGER.debug("Purging states and events before target %s", purge_before) + _LOGGER.debug( + "Purging states and events before target %s", + purge_before.isoformat(sep=" ", timespec="seconds"), + ) try: with session_scope(session=instance.get_session()) as session: # type: ignore # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record event_ids = _select_event_ids_to_purge(session, purge_before) state_ids = _select_state_ids_to_purge(session, purge_before, event_ids) if state_ids: - _disconnect_states_about_to_be_purged(session, state_ids) _purge_state_ids(session, state_ids) if event_ids: _purge_event_ids(session, event_ids) @@ -66,7 +68,7 @@ def purge_old_data(instance: Recorder, purge_days: int, repack: bool) -> bool: return True -def _select_event_ids_to_purge(session: Session, purge_before: datetime) -> list: +def _select_event_ids_to_purge(session: Session, purge_before: datetime) -> list[int]: """Return a list of event ids to purge.""" events = ( session.query(Events.event_id) @@ -79,8 +81,8 @@ def _select_event_ids_to_purge(session: Session, purge_before: datetime) -> list def _select_state_ids_to_purge( - session: Session, purge_before: datetime, event_ids: list -) -> list: + session: Session, purge_before: datetime, event_ids: list[int] +) -> list[int]: """Return a list of state ids to purge.""" if not event_ids: return [] @@ -94,7 +96,9 @@ def _select_state_ids_to_purge( return [state.state_id for state in states] -def _disconnect_states_about_to_be_purged(session: Session, state_ids: list) -> None: +def _purge_state_ids(session: Session, state_ids: list[int]) -> None: + """Disconnect states and delete by state id.""" + # Update old_state_id to NULL before deleting to ensure # the delete does not fail due to a foreign key constraint # since some databases (MSSQL) cannot do the ON DELETE SET NULL @@ -106,9 +110,6 @@ def _disconnect_states_about_to_be_purged(session: 
Session, state_ids: list) -> ) _LOGGER.debug("Updated %s states to remove old_state_id", disconnected_rows) - -def _purge_state_ids(session: Session, state_ids: list) -> None: - """Delete by state id.""" deleted_rows = ( session.query(States) .filter(States.state_id.in_(state_ids)) @@ -117,7 +118,7 @@ def _purge_state_ids(session: Session, state_ids: list) -> None: _LOGGER.debug("Deleted %s states", deleted_rows) -def _purge_event_ids(session: Session, event_ids: list) -> None: +def _purge_event_ids(session: Session, event_ids: list[int]) -> None: """Delete by event id.""" deleted_rows = ( session.query(Events) diff --git a/homeassistant/components/recorder/util.py b/homeassistant/components/recorder/util.py index b945386de82..b04a4fb7f1f 100644 --- a/homeassistant/components/recorder/util.py +++ b/homeassistant/components/recorder/util.py @@ -1,4 +1,7 @@ """SQLAlchemy util functions.""" +from __future__ import annotations + +from collections.abc import Generator from contextlib import contextmanager from datetime import timedelta import logging @@ -6,7 +9,9 @@ import os import time from sqlalchemy.exc import OperationalError, SQLAlchemyError +from sqlalchemy.orm.session import Session +from homeassistant.helpers.typing import HomeAssistantType import homeassistant.util.dt as dt_util from .const import CONF_DB_INTEGRITY_CHECK, DATA_INSTANCE, SQLITE_URL_PREFIX @@ -25,7 +30,9 @@ MAX_RESTART_TIME = timedelta(minutes=10) @contextmanager -def session_scope(*, hass=None, session=None): +def session_scope( + *, hass: HomeAssistantType | None = None, session: Session | None = None +) -> Generator[Session, None, None]: """Provide a transactional scope around a series of operations.""" if session is None and hass is not None: session = hass.data[DATA_INSTANCE].get_session() diff --git a/tests/components/recorder/common.py b/tests/components/recorder/common.py index d2b731777e2..79f0f1f00d0 100644 --- a/tests/components/recorder/common.py +++ 
b/tests/components/recorder/common.py @@ -1,14 +1,15 @@ """Common test utils for working with recorder.""" - from datetime import timedelta +from homeassistant import core as ha from homeassistant.components import recorder +from homeassistant.helpers.typing import HomeAssistantType from homeassistant.util import dt as dt_util -from tests.common import fire_time_changed +from tests.common import async_fire_time_changed, fire_time_changed -def wait_recording_done(hass): +def wait_recording_done(hass: HomeAssistantType) -> None: """Block till recording is done.""" hass.block_till_done() trigger_db_commit(hass) @@ -17,18 +18,45 @@ def wait_recording_done(hass): hass.block_till_done() -async def async_wait_recording_done(hass): +async def async_wait_recording_done_without_instance(hass: HomeAssistantType) -> None: """Block till recording is done.""" await hass.loop.run_in_executor(None, wait_recording_done, hass) -def trigger_db_commit(hass): +def trigger_db_commit(hass: HomeAssistantType) -> None: """Force the recorder to commit.""" for _ in range(recorder.DEFAULT_COMMIT_INTERVAL): # We only commit on time change fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=1)) +async def async_wait_recording_done( + hass: HomeAssistantType, + instance: recorder.Recorder, +) -> None: + """Async wait until recording is done.""" + await hass.async_block_till_done() + async_trigger_db_commit(hass) + await hass.async_block_till_done() + await async_recorder_block_till_done(hass, instance) + await hass.async_block_till_done() + + +@ha.callback +def async_trigger_db_commit(hass: HomeAssistantType) -> None: + """Force the recorder to commit. 
Async friendly.""" + for _ in range(recorder.DEFAULT_COMMIT_INTERVAL): + async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=1)) + + +async def async_recorder_block_till_done( + hass: HomeAssistantType, + instance: recorder.Recorder, +) -> None: + """Non blocking version of recorder.block_till_done().""" + await hass.async_add_executor_job(instance.block_till_done) + + def corrupt_db_file(test_db_file): """Corrupt an sqlite3 database file.""" with open(test_db_file, "w+") as fhandle: diff --git a/tests/components/recorder/conftest.py b/tests/components/recorder/conftest.py index d91a86402ac..6eadb1c62ed 100644 --- a/tests/components/recorder/conftest.py +++ b/tests/components/recorder/conftest.py @@ -1,10 +1,24 @@ """Common test tools.""" +from __future__ import annotations + +from collections.abc import AsyncGenerator +from typing import Awaitable, Callable, cast import pytest +from homeassistant.components.recorder import Recorder from homeassistant.components.recorder.const import DATA_INSTANCE +from homeassistant.helpers.typing import ConfigType, HomeAssistantType -from tests.common import get_test_home_assistant, init_recorder_component +from .common import async_recorder_block_till_done + +from tests.common import ( + async_init_recorder_component, + get_test_home_assistant, + init_recorder_component, +) + +SetupRecorderInstanceT = Callable[..., Awaitable[Recorder]] @pytest.fixture @@ -22,3 +36,23 @@ def hass_recorder(): yield setup_recorder hass.stop() + + +@pytest.fixture +async def async_setup_recorder_instance() -> AsyncGenerator[ + SetupRecorderInstanceT, None +]: + """Yield callable to setup recorder instance.""" + + async def async_setup_recorder( + hass: HomeAssistantType, config: ConfigType | None = None + ) -> Recorder: + """Setup and return recorder instance.""" # noqa: D401 + await async_init_recorder_component(hass, config) + await hass.async_block_till_done() + instance = cast(Recorder, hass.data[DATA_INSTANCE]) + await 
async_recorder_block_till_done(hass, instance) + assert isinstance(instance, Recorder) + return instance + + yield async_setup_recorder diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index 63f4b9887c6..b3c58995b37 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -28,10 +28,17 @@ from homeassistant.const import ( STATE_UNLOCKED, ) from homeassistant.core import Context, CoreState, callback +from homeassistant.helpers.typing import HomeAssistantType from homeassistant.setup import async_setup_component, setup_component from homeassistant.util import dt as dt_util -from .common import async_wait_recording_done, corrupt_db_file, wait_recording_done +from .common import ( + async_wait_recording_done, + async_wait_recording_done_without_instance, + corrupt_db_file, + wait_recording_done, +) +from .conftest import SetupRecorderInstanceT from tests.common import ( async_init_recorder_component, @@ -62,17 +69,19 @@ async def test_shutdown_before_startup_finishes(hass): assert run_info.end is not None -def test_saving_state(hass, hass_recorder): +async def test_saving_state( + hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT +): """Test saving and restoring a state.""" - hass = hass_recorder() + instance = await async_setup_recorder_instance(hass) entity_id = "test.recorder" state = "restoring_from_db" attributes = {"test_attr": 5, "test_attr_10": "nice"} - hass.states.set(entity_id, state, attributes) + hass.states.async_set(entity_id, state, attributes) - wait_recording_done(hass) + await async_wait_recording_done(hass, instance) with session_scope(hass=hass) as session: db_states = list(session.query(States)) @@ -690,15 +699,15 @@ async def test_database_corruption_while_running(hass, tmpdir, caplog): hass.states.async_set("test.lost", "on", {}) - await async_wait_recording_done(hass) + await async_wait_recording_done_without_instance(hass) await 
hass.async_add_executor_job(corrupt_db_file, test_db_file) - await async_wait_recording_done(hass) + await async_wait_recording_done_without_instance(hass) # This state will not be recorded because # the database corruption will be discovered # and we will have to rollback to recover hass.states.async_set("test.one", "off", {}) - await async_wait_recording_done(hass) + await async_wait_recording_done_without_instance(hass) assert "Unrecoverable sqlite3 database corruption detected" in caplog.text assert "The system will rename the corrupt database file" in caplog.text @@ -706,7 +715,7 @@ async def test_database_corruption_while_running(hass, tmpdir, caplog): # This state should go into the new database hass.states.async_set("test.two", "on", {}) - await async_wait_recording_done(hass) + await async_wait_recording_done_without_instance(hass) def _get_last_state(): with session_scope(hass=hass) as session: diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index aaf53000865..3535a58d33d 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -3,19 +3,23 @@ from datetime import timedelta import json from homeassistant.components import recorder -from homeassistant.components.recorder.const import DATA_INSTANCE from homeassistant.components.recorder.models import Events, RecorderRuns, States from homeassistant.components.recorder.purge import purge_old_data from homeassistant.components.recorder.util import session_scope +from homeassistant.helpers.typing import HomeAssistantType from homeassistant.util import dt as dt_util -from .common import wait_recording_done +from .common import async_wait_recording_done +from .conftest import SetupRecorderInstanceT -def test_purge_old_states(hass, hass_recorder): +async def test_purge_old_states( + hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT +): """Test deleting old states.""" - hass = hass_recorder() - 
_add_test_states(hass) + instance = await async_setup_recorder_instance(hass) + + await _add_test_states(hass, instance) # make sure we start with 6 states with session_scope(hass=hass) as session: @@ -28,7 +32,7 @@ def test_purge_old_states(hass, hass_recorder): assert events.count() == 6 # run purge_old_data() - finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False) + finished = purge_old_data(instance, 4, repack=False) assert not finished assert states.count() == 2 @@ -36,35 +40,41 @@ def test_purge_old_states(hass, hass_recorder): assert states_after_purge[1].old_state_id == states_after_purge[0].state_id assert states_after_purge[0].old_state_id is None - finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False) + finished = purge_old_data(instance, 4, repack=False) assert finished assert states.count() == 2 -def test_purge_old_events(hass, hass_recorder): +async def test_purge_old_events( + hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT +): """Test deleting old events.""" - hass = hass_recorder() - _add_test_events(hass) + instance = await async_setup_recorder_instance(hass) + + await _add_test_events(hass, instance) with session_scope(hass=hass) as session: events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%")) assert events.count() == 6 # run purge_old_data() - finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False) + finished = purge_old_data(instance, 4, repack=False) assert not finished assert events.count() == 2 # we should only have 2 events left - finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False) + finished = purge_old_data(instance, 4, repack=False) assert finished assert events.count() == 2 -def test_purge_old_recorder_runs(hass, hass_recorder): +async def test_purge_old_recorder_runs( + hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT +): """Test deleting old recorder runs keeps current run.""" - hass = 
hass_recorder() - _add_test_recorder_runs(hass) + instance = await async_setup_recorder_instance(hass) + + await _add_test_recorder_runs(hass, instance) # make sure we start with 7 recorder runs with session_scope(hass=hass) as session: @@ -72,21 +82,26 @@ def test_purge_old_recorder_runs(hass, hass_recorder): assert recorder_runs.count() == 7 # run purge_old_data() - finished = purge_old_data(hass.data[DATA_INSTANCE], 0, repack=False) + finished = purge_old_data(instance, 0, repack=False) assert not finished - finished = purge_old_data(hass.data[DATA_INSTANCE], 0, repack=False) + finished = purge_old_data(instance, 0, repack=False) assert finished assert recorder_runs.count() == 1 -def test_purge_method(hass, hass_recorder, caplog): +async def test_purge_method( + hass: HomeAssistantType, + async_setup_recorder_instance: SetupRecorderInstanceT, + caplog, +): """Test purge method.""" - hass = hass_recorder() + instance = await async_setup_recorder_instance(hass) + service_data = {"keep_days": 4} - _add_test_events(hass) - _add_test_states(hass) - _add_test_recorder_runs(hass) + await _add_test_events(hass, instance) + await _add_test_states(hass, instance) + await _add_test_recorder_runs(hass, instance) # make sure we start with 6 states with session_scope(hass=hass) as session: @@ -99,28 +114,26 @@ def test_purge_method(hass, hass_recorder, caplog): recorder_runs = session.query(RecorderRuns) assert recorder_runs.count() == 7 - hass.data[DATA_INSTANCE].block_till_done() - wait_recording_done(hass) + await hass.async_block_till_done() + await async_wait_recording_done(hass, instance) # run purge method - no service data, use defaults - hass.services.call("recorder", "purge") - hass.block_till_done() + await hass.services.async_call("recorder", "purge") + await hass.async_block_till_done() # Small wait for recorder thread - hass.data[DATA_INSTANCE].block_till_done() - wait_recording_done(hass) + await async_wait_recording_done(hass, instance) # only purged old 
events assert states.count() == 4 assert events.count() == 4 # run purge method - correct service data - hass.services.call("recorder", "purge", service_data=service_data) - hass.block_till_done() + await hass.services.async_call("recorder", "purge", service_data=service_data) + await hass.async_block_till_done() # Small wait for recorder thread - hass.data[DATA_INSTANCE].block_till_done() - wait_recording_done(hass) + await async_wait_recording_done(hass, instance) # we should only have 2 states left after purging assert states.count() == 2 @@ -135,23 +148,21 @@ def test_purge_method(hass, hass_recorder, caplog): # run purge method - correct service data, with repack service_data["repack"] = True - hass.services.call("recorder", "purge", service_data=service_data) - hass.block_till_done() - hass.data[DATA_INSTANCE].block_till_done() - wait_recording_done(hass) + await hass.services.async_call("recorder", "purge", service_data=service_data) + await hass.async_block_till_done() + await async_wait_recording_done(hass, instance) assert "Vacuuming SQL DB to free space" in caplog.text -def _add_test_states(hass): +async def _add_test_states(hass: HomeAssistantType, instance: recorder.Recorder): """Add multiple states to the db for testing.""" utcnow = dt_util.utcnow() five_days_ago = utcnow - timedelta(days=5) eleven_days_ago = utcnow - timedelta(days=11) attributes = {"test_attr": 5, "test_attr_10": "nice"} - hass.block_till_done() - hass.data[DATA_INSTANCE].block_till_done() - wait_recording_done(hass) + await hass.async_block_till_done() + await async_wait_recording_done(hass, instance) with recorder.session_scope(hass=hass) as session: old_state_id = None @@ -191,16 +202,15 @@ def _add_test_states(hass): old_state_id = state.state_id -def _add_test_events(hass): +async def _add_test_events(hass: HomeAssistantType, instance: recorder.Recorder): """Add a few events for testing.""" utcnow = dt_util.utcnow() five_days_ago = utcnow - timedelta(days=5) eleven_days_ago = 
utcnow - timedelta(days=11) event_data = {"test_attr": 5, "test_attr_10": "nice"} - hass.block_till_done() - hass.data[DATA_INSTANCE].block_till_done() - wait_recording_done(hass) + await hass.async_block_till_done() + await async_wait_recording_done(hass, instance) with recorder.session_scope(hass=hass) as session: for event_id in range(6): @@ -225,15 +235,14 @@ def _add_test_events(hass): ) -def _add_test_recorder_runs(hass): +async def _add_test_recorder_runs(hass: HomeAssistantType, instance: recorder.Recorder): """Add a few recorder_runs for testing.""" utcnow = dt_util.utcnow() five_days_ago = utcnow - timedelta(days=5) eleven_days_ago = utcnow - timedelta(days=11) - hass.block_till_done() - hass.data[DATA_INSTANCE].block_till_done() - wait_recording_done(hass) + await hass.async_block_till_done() + await async_wait_recording_done(hass, instance) with recorder.session_scope(hass=hass) as session: for rec_id in range(6):