Recorder improvements (#47739)

Marc Mueller 2021-03-11 18:52:07 +01:00 committed by GitHub
parent af4d06b12e
commit 10848b9bdf
8 changed files with 207 additions and 77 deletions

homeassistant/components/recorder/__init__.py

@@ -1,6 +1,5 @@
"""Support for recording details."""
import asyncio
from collections import namedtuple
import concurrent.futures
from datetime import datetime
import logging
@@ -8,7 +7,7 @@ import queue
import sqlite3
import threading
import time
from typing import Any, Callable, List, Optional
from typing import Any, Callable, List, NamedTuple, Optional
from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select
from sqlalchemy.orm import scoped_session, sessionmaker
@@ -223,7 +222,11 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
return await instance.async_db_ready
PurgeTask = namedtuple("PurgeTask", ["keep_days", "repack"])
class PurgeTask(NamedTuple):
"""Object to store information about purge task."""
keep_days: int
repack: bool
class WaitTask:
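
The hunk above swaps the collections.namedtuple factory for a typed typing.NamedTuple subclass. A minimal sketch (values illustrative, not from the commit) showing that call sites are unaffected, since instances remain plain tuples:

from typing import NamedTuple

class PurgeTask(NamedTuple):
    """Object to store information about purge task."""

    keep_days: int
    repack: bool

# Construction and field access are identical to the old namedtuple,
# but keep_days/repack now carry type annotations for static checking.
task = PurgeTask(keep_days=4, repack=False)
assert task == (4, False)  # still an ordinary tuple
assert task.repack is False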

homeassistant/components/recorder/models.py

@@ -64,6 +64,15 @@ class Events(Base):  # type: ignore
Index("ix_events_event_type_time_fired", "event_type", "time_fired"),
)
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.Events("
f"id={self.event_id}, type='{self.event_type}', data='{self.event_data}', "
f"origin='{self.origin}', time_fired='{self.time_fired}'"
f")>"
)
@staticmethod
def from_event(event, event_data=None):
"""Create an event database object from a native event."""
@@ -129,6 +138,17 @@ class States(Base):  # type: ignore
Index("ix_states_entity_id_last_updated", "entity_id", "last_updated"),
)
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.States("
f"id={self.state_id}, domain='{self.domain}', entity_id='{self.entity_id}', "
f"state='{self.state}', event_id='{self.event_id}', "
f"last_updated='{self.last_updated.isoformat(sep=' ', timespec='seconds')}', "
f"old_state_id={self.old_state_id}"
f")>"
)
@staticmethod
def from_event(event):
"""Create object from a state_changed event."""
@@ -185,6 +205,16 @@ class RecorderRuns(Base):  # type: ignore
__table_args__ = (Index("ix_recorder_runs_start_end", "start", "end"),)
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.RecorderRuns("
f"id={self.run_id}, start='{self.start.isoformat(sep='', timespec='seconds')}', "
f"end='{self.end.isoformat(sep='', timespec='seconds')}', closed_incorrect={self.closed_incorrect}, "
f"created='{self.created.isoformat(sep='', timespec='seconds')}'"
f")>"
)
def entity_ids(self, point_in_time=None):
"""Return the entity ids that existed in this run.
@@ -219,6 +249,15 @@ class SchemaChanges(Base):  # type: ignore
schema_version = Column(Integer)
changed = Column(DateTime(timezone=True), default=dt_util.utcnow)
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.SchemaChanges("
f"id={self.change_id}, schema_version={self.schema_version}, "
f"changed='{self.changed.isoformat(sep=' ', timespec='seconds')}'"
f")>"
)
def process_timestamp(ts):
"""Process a timestamp into datetime object."""

homeassistant/components/recorder/purge.py

@@ -28,14 +28,16 @@ def purge_old_data(instance: Recorder, purge_days: int, repack: bool) -> bool:
Cleans up a timeframe of an hour, based on the oldest record.
"""
purge_before = dt_util.utcnow() - timedelta(days=purge_days)
_LOGGER.debug("Purging states and events before target %s", purge_before)
_LOGGER.debug(
"Purging states and events before target %s",
purge_before.isoformat(sep=" ", timespec="seconds"),
)
try:
with session_scope(session=instance.get_session()) as session: # type: ignore
# Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
event_ids = _select_event_ids_to_purge(session, purge_before)
state_ids = _select_state_ids_to_purge(session, purge_before, event_ids)
if state_ids:
_disconnect_states_about_to_be_purged(session, state_ids)
_purge_state_ids(session, state_ids)
if event_ids:
_purge_event_ids(session, event_ids)
@@ -66,7 +68,7 @@ def purge_old_data(instance: Recorder, purge_days: int, repack: bool) -> bool:
return True
def _select_event_ids_to_purge(session: Session, purge_before: datetime) -> list:
def _select_event_ids_to_purge(session: Session, purge_before: datetime) -> list[int]:
"""Return a list of event ids to purge."""
events = (
session.query(Events.event_id)
@@ -79,8 +81,8 @@ def _select_event_ids_to_purge(session: Session, purge_before: datetime) -> list
def _select_state_ids_to_purge(
session: Session, purge_before: datetime, event_ids: list
) -> list:
session: Session, purge_before: datetime, event_ids: list[int]
) -> list[int]:
"""Return a list of state ids to purge."""
if not event_ids:
return []
@@ -94,7 +96,9 @@ def _select_state_ids_to_purge(
return [state.state_id for state in states]
def _disconnect_states_about_to_be_purged(session: Session, state_ids: list) -> None:
def _purge_state_ids(session: Session, state_ids: list[int]) -> None:
"""Disconnect states and delete by state id."""
# Update old_state_id to NULL before deleting to ensure
# the delete does not fail due to a foreign key constraint
# since some databases (MSSQL) cannot do the ON DELETE SET NULL
@@ -106,9 +110,6 @@ def _disconnect_states_about_to_be_purged(session: Session, state_ids: list) ->
)
_LOGGER.debug("Updated %s states to remove old_state_id", disconnected_rows)
def _purge_state_ids(session: Session, state_ids: list) -> None:
"""Delete by state id."""
deleted_rows = (
session.query(States)
.filter(States.state_id.in_(state_ids))
@@ -117,7 +118,7 @@ def _purge_state_ids(session: Session, state_ids: list) -> None:
_LOGGER.debug("Deleted %s states", deleted_rows)
def _purge_event_ids(session: Session, event_ids: list) -> None:
def _purge_event_ids(session: Session, event_ids: list[int]) -> None:
"""Delete by event id."""
deleted_rows = (
session.query(Events)
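
Two details of this purge path are worth spelling out. First, purge_old_data deletes at most MAX_ROWS_TO_PURGE rows per call and returns False while work remains, so callers repeat it until it returns True (the converted tests below rely on exactly that). Second, the renamed _purge_state_ids folds the old _disconnect_states_about_to_be_purged step in: old_state_id must be set to NULL before the delete because the states table references itself. A minimal sqlite3 sketch (simplified, assumed schema) of why the order matters:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")
# Simplified self-referential schema, mirroring States.old_state_id.
conn.execute(
    "CREATE TABLE states ("
    "state_id INTEGER PRIMARY KEY, "
    "old_state_id INTEGER REFERENCES states(state_id))"
)
conn.execute("INSERT INTO states VALUES (1, NULL), (2, 1)")

# Deleting row 1 directly would raise sqlite3.IntegrityError,
# because row 2 still references it:
#   conn.execute("DELETE FROM states WHERE state_id = 1")

# Clearing the reference first makes the delete safe:
conn.execute("UPDATE states SET old_state_id = NULL WHERE old_state_id = 1")
conn.execute("DELETE FROM states WHERE state_id = 1")  # succeeds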

homeassistant/components/recorder/util.py

@@ -1,4 +1,7 @@
"""SQLAlchemy util functions."""
from __future__ import annotations
from collections.abc import Generator
from contextlib import contextmanager
from datetime import timedelta
import logging
@@ -6,7 +9,9 @@ import os
import time
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.orm.session import Session
from homeassistant.helpers.typing import HomeAssistantType
import homeassistant.util.dt as dt_util
from .const import CONF_DB_INTEGRITY_CHECK, DATA_INSTANCE, SQLITE_URL_PREFIX
@@ -25,7 +30,9 @@ MAX_RESTART_TIME = timedelta(minutes=10)
@contextmanager
def session_scope(*, hass=None, session=None):
def session_scope(
*, hass: HomeAssistantType | None = None, session: Session | None = None
) -> Generator[Session, None, None]:
"""Provide a transactional scope around a series of operations."""
if session is None and hass is not None:
session = hass.data[DATA_INSTANCE].get_session()
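
The now-annotated session_scope is used unchanged by callers. A short usage sketch (hass and the States model are assumed in scope; the query is illustrative):

# Commits on success, rolls back and re-raises on error.
with session_scope(hass=hass) as session:
    count = session.query(States).filter(States.entity_id == "light.kitchen").count()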

tests/components/recorder/common.py

@@ -1,14 +1,15 @@
"""Common test utils for working with recorder."""
from datetime import timedelta
from homeassistant import core as ha
from homeassistant.components import recorder
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.util import dt as dt_util
from tests.common import fire_time_changed
from tests.common import async_fire_time_changed, fire_time_changed
def wait_recording_done(hass):
def wait_recording_done(hass: HomeAssistantType) -> None:
"""Block till recording is done."""
hass.block_till_done()
trigger_db_commit(hass)
@@ -17,18 +18,45 @@ def wait_recording_done(hass):
hass.block_till_done()
async def async_wait_recording_done(hass):
async def async_wait_recording_done_without_instance(hass: HomeAssistantType) -> None:
"""Block till recording is done."""
await hass.loop.run_in_executor(None, wait_recording_done, hass)
def trigger_db_commit(hass):
def trigger_db_commit(hass: HomeAssistantType) -> None:
"""Force the recorder to commit."""
for _ in range(recorder.DEFAULT_COMMIT_INTERVAL):
# We only commit on time change
fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=1))
async def async_wait_recording_done(
hass: HomeAssistantType,
instance: recorder.Recorder,
) -> None:
"""Async wait until recording is done."""
await hass.async_block_till_done()
async_trigger_db_commit(hass)
await hass.async_block_till_done()
await async_recorder_block_till_done(hass, instance)
await hass.async_block_till_done()
@ha.callback
def async_trigger_db_commit(hass: HomeAssistantType) -> None:
"""Fore the recorder to commit. Async friendly."""
for _ in range(recorder.DEFAULT_COMMIT_INTERVAL):
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=1))
async def async_recorder_block_till_done(
hass: HomeAssistantType,
instance: recorder.Recorder,
) -> None:
"""Non blocking version of recorder.block_till_done()."""
await hass.async_add_executor_job(instance.block_till_done)
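
A hedged sketch of how the instance-aware helper is meant to be called from an async test body (coroutine name, entity id, and state are illustrative):

async def record_one_state(hass, instance):
    """Hypothetical coroutine showing the wait pattern."""
    hass.states.async_set("test.entity", "on", {})
    # Flushes the commit interval, then joins the recorder thread.
    await async_wait_recording_done(hass, instance)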
def corrupt_db_file(test_db_file):
"""Corrupt an sqlite3 database file."""
with open(test_db_file, "w+") as fhandle:

tests/components/recorder/conftest.py

@@ -1,10 +1,24 @@
"""Common test tools."""
from __future__ import annotations
from collections.abc import AsyncGenerator
from typing import Awaitable, Callable, cast
import pytest
from homeassistant.components.recorder import Recorder
from homeassistant.components.recorder.const import DATA_INSTANCE
from homeassistant.helpers.typing import ConfigType, HomeAssistantType
from tests.common import get_test_home_assistant, init_recorder_component
from .common import async_recorder_block_till_done
from tests.common import (
async_init_recorder_component,
get_test_home_assistant,
init_recorder_component,
)
SetupRecorderInstanceT = Callable[..., Awaitable[Recorder]]
@pytest.fixture
@@ -22,3 +36,23 @@ def hass_recorder():
yield setup_recorder
hass.stop()
@pytest.fixture
async def async_setup_recorder_instance() -> AsyncGenerator[
SetupRecorderInstanceT, None
]:
"""Yield callable to setup recorder instance."""
async def async_setup_recorder(
hass: HomeAssistantType, config: ConfigType | None = None
) -> Recorder:
"""Setup and return recorder instance.""" # noqa: D401
await async_init_recorder_component(hass, config)
await hass.async_block_till_done()
instance = cast(Recorder, hass.data[DATA_INSTANCE])
await async_recorder_block_till_done(hass, instance)
assert isinstance(instance, Recorder)
return instance
yield async_setup_recorder
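
Tests consume the fixture like this (a condensed, hypothetical version of the converted tests later in the diff):

async def test_example(hass, async_setup_recorder_instance):
    """Hypothetical test using the new fixture."""
    instance = await async_setup_recorder_instance(hass)
    # ...exercise the recorder, then assert via session_scope(hass=hass).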

tests/components/recorder/test_init.py

@@ -28,10 +28,17 @@ from homeassistant.const import (
STATE_UNLOCKED,
)
from homeassistant.core import Context, CoreState, callback
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.setup import async_setup_component, setup_component
from homeassistant.util import dt as dt_util
from .common import async_wait_recording_done, corrupt_db_file, wait_recording_done
from .common import (
async_wait_recording_done,
async_wait_recording_done_without_instance,
corrupt_db_file,
wait_recording_done,
)
from .conftest import SetupRecorderInstanceT
from tests.common import (
async_init_recorder_component,
@@ -62,17 +69,19 @@ async def test_shutdown_before_startup_finishes(hass):
assert run_info.end is not None
def test_saving_state(hass, hass_recorder):
async def test_saving_state(
hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT
):
"""Test saving and restoring a state."""
hass = hass_recorder()
instance = await async_setup_recorder_instance(hass)
entity_id = "test.recorder"
state = "restoring_from_db"
attributes = {"test_attr": 5, "test_attr_10": "nice"}
hass.states.set(entity_id, state, attributes)
hass.states.async_set(entity_id, state, attributes)
wait_recording_done(hass)
await async_wait_recording_done(hass, instance)
with session_scope(hass=hass) as session:
db_states = list(session.query(States))
@@ -690,15 +699,15 @@ async def test_database_corruption_while_running(hass, tmpdir, caplog):
hass.states.async_set("test.lost", "on", {})
await async_wait_recording_done(hass)
await async_wait_recording_done_without_instance(hass)
await hass.async_add_executor_job(corrupt_db_file, test_db_file)
await async_wait_recording_done(hass)
await async_wait_recording_done_without_instance(hass)
# This state will not be recorded because
# the database corruption will be discovered
# and we will have to rollback to recover
hass.states.async_set("test.one", "off", {})
await async_wait_recording_done(hass)
await async_wait_recording_done_without_instance(hass)
assert "Unrecoverable sqlite3 database corruption detected" in caplog.text
assert "The system will rename the corrupt database file" in caplog.text
@@ -706,7 +715,7 @@ async def test_database_corruption_while_running(hass, tmpdir, caplog):
# This state should go into the new database
hass.states.async_set("test.two", "on", {})
await async_wait_recording_done(hass)
await async_wait_recording_done_without_instance(hass)
def _get_last_state():
with session_scope(hass=hass) as session:

tests/components/recorder/test_purge.py

@@ -3,19 +3,23 @@ from datetime import timedelta
import json
from homeassistant.components import recorder
from homeassistant.components.recorder.const import DATA_INSTANCE
from homeassistant.components.recorder.models import Events, RecorderRuns, States
from homeassistant.components.recorder.purge import purge_old_data
from homeassistant.components.recorder.util import session_scope
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.util import dt as dt_util
from .common import wait_recording_done
from .common import async_wait_recording_done
from .conftest import SetupRecorderInstanceT
def test_purge_old_states(hass, hass_recorder):
async def test_purge_old_states(
hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT
):
"""Test deleting old states."""
hass = hass_recorder()
_add_test_states(hass)
instance = await async_setup_recorder_instance(hass)
await _add_test_states(hass, instance)
# make sure we start with 6 states
with session_scope(hass=hass) as session:
@@ -28,7 +32,7 @@ def test_purge_old_states(hass, hass_recorder):
assert events.count() == 6
# run purge_old_data()
finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False)
finished = purge_old_data(instance, 4, repack=False)
assert not finished
assert states.count() == 2
@@ -36,35 +40,41 @@ def test_purge_old_states(hass, hass_recorder):
assert states_after_purge[1].old_state_id == states_after_purge[0].state_id
assert states_after_purge[0].old_state_id is None
finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False)
finished = purge_old_data(instance, 4, repack=False)
assert finished
assert states.count() == 2
def test_purge_old_events(hass, hass_recorder):
async def test_purge_old_events(
hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT
):
"""Test deleting old events."""
hass = hass_recorder()
_add_test_events(hass)
instance = await async_setup_recorder_instance(hass)
await _add_test_events(hass, instance)
with session_scope(hass=hass) as session:
events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
assert events.count() == 6
# run purge_old_data()
finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False)
finished = purge_old_data(instance, 4, repack=False)
assert not finished
assert events.count() == 2
# we should only have 2 events left
finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False)
finished = purge_old_data(instance, 4, repack=False)
assert finished
assert events.count() == 2
def test_purge_old_recorder_runs(hass, hass_recorder):
async def test_purge_old_recorder_runs(
hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT
):
"""Test deleting old recorder runs keeps current run."""
hass = hass_recorder()
_add_test_recorder_runs(hass)
instance = await async_setup_recorder_instance(hass)
await _add_test_recorder_runs(hass, instance)
# make sure we start with 7 recorder runs
with session_scope(hass=hass) as session:
@@ -72,21 +82,26 @@ def test_purge_old_recorder_runs(hass, hass_recorder):
assert recorder_runs.count() == 7
# run purge_old_data()
finished = purge_old_data(hass.data[DATA_INSTANCE], 0, repack=False)
finished = purge_old_data(instance, 0, repack=False)
assert not finished
finished = purge_old_data(hass.data[DATA_INSTANCE], 0, repack=False)
finished = purge_old_data(instance, 0, repack=False)
assert finished
assert recorder_runs.count() == 1
def test_purge_method(hass, hass_recorder, caplog):
async def test_purge_method(
hass: HomeAssistantType,
async_setup_recorder_instance: SetupRecorderInstanceT,
caplog,
):
"""Test purge method."""
hass = hass_recorder()
instance = await async_setup_recorder_instance(hass)
service_data = {"keep_days": 4}
_add_test_events(hass)
_add_test_states(hass)
_add_test_recorder_runs(hass)
await _add_test_events(hass, instance)
await _add_test_states(hass, instance)
await _add_test_recorder_runs(hass, instance)
# make sure we start with 6 states
with session_scope(hass=hass) as session:
@@ -99,28 +114,26 @@ def test_purge_method(hass, hass_recorder, caplog):
recorder_runs = session.query(RecorderRuns)
assert recorder_runs.count() == 7
hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(hass)
await hass.async_block_till_done()
await async_wait_recording_done(hass, instance)
# run purge method - no service data, use defaults
hass.services.call("recorder", "purge")
hass.block_till_done()
await hass.services.async_call("recorder", "purge")
await hass.async_block_till_done()
# Small wait for recorder thread
hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(hass)
await async_wait_recording_done(hass, instance)
# only purged old events
assert states.count() == 4
assert events.count() == 4
# run purge method - correct service data
hass.services.call("recorder", "purge", service_data=service_data)
hass.block_till_done()
await hass.services.async_call("recorder", "purge", service_data=service_data)
await hass.async_block_till_done()
# Small wait for recorder thread
hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(hass)
await async_wait_recording_done(hass, instance)
# we should only have 2 states left after purging
assert states.count() == 2
@@ -135,23 +148,21 @@ def test_purge_method(hass, hass_recorder, caplog):
# run purge method - correct service data, with repack
service_data["repack"] = True
hass.services.call("recorder", "purge", service_data=service_data)
hass.block_till_done()
hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(hass)
await hass.services.async_call("recorder", "purge", service_data=service_data)
await hass.async_block_till_done()
await async_wait_recording_done(hass, instance)
assert "Vacuuming SQL DB to free space" in caplog.text
def _add_test_states(hass):
async def _add_test_states(hass: HomeAssistantType, instance: recorder.Recorder):
"""Add multiple states to the db for testing."""
utcnow = dt_util.utcnow()
five_days_ago = utcnow - timedelta(days=5)
eleven_days_ago = utcnow - timedelta(days=11)
attributes = {"test_attr": 5, "test_attr_10": "nice"}
hass.block_till_done()
hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(hass)
await hass.async_block_till_done()
await async_wait_recording_done(hass, instance)
with recorder.session_scope(hass=hass) as session:
old_state_id = None
@@ -191,16 +202,15 @@ def _add_test_states(hass):
old_state_id = state.state_id
def _add_test_events(hass):
async def _add_test_events(hass: HomeAssistantType, instance: recorder.Recorder):
"""Add a few events for testing."""
utcnow = dt_util.utcnow()
five_days_ago = utcnow - timedelta(days=5)
eleven_days_ago = utcnow - timedelta(days=11)
event_data = {"test_attr": 5, "test_attr_10": "nice"}
hass.block_till_done()
hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(hass)
await hass.async_block_till_done()
await async_wait_recording_done(hass, instance)
with recorder.session_scope(hass=hass) as session:
for event_id in range(6):
@@ -225,15 +235,14 @@ def _add_test_events(hass):
)
def _add_test_recorder_runs(hass):
async def _add_test_recorder_runs(hass: HomeAssistantType, instance: recorder.Recorder):
"""Add a few recorder_runs for testing."""
utcnow = dt_util.utcnow()
five_days_ago = utcnow - timedelta(days=5)
eleven_days_ago = utcnow - timedelta(days=11)
hass.block_till_done()
hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(hass)
await hass.async_block_till_done()
await async_wait_recording_done(hass, instance)
with recorder.session_scope(hass=hass) as session:
for rec_id in range(6):