# hass-core/tests/components/recorder/test_purge.py
"""Test data purging."""
from collections.abc import Generator
from datetime import datetime, timedelta
import json
import sqlite3
from unittest.mock import patch
from freezegun import freeze_time
import pytest
from sqlalchemy.exc import DatabaseError, OperationalError
from sqlalchemy.orm.session import Session
from voluptuous.error import MultipleInvalid
from homeassistant.components.recorder import DOMAIN as RECORDER_DOMAIN, Recorder
from homeassistant.components.recorder.const import SupportedDialect
from homeassistant.components.recorder.db_schema import (
Events,
EventTypes,
RecorderRuns,
StateAttributes,
States,
StatesMeta,
StatisticsRuns,
StatisticsShortTerm,
)
from homeassistant.components.recorder.history import get_significant_states
from homeassistant.components.recorder.purge import purge_old_data
from homeassistant.components.recorder.queries import select_event_type_ids
from homeassistant.components.recorder.services import (
SERVICE_PURGE,
SERVICE_PURGE_ENTITIES,
)
from homeassistant.components.recorder.tasks import PurgeTask
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import EVENT_STATE_CHANGED, EVENT_THEMES_UPDATED, STATE_ON
from homeassistant.core import HomeAssistant
from homeassistant.util import dt as dt_util
from .common import (
async_recorder_block_till_done,
async_wait_purge_done,
async_wait_recording_done,
convert_pending_events_to_event_types,
convert_pending_states_to_meta,
)
from tests.typing import RecorderInstanceGenerator
TEST_EVENT_TYPES = (
"EVENT_TEST_AUTOPURGE",
"EVENT_TEST_PURGE",
"EVENT_TEST",
"EVENT_TEST_AUTOPURGE_WITH_EVENT_DATA",
"EVENT_TEST_PURGE_WITH_EVENT_DATA",
"EVENT_TEST_WITH_EVENT_DATA",
)
@pytest.fixture
async def mock_recorder_before_hass(
async_test_recorder: RecorderInstanceGenerator,
) -> None:
"""Set up recorder."""
@pytest.fixture(name="use_sqlite")
def mock_use_sqlite(request: pytest.FixtureRequest) -> Generator[None]:
"""Pytest fixture to switch purge method."""
with patch(
"homeassistant.components.recorder.core.Recorder.dialect_name",
return_value=SupportedDialect.SQLITE
if request.param
else SupportedDialect.MYSQL,
):
yield
async def test_purge_big_database(hass: HomeAssistant, recorder_mock: Recorder) -> None:
"""Test deleting 2/3 old states from a big database."""
for _ in range(12):
await _add_test_states(hass, wait_recording_done=False)
await async_wait_recording_done(hass)
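    # Lower max_bind_vars to 72 (the 12 added batches hold 6 states each) so
    # the purge must run in multiple passes instead of finishing in one.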
with (
patch.object(recorder_mock, "max_bind_vars", 72),
patch.object(recorder_mock.database_engine, "max_bind_vars", 72),
session_scope(hass=hass) as session,
):
states = session.query(States)
state_attributes = session.query(StateAttributes)
assert states.count() == 72
assert state_attributes.count() == 3
purge_before = dt_util.utcnow() - timedelta(days=4)
finished = purge_old_data(
recorder_mock,
purge_before,
states_batch_size=1,
events_batch_size=1,
repack=False,
)
assert not finished
assert states.count() == 24
assert state_attributes.count() == 1
async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) -> None:
"""Test deleting old states."""
await _add_test_states(hass)
# make sure we start with 6 states
with session_scope(hass=hass) as session:
states = session.query(States)
state_attributes = session.query(StateAttributes)
assert states.count() == 6
assert states[0].old_state_id is None
assert states[5].old_state_id == states[4].state_id
assert state_attributes.count() == 3
events = session.query(Events).filter(Events.event_type == "state_changed")
assert events.count() == 0
assert "test.recorder2" in recorder_mock.states_manager._last_committed_id
purge_before = dt_util.utcnow() - timedelta(days=4)
# run purge_old_data()
finished = purge_old_data(
recorder_mock,
purge_before,
states_batch_size=1,
events_batch_size=1,
repack=False,
)
assert not finished
assert states.count() == 2
assert state_attributes.count() == 1
assert "test.recorder2" in recorder_mock.states_manager._last_committed_id
states_after_purge = list(session.query(States))
# Since these states are deleted in batches, we can't guarantee the order
# but we can look them up by state
state_map_by_state = {state.state: state for state in states_after_purge}
dontpurgeme_5 = state_map_by_state["dontpurgeme_5"]
dontpurgeme_4 = state_map_by_state["dontpurgeme_4"]
assert dontpurgeme_5.old_state_id == dontpurgeme_4.state_id
assert dontpurgeme_4.old_state_id is None
finished = purge_old_data(recorder_mock, purge_before, repack=False)
assert finished
assert states.count() == 2
assert state_attributes.count() == 1
assert "test.recorder2" in recorder_mock.states_manager._last_committed_id
# run purge_old_data again
purge_before = dt_util.utcnow()
finished = purge_old_data(
recorder_mock,
purge_before,
states_batch_size=1,
events_batch_size=1,
repack=False,
)
assert not finished
assert states.count() == 0
assert state_attributes.count() == 0
assert "test.recorder2" not in recorder_mock.states_manager._last_committed_id
# Add some more states
await _add_test_states(hass)
# make sure we start with 6 states
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 6
assert states[0].old_state_id is None
assert states[5].old_state_id == states[4].state_id
events = session.query(Events).filter(Events.event_type == "state_changed")
assert events.count() == 0
assert "test.recorder2" in recorder_mock.states_manager._last_committed_id
state_attributes = session.query(StateAttributes)
assert state_attributes.count() == 3
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("recorder_mock", "skip_by_db_engine")
async def test_purge_old_states_encounters_database_corruption(
hass: HomeAssistant,
) -> None:
"""Test database image image is malformed while deleting old states.
This test is specific for SQLite, wiping the database on error only happens
with SQLite.
"""
await _add_test_states(hass)
await async_wait_recording_done(hass)
sqlite3_exception = DatabaseError("statement", {}, [])
sqlite3_exception.__cause__ = sqlite3.DatabaseError("not a database")
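    # SQLAlchemy wraps the original sqlite3 error as __cause__; a
    # "not a database" disk image error should trigger moving the database away.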
with (
patch(
"homeassistant.components.recorder.core.move_away_broken_database"
) as move_away,
patch(
"homeassistant.components.recorder.purge.purge_old_data",
side_effect=sqlite3_exception,
),
):
await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, {"keep_days": 0})
await hass.async_block_till_done()
await async_wait_recording_done(hass)
assert move_away.called
# Ensure the whole database was reset due to the database error
with session_scope(hass=hass) as session:
states_after_purge = session.query(States)
assert states_after_purge.count() == 0
async def test_purge_old_states_encounters_temporary_mysql_error(
hass: HomeAssistant,
recorder_mock: Recorder,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test retry on specific mysql operational errors."""
await _add_test_states(hass)
await async_wait_recording_done(hass)
mysql_exception = OperationalError("statement", {}, [])
mysql_exception.orig = Exception(1205, "retryable")
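    # MySQL error 1205 (lock wait timeout) is a transient error, so the purge
    # is expected to be retried.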
with (
patch("homeassistant.components.recorder.util.time.sleep") as sleep_mock,
patch(
"homeassistant.components.recorder.purge._purge_old_recorder_runs",
side_effect=[mysql_exception, None],
),
patch.object(recorder_mock.engine.dialect, "name", "mysql"),
):
await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, {"keep_days": 0})
await hass.async_block_till_done()
await async_wait_recording_done(hass)
await async_wait_recording_done(hass)
assert "retrying" in caplog.text
assert sleep_mock.called
@pytest.mark.usefixtures("recorder_mock")
async def test_purge_old_states_encounters_operational_error(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test error on operational errors that are not mysql does not retry."""
await _add_test_states(hass)
await async_wait_recording_done(hass)
exception = OperationalError("statement", {}, [])
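    # A bare OperationalError without a transient MySQL error code must fail
    # immediately rather than retry.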
with patch(
"homeassistant.components.recorder.purge._purge_old_recorder_runs",
side_effect=exception,
):
await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, {"keep_days": 0})
await hass.async_block_till_done()
await async_wait_recording_done(hass)
await async_wait_recording_done(hass)
assert "retrying" not in caplog.text
assert "Error executing purge" in caplog.text
async def test_purge_old_events(hass: HomeAssistant, recorder_mock: Recorder) -> None:
"""Test deleting old events."""
await _add_test_events(hass)
with session_scope(hass=hass) as session:
events = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(TEST_EVENT_TYPES))
)
assert events.count() == 6
purge_before = dt_util.utcnow() - timedelta(days=4)
# run purge_old_data()
finished = purge_old_data(
recorder_mock,
purge_before,
repack=False,
events_batch_size=1,
states_batch_size=1,
)
assert not finished
all_events = events.all()
assert events.count() == 2, f"Should have 2 events left: {all_events}"
# we should only have 2 events left
finished = purge_old_data(
recorder_mock,
purge_before,
repack=False,
events_batch_size=1,
states_batch_size=1,
)
assert finished
assert events.count() == 2
async def test_purge_old_recorder_runs(
hass: HomeAssistant, recorder_mock: Recorder
) -> None:
"""Test deleting old recorder runs keeps current run."""
await _add_test_recorder_runs(hass)
# make sure we start with 7 recorder runs
with session_scope(hass=hass) as session:
recorder_runs = session.query(RecorderRuns)
assert recorder_runs.count() == 7
purge_before = dt_util.utcnow()
# run purge_old_data()
finished = purge_old_data(
recorder_mock,
purge_before,
repack=False,
events_batch_size=1,
states_batch_size=1,
)
assert not finished
finished = purge_old_data(
recorder_mock,
purge_before,
repack=False,
events_batch_size=1,
states_batch_size=1,
)
assert finished
assert recorder_runs.count() == 1
async def test_purge_old_statistics_runs(
hass: HomeAssistant, recorder_mock: Recorder
) -> None:
"""Test deleting old statistics runs keeps the latest run."""
await _add_test_statistics_runs(hass)
# make sure we start with 7 statistics runs
with session_scope(hass=hass) as session:
statistics_runs = session.query(StatisticsRuns)
assert statistics_runs.count() == 7
purge_before = dt_util.utcnow()
# run purge_old_data()
finished = purge_old_data(recorder_mock, purge_before, repack=False)
assert not finished
finished = purge_old_data(recorder_mock, purge_before, repack=False)
assert finished
assert statistics_runs.count() == 1
@pytest.mark.parametrize("use_sqlite", [True, False], indirect=True)
@pytest.mark.usefixtures("recorder_mock")
async def test_purge_method(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
use_sqlite: bool,
) -> None:
"""Test purge method."""
def assert_recorder_runs_equal(run1, run2):
assert run1.run_id == run2.run_id
assert run1.start == run2.start
assert run1.end == run2.end
assert run1.closed_incorrect == run2.closed_incorrect
assert run1.created == run2.created
def assert_statistic_runs_equal(run1, run2):
assert run1.run_id == run2.run_id
assert run1.start == run2.start
service_data = {"keep_days": 4}
await _add_test_events(hass)
await _add_test_states(hass)
await _add_test_statistics(hass)
await _add_test_recorder_runs(hass)
await _add_test_statistics_runs(hass)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
# make sure we start with 6 states
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 6
events = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(TEST_EVENT_TYPES))
)
assert events.count() == 6
statistics = session.query(StatisticsShortTerm)
assert statistics.count() == 6
recorder_runs = session.query(RecorderRuns)
assert recorder_runs.count() == 7
runs_before_purge = recorder_runs.all()
statistics_runs = session.query(StatisticsRuns).order_by(StatisticsRuns.run_id)
assert statistics_runs.count() == 7
statistic_runs_before_purge = statistics_runs.all()
for itm in runs_before_purge:
session.expunge(itm)
for itm in statistic_runs_before_purge:
session.expunge(itm)
await hass.async_block_till_done()
await async_wait_purge_done(hass)
# run purge method - no service data, use defaults
await hass.services.async_call("recorder", "purge")
await hass.async_block_till_done()
# Small wait for recorder thread
await async_wait_purge_done(hass)
with session_scope(hass=hass) as session:
states = session.query(States)
events = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(TEST_EVENT_TYPES))
)
statistics = session.query(StatisticsShortTerm)
# only purged old states, events and statistics
assert states.count() == 4
assert events.count() == 4
assert statistics.count() == 4
# run purge method - correct service data
await hass.services.async_call("recorder", "purge", service_data=service_data)
await hass.async_block_till_done()
# Small wait for recorder thread
await async_wait_purge_done(hass)
with session_scope(hass=hass) as session:
states = session.query(States)
events = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(TEST_EVENT_TYPES))
)
statistics = session.query(StatisticsShortTerm)
recorder_runs = session.query(RecorderRuns)
statistics_runs = session.query(StatisticsRuns)
# we should only have 2 states, events and statistics left after purging
assert states.count() == 2
assert events.count() == 2
assert statistics.count() == 2
# now we should only have 3 recorder runs left
runs = recorder_runs.all()
assert_recorder_runs_equal(runs[0], runs_before_purge[0])
assert_recorder_runs_equal(runs[1], runs_before_purge[5])
assert_recorder_runs_equal(runs[2], runs_before_purge[6])
# now we should only have 3 statistics runs left
runs = statistics_runs.all()
assert_statistic_runs_equal(runs[0], statistic_runs_before_purge[0])
assert_statistic_runs_equal(runs[1], statistic_runs_before_purge[5])
assert_statistic_runs_equal(runs[2], statistic_runs_before_purge[6])
assert "EVENT_TEST_PURGE" not in (event.event_type for event in events.all())
# run purge method - correct service data, with repack
service_data["repack"] = True
await hass.services.async_call("recorder", "purge", service_data=service_data)
await hass.async_block_till_done()
await async_wait_purge_done(hass)
assert (
"Vacuuming SQL DB to free space" in caplog.text
or "Optimizing SQL DB to free space" in caplog.text
)
@pytest.mark.parametrize("use_sqlite", [True, False], indirect=True)
async def test_purge_edge_case(
hass: HomeAssistant,
recorder_mock: Recorder,
use_sqlite: bool,
) -> None:
"""Test states and events are purged even if they occurred shortly before purge_before."""
async def _add_db_entries(hass: HomeAssistant, timestamp: datetime) -> None:
with session_scope(hass=hass) as session:
session.add(
Events(
event_id=1001,
event_type="EVENT_TEST_PURGE",
event_data="{}",
origin="LOCAL",
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
session.add(
States(
entity_id="test.recorder2",
state="purgeme",
attributes="{}",
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
event_id=1001,
attributes_id=1002,
)
)
session.add(
StateAttributes(
shared_attrs="{}",
hash=1234,
attributes_id=1002,
)
)
convert_pending_events_to_event_types(recorder_mock, session)
convert_pending_states_to_meta(recorder_mock, session)
await async_wait_purge_done(hass)
service_data = {"keep_days": 2}
timestamp = dt_util.utcnow() - timedelta(days=2, minutes=1)
await _add_db_entries(hass, timestamp)
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 1
state_attributes = session.query(StateAttributes)
assert state_attributes.count() == 1
events = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(TEST_EVENT_TYPES))
)
assert events.count() == 1
await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
await hass.async_block_till_done()
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 0
events = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(TEST_EVENT_TYPES))
)
assert events.count() == 0
async def test_purge_cutoff_date(hass: HomeAssistant, recorder_mock: Recorder) -> None:
"""Test states and events are purged only if they occurred before "now() - keep_days"."""
async def _add_db_entries(hass: HomeAssistant, cutoff: datetime, rows: int) -> None:
timestamp_keep = cutoff
timestamp_purge = cutoff - timedelta(microseconds=1)
with session_scope(hass=hass) as session:
session.add(
Events(
event_id=1000,
event_type="KEEP",
event_data="{}",
origin="LOCAL",
time_fired_ts=dt_util.utc_to_timestamp(timestamp_keep),
)
)
session.add(
States(
entity_id="test.cutoff",
state="keep",
attributes="{}",
last_changed_ts=dt_util.utc_to_timestamp(timestamp_keep),
last_updated_ts=dt_util.utc_to_timestamp(timestamp_keep),
event_id=1000,
attributes_id=1000,
)
)
session.add(
StateAttributes(
shared_attrs="{}",
hash=1234,
attributes_id=1000,
)
)
for row in range(1, rows):
session.add(
Events(
event_id=1000 + row,
event_type="PURGE",
event_data="{}",
origin="LOCAL",
time_fired_ts=dt_util.utc_to_timestamp(timestamp_purge),
)
)
session.add(
States(
entity_id="test.cutoff",
state="purge",
attributes="{}",
last_changed_ts=dt_util.utc_to_timestamp(timestamp_purge),
last_updated_ts=dt_util.utc_to_timestamp(timestamp_purge),
event_id=1000 + row,
attributes_id=1000 + row,
)
)
session.add(
StateAttributes(
shared_attrs="{}",
hash=1234,
attributes_id=1000 + row,
)
)
convert_pending_events_to_event_types(recorder_mock, session)
convert_pending_states_to_meta(recorder_mock, session)
await async_wait_purge_done(hass)
service_data = {"keep_days": 2}
# Force multiple purge batches to be run
rows = 999
cutoff = dt_util.utcnow() - timedelta(days=service_data["keep_days"])
await _add_db_entries(hass, cutoff, rows)
with session_scope(hass=hass) as session:
states = session.query(States)
state_attributes = session.query(StateAttributes)
assert states.filter(States.state == "purge").count() == rows - 1
assert states.filter(States.state == "keep").count() == 1
assert (
state_attributes.outerjoin(
States, StateAttributes.attributes_id == States.attributes_id
)
.filter(States.state == "keep")
.count()
== 1
)
assert (
session.query(Events)
.filter(Events.event_type_id.in_(select_event_type_ids(("PURGE",))))
.count()
== rows - 1
)
assert (
session.query(Events)
.filter(Events.event_type_id.in_(select_event_type_ids(("KEEP",))))
.count()
== 1
)
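    # Queue the purge task directly so the exact cutoff is used; the purge
    # service would recompute purge_before from keep_days instead.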
recorder_mock.queue_task(PurgeTask(cutoff, repack=False, apply_filter=False))
await hass.async_block_till_done()
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
with session_scope(hass=hass) as session:
states = session.query(States)
state_attributes = session.query(StateAttributes)
session.query(Events)
assert states.filter(States.state == "purge").count() == 0
assert (
state_attributes.outerjoin(
States, StateAttributes.attributes_id == States.attributes_id
)
.filter(States.state == "purge")
.count()
== 0
)
assert states.filter(States.state == "keep").count() == 1
assert (
state_attributes.outerjoin(
States, StateAttributes.attributes_id == States.attributes_id
)
.filter(States.state == "keep")
.count()
== 1
)
assert (
session.query(Events)
.filter(Events.event_type_id.in_(select_event_type_ids(("PURGE",))))
.count()
== 0
)
assert (
session.query(Events)
.filter(Events.event_type_id.in_(select_event_type_ids(("KEEP",))))
.count()
== 1
)
# Make sure we can purge everything
recorder_mock.queue_task(
PurgeTask(dt_util.utcnow(), repack=False, apply_filter=False)
)
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
with session_scope(hass=hass) as session:
states = session.query(States)
state_attributes = session.query(StateAttributes)
assert states.count() == 0
assert state_attributes.count() == 0
# Make sure we can purge everything when the db is already empty
recorder_mock.queue_task(
PurgeTask(dt_util.utcnow(), repack=False, apply_filter=False)
)
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
with session_scope(hass=hass) as session:
states = session.query(States)
state_attributes = session.query(StateAttributes)
assert states.count() == 0
assert state_attributes.count() == 0
@pytest.mark.parametrize("use_sqlite", [True, False], indirect=True)
@pytest.mark.parametrize(
"recorder_config", [{"exclude": {"entities": ["sensor.excluded"]}}]
)
async def test_purge_filtered_states(
hass: HomeAssistant,
recorder_mock: Recorder,
use_sqlite: bool,
) -> None:
"""Test filtered states are purged."""
assert recorder_mock.entity_filter("sensor.excluded") is False
def _add_db_entries(hass: HomeAssistant) -> None:
with session_scope(hass=hass) as session:
# Add states and state_changed events that should be purged
for days in range(1, 4):
timestamp = dt_util.utcnow() - timedelta(days=days)
for event_id in range(1000, 1020):
_add_state_with_state_attributes(
session,
"sensor.excluded",
"purgeme",
timestamp,
event_id * days,
)
# Add state **without** state_changed event that should be purged
timestamp = dt_util.utcnow() - timedelta(days=1)
session.add(
States(
entity_id="sensor.excluded",
state="purgeme",
attributes="{}",
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
)
)
            # Add states and state_changed events that should be kept
timestamp = dt_util.utcnow() - timedelta(days=2)
for event_id in range(200, 210):
_add_state_with_state_attributes(
session,
"sensor.keep",
"keep",
timestamp,
event_id,
)
# Add states with linked old_state_ids that need to be handled
timestamp = dt_util.utcnow() - timedelta(days=0)
state_attrs = StateAttributes(
hash=0,
shared_attrs=json.dumps(
{"sensor.linked_old_state_id": "sensor.linked_old_state_id"}
),
)
state_1 = States(
entity_id="sensor.linked_old_state_id",
state="keep",
attributes="{}",
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
old_state_id=1,
state_attributes=state_attrs,
)
timestamp = dt_util.utcnow() - timedelta(days=4)
state_2 = States(
entity_id="sensor.linked_old_state_id",
state="keep",
attributes="{}",
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
old_state_id=2,
state_attributes=state_attrs,
)
state_3 = States(
entity_id="sensor.linked_old_state_id",
state="keep",
attributes="{}",
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
old_state_id=62, # keep
state_attributes=state_attrs,
)
session.add_all((state_attrs, state_1, state_2, state_3))
            # Add event that should be kept
session.add(
Events(
event_id=100,
event_type="EVENT_KEEP",
event_data="{}",
origin="LOCAL",
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
convert_pending_states_to_meta(recorder_mock, session)
convert_pending_events_to_event_types(recorder_mock, session)
service_data = {"keep_days": 10}
_add_db_entries(hass)
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 74
events_keep = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(("EVENT_KEEP",)))
)
assert events_keep.count() == 1
# Normal purge doesn't remove excluded entities
await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
await hass.async_block_till_done()
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 74
events_keep = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(("EVENT_KEEP",)))
)
assert events_keep.count() == 1
# Test with 'apply_filter' = True
service_data["apply_filter"] = True
await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
await hass.async_block_till_done()
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 13
events_keep = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(("EVENT_KEEP",)))
)
assert events_keep.count() == 1
states_sensor_excluded = (
session.query(States)
.outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
.filter(StatesMeta.entity_id == "sensor.excluded")
)
assert states_sensor_excluded.count() == 0
assert (
session.query(States).filter(States.state_id == 72).first().old_state_id
is None
)
assert (
session.query(States).filter(States.state_id == 72).first().attributes_id
== 71
)
assert (
session.query(States).filter(States.state_id == 73).first().old_state_id
is None
)
assert (
session.query(States).filter(States.state_id == 73).first().attributes_id
== 71
)
final_keep_state = session.query(States).filter(States.state_id == 74).first()
assert final_keep_state.old_state_id == 62 # should have been kept
assert final_keep_state.attributes_id == 71
assert session.query(StateAttributes).count() == 11
# Do it again to make sure nothing changes
await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
with session_scope(hass=hass) as session:
final_keep_state = session.query(States).filter(States.state_id == 74).first()
assert final_keep_state.old_state_id == 62 # should have been kept
assert final_keep_state.attributes_id == 71
assert session.query(StateAttributes).count() == 11
service_data = {"keep_days": 0}
await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
with session_scope(hass=hass) as session:
remaining = list(session.query(States))
for state in remaining:
assert state.event_id is None
assert len(remaining) == 0
assert session.query(StateAttributes).count() == 0
@pytest.mark.parametrize("use_sqlite", [True, False], indirect=True)
@pytest.mark.parametrize(
"recorder_config", [{"exclude": {"entities": ["sensor.excluded"]}}]
)
async def test_purge_filtered_states_to_empty(
hass: HomeAssistant,
recorder_mock: Recorder,
use_sqlite: bool,
) -> None:
"""Test filtered states are purged all the way to an empty db."""
assert recorder_mock.entity_filter("sensor.excluded") is False
def _add_db_entries(hass: HomeAssistant) -> None:
with session_scope(hass=hass) as session:
# Add states and state_changed events that should be purged
for days in range(1, 4):
timestamp = dt_util.utcnow() - timedelta(days=days)
for event_id in range(1000, 1020):
_add_state_with_state_attributes(
session,
"sensor.excluded",
"purgeme",
timestamp,
event_id * days,
)
convert_pending_states_to_meta(recorder_mock, session)
service_data = {"keep_days": 10}
_add_db_entries(hass)
with session_scope(hass=hass) as session:
states = session.query(States)
state_attributes = session.query(StateAttributes)
assert states.count() == 60
assert state_attributes.count() == 60
# Test with 'apply_filter' = True
service_data["apply_filter"] = True
await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
with session_scope(hass=hass) as session:
states = session.query(States)
state_attributes = session.query(StateAttributes)
assert states.count() == 0
assert state_attributes.count() == 0
# Do it again to make sure nothing changes
# Why do we do this? Should we check the end result?
await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
@pytest.mark.parametrize("use_sqlite", [True, False], indirect=True)
@pytest.mark.parametrize(
"recorder_config", [{"exclude": {"entities": ["sensor.old_format"]}}]
)
async def test_purge_without_state_attributes_filtered_states_to_empty(
hass: HomeAssistant,
recorder_mock: Recorder,
use_sqlite: bool,
) -> None:
"""Test filtered legacy states without state attributes are purged all the way to an empty db."""
assert recorder_mock.entity_filter("sensor.old_format") is False
def _add_db_entries(hass: HomeAssistant) -> None:
with session_scope(hass=hass) as session:
# Add states and state_changed events that should be purged
# in the legacy format
timestamp = dt_util.utcnow() - timedelta(days=5)
event_id = 1021
session.add(
States(
entity_id="sensor.old_format",
state=STATE_ON,
attributes=json.dumps({"old": "not_using_state_attributes"}),
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
event_id=event_id,
state_attributes=None,
)
)
session.add(
Events(
event_id=event_id,
event_type=EVENT_STATE_CHANGED,
event_data="{}",
origin="LOCAL",
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
session.add(
Events(
event_id=event_id + 1,
event_type=EVENT_THEMES_UPDATED,
event_data="{}",
origin="LOCAL",
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
convert_pending_states_to_meta(recorder_mock, session)
convert_pending_events_to_event_types(recorder_mock, session)
service_data = {"keep_days": 10}
_add_db_entries(hass)
with session_scope(hass=hass) as session:
states = session.query(States)
state_attributes = session.query(StateAttributes)
assert states.count() == 1
assert state_attributes.count() == 0
# Test with 'apply_filter' = True
service_data["apply_filter"] = True
await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
with session_scope(hass=hass) as session:
states = session.query(States)
state_attributes = session.query(StateAttributes)
assert states.count() == 0
assert state_attributes.count() == 0
# Do it again to make sure nothing changes
# Why do we do this? Should we check the end result?
await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
@pytest.mark.parametrize(
"recorder_config", [{"exclude": {"event_types": ["EVENT_PURGE"]}}]
)
async def test_purge_filtered_events(
hass: HomeAssistant, recorder_mock: Recorder
) -> None:
"""Test filtered events are purged."""
await async_wait_recording_done(hass)
def _add_db_entries(hass: HomeAssistant) -> None:
with session_scope(hass=hass) as session:
# Add events that should be purged
for days in range(1, 4):
timestamp = dt_util.utcnow() - timedelta(days=days)
for event_id in range(1000, 1020):
session.add(
Events(
event_id=event_id * days,
event_type="EVENT_PURGE",
event_data="{}",
origin="LOCAL",
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
            # Add states and state_changed events that should be kept
timestamp = dt_util.utcnow() - timedelta(days=1)
for event_id in range(200, 210):
_add_state_with_state_attributes(
session,
"sensor.keep",
"keep",
timestamp,
event_id,
)
convert_pending_events_to_event_types(recorder_mock, session)
convert_pending_states_to_meta(recorder_mock, session)
service_data = {"keep_days": 10}
await recorder_mock.async_add_executor_job(_add_db_entries, hass)
await async_wait_recording_done(hass)
with session_scope(hass=hass, read_only=True) as session:
events_purge = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(("EVENT_PURGE",)))
)
states = session.query(States)
assert events_purge.count() == 60
assert states.count() == 10
# Normal purge doesn't remove excluded events
await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
await hass.async_block_till_done()
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
with session_scope(hass=hass, read_only=True) as session:
events_purge = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(("EVENT_PURGE",)))
)
states = session.query(States)
assert events_purge.count() == 60
assert states.count() == 10
# Test with 'apply_filter' = True
service_data["apply_filter"] = True
await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
await hass.async_block_till_done()
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
with session_scope(hass=hass, read_only=True) as session:
events_purge = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(("EVENT_PURGE",)))
)
states = session.query(States)
assert events_purge.count() == 0
assert states.count() == 10
@pytest.mark.parametrize(
"recorder_config",
[
{
"exclude": {
"event_types": ["excluded_event"],
"entities": ["sensor.excluded", "sensor.old_format"],
}
}
],
)
async def test_purge_filtered_events_state_changed(
hass: HomeAssistant, recorder_mock: Recorder
) -> None:
"""Test filtered state_changed events are purged. This should also remove all states."""
# Assert entity_id is NOT excluded
assert recorder_mock.entity_filter("sensor.excluded") is False
assert recorder_mock.entity_filter("sensor.old_format") is False
assert recorder_mock.entity_filter("sensor.keep") is True
assert "excluded_event" in recorder_mock.exclude_event_types
def _add_db_entries(hass: HomeAssistant) -> None:
with session_scope(hass=hass) as session:
# Add states and state_changed events that should be purged
for days in range(1, 4):
timestamp = dt_util.utcnow() - timedelta(days=days)
for event_id in range(1000, 1020):
_add_state_with_state_attributes(
session,
"sensor.excluded",
"purgeme",
timestamp,
event_id * days,
)
            # Add events that should be kept
timestamp = dt_util.utcnow() - timedelta(days=1)
for event_id in range(200, 210):
session.add(
Events(
event_id=event_id,
event_type="EVENT_KEEP",
event_data="{}",
origin="LOCAL",
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
# Add states with linked old_state_ids that need to be handled
timestamp = dt_util.utcnow() - timedelta(days=0)
state_1 = States(
entity_id="sensor.linked_old_state_id",
state="keep",
attributes="{}",
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
old_state_id=1,
)
timestamp = dt_util.utcnow() - timedelta(days=4)
state_2 = States(
entity_id="sensor.linked_old_state_id",
state="keep",
attributes="{}",
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
old_state_id=2,
)
state_3 = States(
entity_id="sensor.linked_old_state_id",
state="keep",
attributes="{}",
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
old_state_id=62, # keep
)
session.add_all((state_1, state_2, state_3))
session.add(
Events(
event_id=231,
event_type="excluded_event",
event_data="{}",
origin="LOCAL",
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
session.add(
States(
entity_id="sensor.old_format",
state="remove",
attributes="{}",
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
)
)
convert_pending_events_to_event_types(recorder_mock, session)
convert_pending_states_to_meta(recorder_mock, session)
service_data = {"keep_days": 10, "apply_filter": True}
_add_db_entries(hass)
with session_scope(hass=hass) as session:
events_keep = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(("EVENT_KEEP",)))
)
events_purge = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(("excluded_event",)))
)
states = session.query(States)
assert events_keep.count() == 10
assert events_purge.count() == 1
assert states.count() == 64
await hass.services.async_call(RECORDER_DOMAIN, SERVICE_PURGE, service_data)
await hass.async_block_till_done()
for _ in range(4):
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
with session_scope(hass=hass) as session:
events_keep = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(("EVENT_KEEP",)))
)
events_purge = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(("excluded_event",)))
)
states = session.query(States)
assert events_keep.count() == 10
assert events_purge.count() == 0
assert states.count() == 3
assert (
session.query(States).filter(States.state_id == 61).first().old_state_id
is None
)
assert (
session.query(States).filter(States.state_id == 62).first().old_state_id
is None
)
assert (
session.query(States).filter(States.state_id == 63).first().old_state_id
== 62
) # should have been kept
async def test_purge_entities(hass: HomeAssistant, recorder_mock: Recorder) -> None:
"""Test purging of specific entities."""
async def _purge_entities(hass, entity_ids, domains, entity_globs):
service_data = {
"entity_id": entity_ids,
"domains": domains,
"entity_globs": entity_globs,
}
await hass.services.async_call(
RECORDER_DOMAIN, SERVICE_PURGE_ENTITIES, service_data
)
await hass.async_block_till_done()
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
def _add_purge_records(hass: HomeAssistant) -> None:
with session_scope(hass=hass) as session:
# Add states and state_changed events that should be purged
for days in range(1, 4):
timestamp = dt_util.utcnow() - timedelta(days=days)
for event_id in range(1000, 1020):
_add_state_with_state_attributes(
session,
"sensor.purge_entity",
"purgeme",
timestamp,
event_id * days,
)
timestamp = dt_util.utcnow() - timedelta(days=days)
for event_id in range(10000, 10020):
_add_state_with_state_attributes(
session,
"purge_domain.entity",
"purgeme",
timestamp,
event_id * days,
)
timestamp = dt_util.utcnow() - timedelta(days=days)
for event_id in range(100000, 100020):
_add_state_with_state_attributes(
session,
"binary_sensor.purge_glob",
"purgeme",
timestamp,
event_id * days,
)
convert_pending_states_to_meta(recorder_mock, session)
convert_pending_events_to_event_types(recorder_mock, session)
def _add_keep_records(hass: HomeAssistant) -> None:
with session_scope(hass=hass) as session:
# Add states and state_changed events that should be kept
timestamp = dt_util.utcnow() - timedelta(days=2)
for event_id in range(200, 210):
_add_state_with_state_attributes(
session,
"sensor.keep",
"keep",
timestamp,
event_id,
)
convert_pending_states_to_meta(recorder_mock, session)
convert_pending_events_to_event_types(recorder_mock, session)
_add_purge_records(hass)
_add_keep_records(hass)
# Confirm standard service call
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 190
await _purge_entities(hass, "sensor.purge_entity", "purge_domain", "*purge_glob")
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 10
states_sensor_kept = (
session.query(States)
.outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
.filter(StatesMeta.entity_id == "sensor.keep")
)
assert states_sensor_kept.count() == 10
_add_purge_records(hass)
# Confirm each parameter purges only the associated records
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 190
await _purge_entities(hass, "sensor.purge_entity", [], [])
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 130
await _purge_entities(hass, [], "purge_domain", [])
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 70
await _purge_entities(hass, [], [], "*purge_glob")
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 10
states_sensor_kept = (
session.query(States)
.outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
.filter(StatesMeta.entity_id == "sensor.keep")
)
assert states_sensor_kept.count() == 10
# sensor.keep should remain in the StatesMeta table
states_meta_remain = session.query(StatesMeta).filter(
StatesMeta.entity_id == "sensor.keep"
)
assert states_meta_remain.count() == 1
# sensor.purge_entity should be removed from the StatesMeta table
states_meta_remain = session.query(StatesMeta).filter(
StatesMeta.entity_id == "sensor.purge_entity"
)
assert states_meta_remain.count() == 0
_add_purge_records(hass)
# Confirm calling service without arguments is invalid
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 190
with pytest.raises(MultipleInvalid):
await _purge_entities(hass, [], [], [])
with session_scope(hass=hass, read_only=True) as session:
states = session.query(States)
assert states.count() == 190
states_meta_remain = session.query(StatesMeta)
assert states_meta_remain.count() == 4
async def _add_test_states(hass: HomeAssistant, wait_recording_done: bool = True):
"""Add multiple states to the db for testing."""
utcnow = dt_util.utcnow()
five_days_ago = utcnow - timedelta(days=5)
eleven_days_ago = utcnow - timedelta(days=11)
base_attributes = {"test_attr": 5, "test_attr_10": "nice"}
async def set_state(entity_id, state, **kwargs):
"""Set the state."""
hass.states.async_set(entity_id, state, **kwargs)
if wait_recording_done:
await hass.async_block_till_done()
await async_wait_recording_done(hass)
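    # Add six states in three age buckets: two 11 days old (autopurge),
    # two 5 days old (purge), and two current (keep).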
with freeze_time() as freezer:
for event_id in range(6):
if event_id < 2:
timestamp = eleven_days_ago
state = f"autopurgeme_{event_id}"
attributes = {"autopurgeme": True, **base_attributes}
elif event_id < 4:
timestamp = five_days_ago
state = f"purgeme_{event_id}"
attributes = {"purgeme": True, **base_attributes}
else:
timestamp = utcnow
state = f"dontpurgeme_{event_id}"
attributes = {"dontpurgeme": True, **base_attributes}
freezer.move_to(timestamp)
await set_state("test.recorder2", state, attributes=attributes)
async def _add_test_events(hass: HomeAssistant, iterations: int = 1):
"""Add a few events for testing."""
utcnow = dt_util.utcnow()
five_days_ago = utcnow - timedelta(days=5)
eleven_days_ago = utcnow - timedelta(days=11)
event_data = {"test_attr": 5, "test_attr_10": "nice"}
# Make sure recording is done before freezing time
# because the time freeze can affect the recorder
    # thread as well and cause the test to fail
await async_wait_recording_done(hass)
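    # Each iteration fires six events in three age buckets: two 11 days old,
    # two 5 days old, and two current.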
with freeze_time() as freezer:
for _ in range(iterations):
for event_id in range(6):
if event_id < 2:
timestamp = eleven_days_ago
event_type = "EVENT_TEST_AUTOPURGE"
elif event_id < 4:
timestamp = five_days_ago
event_type = "EVENT_TEST_PURGE"
else:
timestamp = utcnow
event_type = "EVENT_TEST"
freezer.move_to(timestamp)
hass.bus.async_fire(event_type, event_data)
await async_wait_recording_done(hass)
async def _add_test_statistics(hass: HomeAssistant):
"""Add multiple statistics to the db for testing."""
utcnow = dt_util.utcnow()
five_days_ago = utcnow - timedelta(days=5)
eleven_days_ago = utcnow - timedelta(days=11)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
with session_scope(hass=hass) as session:
for event_id in range(6):
if event_id < 2:
timestamp = eleven_days_ago
state = "-11"
elif event_id < 4:
timestamp = five_days_ago
state = "-5"
else:
timestamp = utcnow
state = "0"
session.add(
StatisticsShortTerm(
start_ts=timestamp.timestamp(),
state=state,
)
)
async def _add_test_recorder_runs(hass: HomeAssistant):
"""Add a few recorder_runs for testing."""
utcnow = dt_util.utcnow()
five_days_ago = utcnow - timedelta(days=5)
eleven_days_ago = utcnow - timedelta(days=11)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
with session_scope(hass=hass) as session:
for rec_id in range(6):
if rec_id < 2:
timestamp = eleven_days_ago
elif rec_id < 4:
timestamp = five_days_ago
else:
timestamp = utcnow
session.add(
RecorderRuns(
start=timestamp,
created=dt_util.utcnow(),
end=timestamp + timedelta(days=1),
)
)
async def _add_test_statistics_runs(hass: HomeAssistant):
"""Add a few recorder_runs for testing."""
utcnow = dt_util.utcnow()
five_days_ago = utcnow - timedelta(days=5)
eleven_days_ago = utcnow - timedelta(days=11)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
with session_scope(hass=hass) as session:
for rec_id in range(6):
if rec_id < 2:
timestamp = eleven_days_ago
elif rec_id < 4:
timestamp = five_days_ago
else:
timestamp = utcnow
session.add(
StatisticsRuns(
start=timestamp,
)
)
def _add_state_without_event_linkage(
session: Session,
entity_id: str,
state: str,
timestamp: datetime,
):
state_attrs = StateAttributes(
hash=1234, shared_attrs=json.dumps({entity_id: entity_id})
)
session.add(state_attrs)
session.add(
States(
entity_id=entity_id,
state=state,
attributes=None,
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
event_id=None,
state_attributes=state_attrs,
)
)
def _add_state_with_state_attributes(
session: Session,
entity_id: str,
state: str,
timestamp: datetime,
event_id: int,
) -> None:
"""Add state and state_changed event to database for testing."""
state_attrs = StateAttributes(
hash=event_id, shared_attrs=json.dumps({entity_id: entity_id})
)
session.add(state_attrs)
session.add(
States(
entity_id=entity_id,
state=state,
attributes=None,
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
event_id=event_id,
state_attributes=state_attrs,
)
)
@pytest.mark.timeout(30)
async def test_purge_many_old_events(
hass: HomeAssistant, recorder_mock: Recorder
) -> None:
"""Test deleting old events."""
old_events_count = 5
with (
patch.object(recorder_mock, "max_bind_vars", old_events_count),
patch.object(recorder_mock.database_engine, "max_bind_vars", old_events_count),
):
await _add_test_events(hass, old_events_count)
with session_scope(hass=hass) as session:
events = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(TEST_EVENT_TYPES))
)
assert events.count() == old_events_count * 6
purge_before = dt_util.utcnow() - timedelta(days=4)
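            # With max_bind_vars 5 and a batch size of 3, one purge pass can
            # delete at most 15 of the 30 events.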
# run purge_old_data()
finished = purge_old_data(
recorder_mock,
purge_before,
repack=False,
states_batch_size=3,
events_batch_size=3,
)
assert not finished
assert events.count() == old_events_count * 3
# we should only have 2 groups of events left
finished = purge_old_data(
recorder_mock,
purge_before,
repack=False,
states_batch_size=3,
events_batch_size=3,
)
assert finished
assert events.count() == old_events_count * 2
# we should now purge everything
finished = purge_old_data(
recorder_mock,
dt_util.utcnow(),
repack=False,
states_batch_size=20,
events_batch_size=20,
)
assert finished
assert events.count() == 0
async def test_purge_old_events_purges_the_event_type_ids(
hass: HomeAssistant, recorder_mock: Recorder
) -> None:
"""Test deleting old events purges event type ids."""
assert recorder_mock.event_type_manager.active is True
utcnow = dt_util.utcnow()
five_days_ago = utcnow - timedelta(days=5)
eleven_days_ago = utcnow - timedelta(days=11)
far_past = utcnow - timedelta(days=1000)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
def _insert_events():
with session_scope(hass=hass) as session:
event_type_test_auto_purge = EventTypes(event_type="EVENT_TEST_AUTOPURGE")
event_type_test_purge = EventTypes(event_type="EVENT_TEST_PURGE")
event_type_test = EventTypes(event_type="EVENT_TEST")
event_type_unused = EventTypes(event_type="EVENT_TEST_UNUSED")
session.add_all(
(
event_type_test_auto_purge,
event_type_test_purge,
event_type_test,
event_type_unused,
)
)
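            # Flush so the EventTypes rows get their event_type_ids assigned
            # before the events reference them.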
session.flush()
for _ in range(5):
for event_id in range(6):
if event_id < 2:
timestamp = eleven_days_ago
event_type = event_type_test_auto_purge
elif event_id < 4:
timestamp = five_days_ago
event_type = event_type_test_purge
else:
timestamp = utcnow
event_type = event_type_test
session.add(
Events(
event_type=None,
event_type_id=event_type.event_type_id,
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
return recorder_mock.event_type_manager.get_many(
[
"EVENT_TEST_AUTOPURGE",
"EVENT_TEST_PURGE",
"EVENT_TEST",
"EVENT_TEST_UNUSED",
],
session,
)
event_type_to_id = await recorder_mock.async_add_executor_job(_insert_events)
test_event_type_ids = event_type_to_id.values()
with session_scope(hass=hass) as session:
events = session.query(Events).where(
Events.event_type_id.in_(test_event_type_ids)
)
event_types = session.query(EventTypes).where(
EventTypes.event_type_id.in_(test_event_type_ids)
)
assert events.count() == 30
assert event_types.count() == 4
# run purge_old_data()
finished = purge_old_data(
recorder_mock,
far_past,
repack=False,
)
assert finished
assert events.count() == 30
# We should remove the unused event type
assert event_types.count() == 3
assert "EVENT_TEST_UNUSED" not in recorder_mock.event_type_manager._id_map
# we should only have 10 events left since
# only one event type was recorded now
finished = purge_old_data(
recorder_mock,
utcnow,
repack=False,
)
assert finished
assert events.count() == 10
assert event_types.count() == 1
# Purge everything
finished = purge_old_data(
recorder_mock,
utcnow + timedelta(seconds=1),
repack=False,
)
assert finished
assert events.count() == 0
assert event_types.count() == 0
async def test_purge_old_states_purges_the_state_metadata_ids(
hass: HomeAssistant, recorder_mock: Recorder
) -> None:
"""Test deleting old states purges state metadata_ids."""
assert recorder_mock.states_meta_manager.active is True
utcnow = dt_util.utcnow()
five_days_ago = utcnow - timedelta(days=5)
eleven_days_ago = utcnow - timedelta(days=11)
far_past = utcnow - timedelta(days=1000)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
def _insert_states():
with session_scope(hass=hass) as session:
states_meta_sensor_one = StatesMeta(entity_id="sensor.one")
states_meta_sensor_two = StatesMeta(entity_id="sensor.two")
states_meta_sensor_three = StatesMeta(entity_id="sensor.three")
states_meta_sensor_unused = StatesMeta(entity_id="sensor.unused")
session.add_all(
(
states_meta_sensor_one,
states_meta_sensor_two,
states_meta_sensor_three,
states_meta_sensor_unused,
)
)
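            # Flush so the StatesMeta rows get their metadata_ids assigned
            # before the states reference them.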
session.flush()
for _ in range(5):
for event_id in range(6):
if event_id < 2:
timestamp = eleven_days_ago
metadata_id = states_meta_sensor_one.metadata_id
elif event_id < 4:
timestamp = five_days_ago
metadata_id = states_meta_sensor_two.metadata_id
else:
timestamp = utcnow
metadata_id = states_meta_sensor_three.metadata_id
session.add(
States(
metadata_id=metadata_id,
state="any",
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
)
)
return recorder_mock.states_meta_manager.get_many(
["sensor.one", "sensor.two", "sensor.three", "sensor.unused"],
session,
True,
)
entity_id_to_metadata_id = await recorder_mock.async_add_executor_job(
_insert_states
)
test_metadata_ids = entity_id_to_metadata_id.values()
with session_scope(hass=hass) as session:
states = session.query(States).where(States.metadata_id.in_(test_metadata_ids))
states_meta = session.query(StatesMeta).where(
StatesMeta.metadata_id.in_(test_metadata_ids)
)
assert states.count() == 30
assert states_meta.count() == 4
# run purge_old_data()
finished = purge_old_data(
recorder_mock,
far_past,
repack=False,
)
assert finished
assert states.count() == 30
# We should remove the unused entity_id
assert states_meta.count() == 3
assert "sensor.unused" not in recorder_mock.event_type_manager._id_map
# we should only have 10 states left since
        # only one entity was recorded now
finished = purge_old_data(
recorder_mock,
utcnow,
repack=False,
)
assert finished
assert states.count() == 10
assert states_meta.count() == 1
# Purge everything
finished = purge_old_data(
recorder_mock,
utcnow + timedelta(seconds=1),
repack=False,
)
assert finished
assert states.count() == 0
assert states_meta.count() == 0
async def test_purge_entities_keep_days(
hass: HomeAssistant, recorder_mock: Recorder
) -> None:
"""Test purging states with an entity filter and keep_days."""
await hass.async_block_till_done()
await async_wait_recording_done(hass)
start = dt_util.utcnow()
two_days_ago = start - timedelta(days=2)
one_week_ago = start - timedelta(days=7)
one_month_ago = start - timedelta(days=30)
with freeze_time(one_week_ago):
hass.states.async_set("sensor.keep", "initial")
hass.states.async_set("sensor.purge", "initial")
await async_wait_recording_done(hass)
with freeze_time(two_days_ago):
hass.states.async_set("sensor.purge", "two_days_ago")
await async_wait_recording_done(hass)
hass.states.async_set("sensor.purge", "now")
hass.states.async_set("sensor.keep", "now")
await async_recorder_block_till_done(hass)
states = await recorder_mock.async_add_executor_job(
get_significant_states,
hass,
one_month_ago,
None,
["sensor.keep", "sensor.purge"],
)
assert len(states["sensor.keep"]) == 2
assert len(states["sensor.purge"]) == 3
await hass.services.async_call(
RECORDER_DOMAIN,
SERVICE_PURGE_ENTITIES,
{
"entity_id": "sensor.purge",
"keep_days": 1,
},
)
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
states = await recorder_mock.async_add_executor_job(
get_significant_states,
hass,
one_month_ago,
None,
["sensor.keep", "sensor.purge"],
)
assert len(states["sensor.keep"]) == 2
assert len(states["sensor.purge"]) == 1
await hass.services.async_call(
RECORDER_DOMAIN,
SERVICE_PURGE_ENTITIES,
{
"entity_id": "sensor.purge",
},
)
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
states = await recorder_mock.async_add_executor_job(
get_significant_states,
hass,
one_month_ago,
None,
["sensor.keep", "sensor.purge"],
)
assert len(states["sensor.keep"]) == 2
assert "sensor.purge" not in states