* Fix CPU thrashing during purge after all legacy events were removed. We now remove the index of event ids on the states table once it is all NULLs, to save space. The purge path needs to stop checking for legacy rows to purge once that index has been removed, since the check would otherwise trigger a full table scan on every purge cycle and never find any legacy rows to purge.
* one more place
* drop the key constraint as well
* fixes
* more sqlite
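A minimal sketch of the guard this change implies (illustrative only: `use_legacy_events_index` is the flag exercised by the tests below, while `_purge_legacy_rows` is a hypothetical stand-in for the legacy purge helper):

    def purge_old_data(instance, purge_before, **kwargs):
        # Only look for legacy rows while the event_id index still exists;
        # once the index is dropped this lookup degrades into a full table
        # scan that never finds anything to purge.
        if instance.use_legacy_events_index:
            _purge_legacy_rows(instance, purge_before)
        ...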
"""Test data purging."""
|
|
from datetime import datetime, timedelta
|
|
import json
|
|
import sqlite3
|
|
from unittest.mock import patch
|
|
|
|
from freezegun import freeze_time
|
|
import pytest
|
|
from sqlalchemy.exc import DatabaseError, OperationalError
|
|
from sqlalchemy.orm.session import Session
|
|
|
|
from homeassistant.components import recorder
|
|
from homeassistant.components.recorder import Recorder, migration
|
|
from homeassistant.components.recorder.const import (
|
|
SQLITE_MAX_BIND_VARS,
|
|
SupportedDialect,
|
|
)
|
|
from homeassistant.components.recorder.db_schema import (
|
|
EventData,
|
|
Events,
|
|
EventTypes,
|
|
RecorderRuns,
|
|
StateAttributes,
|
|
States,
|
|
StatesMeta,
|
|
StatisticsRuns,
|
|
StatisticsShortTerm,
|
|
)
|
|
from homeassistant.components.recorder.history import get_significant_states
|
|
from homeassistant.components.recorder.purge import purge_old_data
|
|
from homeassistant.components.recorder.queries import select_event_type_ids
|
|
from homeassistant.components.recorder.services import (
|
|
SERVICE_PURGE,
|
|
SERVICE_PURGE_ENTITIES,
|
|
)
|
|
from homeassistant.components.recorder.tasks import PurgeTask
|
|
from homeassistant.components.recorder.util import session_scope
|
|
from homeassistant.const import EVENT_STATE_CHANGED, EVENT_THEMES_UPDATED, STATE_ON
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers.json import json_dumps
|
|
from homeassistant.helpers.typing import ConfigType
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
from .common import (
|
|
async_recorder_block_till_done,
|
|
async_wait_purge_done,
|
|
async_wait_recording_done,
|
|
)
|
|
|
|
from tests.typing import RecorderInstanceGenerator
|
|
|
|
|
|
@pytest.fixture(name="use_sqlite")
|
|
def mock_use_sqlite(request):
|
|
"""Pytest fixture to switch purge method."""
|
|
with patch(
|
|
"homeassistant.components.recorder.core.Recorder.dialect_name",
|
|
return_value=SupportedDialect.SQLITE
|
|
if request.param
|
|
else SupportedDialect.MYSQL,
|
|
):
|
|
yield
|
|
|
|
|
|
async def test_purge_old_states(
|
|
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
|
|
) -> None:
|
|
"""Test deleting old states."""
|
|
instance = await async_setup_recorder_instance(hass)
|
|
|
|
await _add_test_states(hass)
|
|
|
|
# make sure we start with 6 states
|
|
with session_scope(hass=hass) as session:
|
|
states = session.query(States)
|
|
state_attributes = session.query(StateAttributes)
|
|
|
|
assert states.count() == 6
|
|
assert states[0].old_state_id is None
|
|
assert states[5].old_state_id == states[4].state_id
|
|
assert state_attributes.count() == 3
|
|
|
|
events = session.query(Events).filter(Events.event_type == "state_changed")
|
|
assert events.count() == 0
|
|
assert "test.recorder2" in instance.states_manager._last_committed_id
|
|
|
|
purge_before = dt_util.utcnow() - timedelta(days=4)
|
|
|
|
# run purge_old_data()
|
|
finished = purge_old_data(
|
|
instance,
|
|
purge_before,
|
|
states_batch_size=1,
|
|
events_batch_size=1,
|
|
repack=False,
|
|
)
|
|
assert not finished
|
|
assert states.count() == 2
|
|
assert state_attributes.count() == 1
|
|
|
|
assert "test.recorder2" in instance.states_manager._last_committed_id
|
|
|
|
states_after_purge = list(session.query(States))
|
|
# Since these states are deleted in batches, we can't guarantee the order
|
|
# but we can look them up by state
|
|
state_map_by_state = {state.state: state for state in states_after_purge}
|
|
dontpurgeme_5 = state_map_by_state["dontpurgeme_5"]
|
|
dontpurgeme_4 = state_map_by_state["dontpurgeme_4"]
|
|
|
|
assert dontpurgeme_5.old_state_id == dontpurgeme_4.state_id
|
|
assert dontpurgeme_4.old_state_id is None
|
|
|
|
finished = purge_old_data(instance, purge_before, repack=False)
|
|
assert finished
|
|
assert states.count() == 2
|
|
assert state_attributes.count() == 1
|
|
|
|
assert "test.recorder2" in instance.states_manager._last_committed_id
|
|
|
|
# run purge_old_data again
|
|
purge_before = dt_util.utcnow()
|
|
finished = purge_old_data(
|
|
instance,
|
|
purge_before,
|
|
states_batch_size=1,
|
|
events_batch_size=1,
|
|
repack=False,
|
|
)
|
|
assert not finished
|
|
assert states.count() == 0
|
|
assert state_attributes.count() == 0
|
|
|
|
assert "test.recorder2" not in instance.states_manager._last_committed_id
|
|
|
|
# Add some more states
|
|
await _add_test_states(hass)
|
|
|
|
# make sure we start with 6 states
|
|
with session_scope(hass=hass) as session:
|
|
states = session.query(States)
|
|
assert states.count() == 6
|
|
assert states[0].old_state_id is None
|
|
assert states[5].old_state_id == states[4].state_id
|
|
|
|
events = session.query(Events).filter(Events.event_type == "state_changed")
|
|
assert events.count() == 0
|
|
assert "test.recorder2" in instance.states_manager._last_committed_id
|
|
|
|
state_attributes = session.query(StateAttributes)
|
|
assert state_attributes.count() == 3
|
|
|
|
|
|
async def test_purge_old_states_encouters_database_corruption(
|
|
async_setup_recorder_instance: RecorderInstanceGenerator,
|
|
hass: HomeAssistant,
|
|
recorder_db_url: str,
|
|
) -> None:
|
|
"""Test database image image is malformed while deleting old states."""
|
|
if recorder_db_url.startswith(("mysql://", "postgresql://")):
|
|
# This test is specific for SQLite, wiping the database on error only happens
|
|
# with SQLite.
|
|
return
|
|
|
|
await async_setup_recorder_instance(hass)
|
|
|
|
await _add_test_states(hass)
|
|
await async_wait_recording_done(hass)
|
|
|
|
sqlite3_exception = DatabaseError("statement", {}, [])
|
|
sqlite3_exception.__cause__ = sqlite3.DatabaseError()
|
|
|
|
with patch(
|
|
"homeassistant.components.recorder.core.move_away_broken_database"
|
|
) as move_away, patch(
|
|
"homeassistant.components.recorder.purge.purge_old_data",
|
|
side_effect=sqlite3_exception,
|
|
):
|
|
await hass.services.async_call(recorder.DOMAIN, SERVICE_PURGE, {"keep_days": 0})
|
|
await hass.async_block_till_done()
|
|
await async_wait_recording_done(hass)
|
|
|
|
assert move_away.called
|
|
|
|
# Ensure the whole database was reset due to the database error
|
|
with session_scope(hass=hass) as session:
|
|
states_after_purge = session.query(States)
|
|
assert states_after_purge.count() == 0
|
|
|
|
|
|
async def test_purge_old_states_encounters_temporary_mysql_error(
|
|
async_setup_recorder_instance: RecorderInstanceGenerator,
|
|
hass: HomeAssistant,
|
|
caplog: pytest.LogCaptureFixture,
|
|
) -> None:
|
|
"""Test retry on specific mysql operational errors."""
|
|
instance = await async_setup_recorder_instance(hass)
|
|
|
|
await _add_test_states(hass)
|
|
await async_wait_recording_done(hass)
|
|
|
|
mysql_exception = OperationalError("statement", {}, [])
|
|
mysql_exception.orig = Exception(1205, "retryable")
|
|
|
|
with patch(
|
|
"homeassistant.components.recorder.util.time.sleep"
|
|
) as sleep_mock, patch(
|
|
"homeassistant.components.recorder.purge._purge_old_recorder_runs",
|
|
side_effect=[mysql_exception, None],
|
|
), patch.object(
|
|
instance.engine.dialect, "name", "mysql"
|
|
):
|
|
await hass.services.async_call(recorder.DOMAIN, SERVICE_PURGE, {"keep_days": 0})
|
|
await hass.async_block_till_done()
|
|
await async_wait_recording_done(hass)
|
|
await async_wait_recording_done(hass)
|
|
|
|
assert "retrying" in caplog.text
|
|
assert sleep_mock.called
|
|
|
|
|
|
async def test_purge_old_states_encounters_operational_error(
|
|
async_setup_recorder_instance: RecorderInstanceGenerator,
|
|
hass: HomeAssistant,
|
|
caplog: pytest.LogCaptureFixture,
|
|
) -> None:
|
|
"""Test error on operational errors that are not mysql does not retry."""
|
|
await async_setup_recorder_instance(hass)
|
|
|
|
await _add_test_states(hass)
|
|
await async_wait_recording_done(hass)
|
|
|
|
exception = OperationalError("statement", {}, [])
|
|
|
|
with patch(
|
|
"homeassistant.components.recorder.purge._purge_old_recorder_runs",
|
|
side_effect=exception,
|
|
):
|
|
await hass.services.async_call(recorder.DOMAIN, SERVICE_PURGE, {"keep_days": 0})
|
|
await hass.async_block_till_done()
|
|
await async_wait_recording_done(hass)
|
|
await async_wait_recording_done(hass)
|
|
|
|
assert "retrying" not in caplog.text
|
|
assert "Error executing purge" in caplog.text
|
|
|
|
|
|
async def test_purge_old_events(
|
|
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
|
|
) -> None:
|
|
"""Test deleting old events."""
|
|
instance = await async_setup_recorder_instance(hass)
|
|
|
|
await _add_test_events(hass)
|
|
|
|
with session_scope(hass=hass) as session:
|
|
events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
|
|
assert events.count() == 6
|
|
|
|
purge_before = dt_util.utcnow() - timedelta(days=4)
|
|
|
|
# run purge_old_data()
|
|
finished = purge_old_data(
|
|
instance,
|
|
purge_before,
|
|
repack=False,
|
|
events_batch_size=1,
|
|
states_batch_size=1,
|
|
)
|
|
assert not finished
|
|
assert events.count() == 2
|
|
|
|
# we should only have 2 events left
|
|
finished = purge_old_data(
|
|
instance,
|
|
purge_before,
|
|
repack=False,
|
|
events_batch_size=1,
|
|
states_batch_size=1,
|
|
)
|
|
assert finished
|
|
assert events.count() == 2
|
|
|
|
|
|
async def test_purge_old_recorder_runs(
|
|
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
|
|
) -> None:
|
|
"""Test deleting old recorder runs keeps current run."""
|
|
instance = await async_setup_recorder_instance(hass)
|
|
|
|
await _add_test_recorder_runs(hass)
|
|
|
|
# make sure we start with 7 recorder runs
|
|
with session_scope(hass=hass) as session:
|
|
recorder_runs = session.query(RecorderRuns)
|
|
assert recorder_runs.count() == 7
|
|
|
|
purge_before = dt_util.utcnow()
|
|
|
|
# run purge_old_data()
|
|
finished = purge_old_data(
|
|
instance,
|
|
purge_before,
|
|
repack=False,
|
|
events_batch_size=1,
|
|
states_batch_size=1,
|
|
)
|
|
assert not finished
|
|
|
|
finished = purge_old_data(
|
|
instance,
|
|
purge_before,
|
|
repack=False,
|
|
events_batch_size=1,
|
|
states_batch_size=1,
|
|
)
|
|
assert finished
|
|
assert recorder_runs.count() == 1
|
|
|
|
|
|
async def test_purge_old_statistics_runs(
|
|
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
|
|
) -> None:
|
|
"""Test deleting old statistics runs keeps the latest run."""
|
|
instance = await async_setup_recorder_instance(hass)
|
|
|
|
await _add_test_statistics_runs(hass)
|
|
|
|
# make sure we start with 7 statistics runs
|
|
with session_scope(hass=hass) as session:
|
|
statistics_runs = session.query(StatisticsRuns)
|
|
assert statistics_runs.count() == 7
|
|
|
|
purge_before = dt_util.utcnow()
|
|
|
|
# run purge_old_data()
|
|
finished = purge_old_data(instance, purge_before, repack=False)
|
|
assert not finished
|
|
|
|
finished = purge_old_data(instance, purge_before, repack=False)
|
|
assert finished
|
|
assert statistics_runs.count() == 1
|
|
|
|
|
|
@pytest.mark.parametrize("use_sqlite", (True, False), indirect=True)
|
|
async def test_purge_method(
|
|
async_setup_recorder_instance: RecorderInstanceGenerator,
|
|
hass: HomeAssistant,
|
|
caplog: pytest.LogCaptureFixture,
|
|
use_sqlite: bool,
|
|
) -> None:
|
|
"""Test purge method."""
|
|
|
|
def assert_recorder_runs_equal(run1, run2):
|
|
assert run1.run_id == run2.run_id
|
|
assert run1.start == run2.start
|
|
assert run1.end == run2.end
|
|
assert run1.closed_incorrect == run2.closed_incorrect
|
|
assert run1.created == run2.created
|
|
|
|
def assert_statistic_runs_equal(run1, run2):
|
|
assert run1.run_id == run2.run_id
|
|
assert run1.start == run2.start
|
|
|
|
await async_setup_recorder_instance(hass)
|
|
|
|
service_data = {"keep_days": 4}
|
|
await _add_test_events(hass)
|
|
await _add_test_states(hass)
|
|
await _add_test_statistics(hass)
|
|
await _add_test_recorder_runs(hass)
|
|
await _add_test_statistics_runs(hass)
|
|
await hass.async_block_till_done()
|
|
await async_wait_recording_done(hass)
|
|
|
|
# make sure we start with 6 states
|
|
with session_scope(hass=hass) as session:
|
|
states = session.query(States)
|
|
assert states.count() == 6
|
|
|
|
events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
|
|
assert events.count() == 6
|
|
|
|
statistics = session.query(StatisticsShortTerm)
|
|
assert statistics.count() == 6
|
|
|
|
recorder_runs = session.query(RecorderRuns)
|
|
assert recorder_runs.count() == 7
|
|
runs_before_purge = recorder_runs.all()
|
|
|
|
statistics_runs = session.query(StatisticsRuns).order_by(StatisticsRuns.run_id)
|
|
assert statistics_runs.count() == 7
|
|
statistic_runs_before_purge = statistics_runs.all()
|
|
|
|
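        # Detach the fetched rows from the session so they remain usable
        # for the comparisons below after the session closes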
        for itm in runs_before_purge:
            session.expunge(itm)
        for itm in statistic_runs_before_purge:
            session.expunge(itm)

    await hass.async_block_till_done()
    await async_wait_purge_done(hass)

    # run purge method - no service data, use defaults
    await hass.services.async_call("recorder", "purge")
    await hass.async_block_till_done()

    # Small wait for recorder thread
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        states = session.query(States)
        events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
        statistics = session.query(StatisticsShortTerm)

        # only purged old states, events and statistics
        assert states.count() == 4
        assert events.count() == 4
        assert statistics.count() == 4

    # run purge method - correct service data
    await hass.services.async_call("recorder", "purge", service_data=service_data)
    await hass.async_block_till_done()

    # Small wait for recorder thread
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        states = session.query(States)
        events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
        statistics = session.query(StatisticsShortTerm)
        recorder_runs = session.query(RecorderRuns)
        statistics_runs = session.query(StatisticsRuns)

        # we should only have 2 states, events and statistics left after purging
        assert states.count() == 2
        assert events.count() == 2
        assert statistics.count() == 2

        # now we should only have 3 recorder runs left
        runs = recorder_runs.all()
        assert_recorder_runs_equal(runs[0], runs_before_purge[0])
        assert_recorder_runs_equal(runs[1], runs_before_purge[5])
        assert_recorder_runs_equal(runs[2], runs_before_purge[6])

        # now we should only have 3 statistics runs left
        runs = statistics_runs.all()
        assert_statistic_runs_equal(runs[0], statistic_runs_before_purge[0])
        assert_statistic_runs_equal(runs[1], statistic_runs_before_purge[5])
        assert_statistic_runs_equal(runs[2], statistic_runs_before_purge[6])

        assert "EVENT_TEST_PURGE" not in (event.event_type for event in events.all())

    # run purge method - correct service data, with repack
    service_data["repack"] = True
    await hass.services.async_call("recorder", "purge", service_data=service_data)
    await hass.async_block_till_done()
    await async_wait_purge_done(hass)
    assert (
        "Vacuuming SQL DB to free space" in caplog.text
        or "Optimizing SQL DB to free space" in caplog.text
    )


@pytest.mark.parametrize("use_sqlite", (True, False), indirect=True)
async def test_purge_edge_case(
    async_setup_recorder_instance: RecorderInstanceGenerator,
    hass: HomeAssistant,
    use_sqlite: bool,
) -> None:
    """Test states and events are purged even if they occurred shortly before purge_before."""

    async def _add_db_entries(hass: HomeAssistant, timestamp: datetime) -> None:
        with session_scope(hass=hass) as session:
            session.add(
                Events(
                    event_id=1001,
                    event_type="EVENT_TEST_PURGE",
                    event_data="{}",
                    origin="LOCAL",
                    time_fired_ts=dt_util.utc_to_timestamp(timestamp),
                )
            )
            session.add(
                States(
                    entity_id="test.recorder2",
                    state="purgeme",
                    attributes="{}",
                    last_changed_ts=dt_util.utc_to_timestamp(timestamp),
                    last_updated_ts=dt_util.utc_to_timestamp(timestamp),
                    event_id=1001,
                    attributes_id=1002,
                )
            )
            session.add(
                StateAttributes(
                    shared_attrs="{}",
                    hash=1234,
                    attributes_id=1002,
                )
            )

    await async_setup_recorder_instance(hass, None)
    await async_wait_purge_done(hass)

    service_data = {"keep_days": 2}
    timestamp = dt_util.utcnow() - timedelta(days=2, minutes=1)

    await _add_db_entries(hass, timestamp)
    with session_scope(hass=hass) as session:
        states = session.query(States)
        assert states.count() == 1

        state_attributes = session.query(StateAttributes)
        assert state_attributes.count() == 1

        events = session.query(Events).filter(Events.event_type == "EVENT_TEST_PURGE")
        assert events.count() == 1

    await hass.services.async_call(recorder.DOMAIN, SERVICE_PURGE, service_data)
    await hass.async_block_till_done()

    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        states = session.query(States)
        assert states.count() == 0
        events = session.query(Events).filter(Events.event_type == "EVENT_TEST_PURGE")
        assert events.count() == 0


async def test_purge_cutoff_date(
    async_setup_recorder_instance: RecorderInstanceGenerator,
    hass: HomeAssistant,
) -> None:
    """Test states and events are purged only if they occurred before "now() - keep_days"."""

    async def _add_db_entries(hass: HomeAssistant, cutoff: datetime, rows: int) -> None:
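        # Rows stamped exactly at the cutoff must be kept; rows stamped one
        # microsecond earlier must be purged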
        timestamp_keep = cutoff
        timestamp_purge = cutoff - timedelta(microseconds=1)

        with session_scope(hass=hass) as session:
            session.add(
                Events(
                    event_id=1000,
                    event_type="KEEP",
                    event_data="{}",
                    origin="LOCAL",
                    time_fired_ts=dt_util.utc_to_timestamp(timestamp_keep),
                )
            )
            session.add(
                States(
                    entity_id="test.cutoff",
                    state="keep",
                    attributes="{}",
                    last_changed_ts=dt_util.utc_to_timestamp(timestamp_keep),
                    last_updated_ts=dt_util.utc_to_timestamp(timestamp_keep),
                    event_id=1000,
                    attributes_id=1000,
                )
            )
            session.add(
                StateAttributes(
                    shared_attrs="{}",
                    hash=1234,
                    attributes_id=1000,
                )
            )
            for row in range(1, rows):
                session.add(
                    Events(
                        event_id=1000 + row,
                        event_type="PURGE",
                        event_data="{}",
                        origin="LOCAL",
                        time_fired_ts=dt_util.utc_to_timestamp(timestamp_purge),
                    )
                )
                session.add(
                    States(
                        entity_id="test.cutoff",
                        state="purge",
                        attributes="{}",
                        last_changed_ts=dt_util.utc_to_timestamp(timestamp_purge),
                        last_updated_ts=dt_util.utc_to_timestamp(timestamp_purge),
                        event_id=1000 + row,
                        attributes_id=1000 + row,
                    )
                )
                session.add(
                    StateAttributes(
                        shared_attrs="{}",
                        hash=1234,
                        attributes_id=1000 + row,
                    )
                )

    instance = await async_setup_recorder_instance(hass, None)
    await async_wait_purge_done(hass)

    service_data = {"keep_days": 2}

    # Force multiple purge batches to be run
    rows = SQLITE_MAX_BIND_VARS + 1
    cutoff = dt_util.utcnow() - timedelta(days=service_data["keep_days"])
    await _add_db_entries(hass, cutoff, rows)

    with session_scope(hass=hass) as session:
        states = session.query(States)
        state_attributes = session.query(StateAttributes)
        events = session.query(Events)
        assert states.filter(States.state == "purge").count() == rows - 1
        assert states.filter(States.state == "keep").count() == 1
        assert (
            state_attributes.outerjoin(
                States, StateAttributes.attributes_id == States.attributes_id
            )
            .filter(States.state == "keep")
            .count()
            == 1
        )
        assert events.filter(Events.event_type == "PURGE").count() == rows - 1
        assert events.filter(Events.event_type == "KEEP").count() == 1

    instance.queue_task(PurgeTask(cutoff, repack=False, apply_filter=False))
    await hass.async_block_till_done()
    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        states = session.query(States)
        state_attributes = session.query(StateAttributes)
        events = session.query(Events)
        assert states.filter(States.state == "purge").count() == 0
        assert (
            state_attributes.outerjoin(
                States, StateAttributes.attributes_id == States.attributes_id
            )
            .filter(States.state == "purge")
            .count()
            == 0
        )
        assert states.filter(States.state == "keep").count() == 1
        assert (
            state_attributes.outerjoin(
                States, StateAttributes.attributes_id == States.attributes_id
            )
            .filter(States.state == "keep")
            .count()
            == 1
        )
        assert events.filter(Events.event_type == "PURGE").count() == 0
        assert events.filter(Events.event_type == "KEEP").count() == 1

    # Make sure we can purge everything
    instance.queue_task(PurgeTask(dt_util.utcnow(), repack=False, apply_filter=False))
    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        states = session.query(States)
        state_attributes = session.query(StateAttributes)
        assert states.count() == 0
        assert state_attributes.count() == 0

    # Make sure we can purge everything when the db is already empty
    instance.queue_task(PurgeTask(dt_util.utcnow(), repack=False, apply_filter=False))
    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        states = session.query(States)
        state_attributes = session.query(StateAttributes)
        assert states.count() == 0
        assert state_attributes.count() == 0


def _convert_pending_states_to_meta(instance: Recorder, session: Session) -> None:
    """Convert pending states to use states_metadata."""
    entity_ids: set[str] = set()
    states: set[States] = set()
    states_meta_objects: dict[str, StatesMeta] = {}
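    # Iterating the session yields every object it tracks, including the
    # pending (not yet flushed) rows the caller just added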
    for object in session:
        if isinstance(object, States):
            entity_ids.add(object.entity_id)
            states.add(object)

    entity_id_to_metadata_ids = instance.states_meta_manager.get_many(
        entity_ids, session, True
    )

    for state in states:
        entity_id = state.entity_id
        state.entity_id = None
        if metadata_id := entity_id_to_metadata_ids.get(entity_id):
            state.metadata_id = metadata_id
            continue
        if entity_id not in states_meta_objects:
            states_meta_objects[entity_id] = StatesMeta(entity_id=entity_id)
        state.states_meta_rel = states_meta_objects[entity_id]


def _convert_pending_events_to_event_types(
    instance: Recorder, session: Session
) -> None:
    """Convert pending events to use event_type_ids."""
    event_types: set[str] = set()
    events: set[Events] = set()
    event_types_objects: dict[str, EventTypes] = {}
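    # Same pattern as _convert_pending_states_to_meta, but for pending Events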
    for object in session:
        if isinstance(object, Events):
            event_types.add(object.event_type)
            events.add(object)

    event_type_to_event_type_ids = instance.event_type_manager.get_many(
        event_types, session
    )

    for event in events:
        event_type = event.event_type
        event.event_type = None
        if event_type_id := event_type_to_event_type_ids.get(event_type):
            event.event_type_id = event_type_id
            continue
        if event_type not in event_types_objects:
            event_types_objects[event_type] = EventTypes(event_type=event_type)
        event.event_type_rel = event_types_objects[event_type]


@pytest.mark.parametrize("use_sqlite", (True, False), indirect=True)
async def test_purge_filtered_states(
    async_setup_recorder_instance: RecorderInstanceGenerator,
    hass: HomeAssistant,
    use_sqlite: bool,
) -> None:
    """Test filtered states are purged."""
    config: ConfigType = {"exclude": {"entities": ["sensor.excluded"]}}
    instance = await async_setup_recorder_instance(hass, config)
    assert instance.entity_filter("sensor.excluded") is False

    def _add_db_entries(hass: HomeAssistant) -> None:
        with session_scope(hass=hass) as session:
            # Add states and state_changed events that should be purged
            for days in range(1, 4):
                timestamp = dt_util.utcnow() - timedelta(days=days)
                for event_id in range(1000, 1020):
                    _add_state_and_state_changed_event(
                        session,
                        "sensor.excluded",
                        "purgeme",
                        timestamp,
                        event_id * days,
                    )
            # Add state **without** state_changed event that should be purged
            timestamp = dt_util.utcnow() - timedelta(days=1)
            session.add(
                States(
                    entity_id="sensor.excluded",
                    state="purgeme",
                    attributes="{}",
                    last_changed_ts=dt_util.utc_to_timestamp(timestamp),
                    last_updated_ts=dt_util.utc_to_timestamp(timestamp),
                )
            )
            # Add states and state_changed events that should be kept
            timestamp = dt_util.utcnow() - timedelta(days=2)
            for event_id in range(200, 210):
                _add_state_and_state_changed_event(
                    session,
                    "sensor.keep",
                    "keep",
                    timestamp,
                    event_id,
                )
            # Add states with linked old_state_ids that need to be handled
            timestamp = dt_util.utcnow() - timedelta(days=0)
            state_attrs = StateAttributes(
                hash=0,
                shared_attrs=json.dumps(
                    {"sensor.linked_old_state_id": "sensor.linked_old_state_id"}
                ),
            )
            state_1 = States(
                entity_id="sensor.linked_old_state_id",
                state="keep",
                attributes="{}",
                last_changed_ts=dt_util.utc_to_timestamp(timestamp),
                last_updated_ts=dt_util.utc_to_timestamp(timestamp),
                old_state_id=1,
                state_attributes=state_attrs,
            )
            timestamp = dt_util.utcnow() - timedelta(days=4)
            state_2 = States(
                entity_id="sensor.linked_old_state_id",
                state="keep",
                attributes="{}",
                last_changed_ts=dt_util.utc_to_timestamp(timestamp),
                last_updated_ts=dt_util.utc_to_timestamp(timestamp),
                old_state_id=2,
                state_attributes=state_attrs,
            )
            state_3 = States(
                entity_id="sensor.linked_old_state_id",
                state="keep",
                attributes="{}",
                last_changed_ts=dt_util.utc_to_timestamp(timestamp),
                last_updated_ts=dt_util.utc_to_timestamp(timestamp),
                old_state_id=62,  # keep
                state_attributes=state_attrs,
            )
            session.add_all((state_attrs, state_1, state_2, state_3))
            # Add event that should be kept
            session.add(
                Events(
                    event_id=100,
                    event_type="EVENT_KEEP",
                    event_data="{}",
                    origin="LOCAL",
                    time_fired_ts=dt_util.utc_to_timestamp(timestamp),
                )
            )
            _convert_pending_states_to_meta(instance, session)

    service_data = {"keep_days": 10}
    _add_db_entries(hass)

    with session_scope(hass=hass) as session:
        states = session.query(States)
        assert states.count() == 74

        events_state_changed = session.query(Events).filter(
            Events.event_type == EVENT_STATE_CHANGED
        )
        events_keep = session.query(Events).filter(Events.event_type == "EVENT_KEEP")
        assert events_state_changed.count() == 70
        assert events_keep.count() == 1

    # Normal purge doesn't remove excluded entities
    await hass.services.async_call(recorder.DOMAIN, SERVICE_PURGE, service_data)
    await hass.async_block_till_done()

    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        states = session.query(States)
        assert states.count() == 74
        events_state_changed = session.query(Events).filter(
            Events.event_type == EVENT_STATE_CHANGED
        )
        assert events_state_changed.count() == 70
        events_keep = session.query(Events).filter(Events.event_type == "EVENT_KEEP")
        assert events_keep.count() == 1

    # Test with 'apply_filter' = True
    service_data["apply_filter"] = True
    await hass.services.async_call(recorder.DOMAIN, SERVICE_PURGE, service_data)
    await hass.async_block_till_done()

    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        states = session.query(States)
        assert states.count() == 13
        events_state_changed = session.query(Events).filter(
            Events.event_type == EVENT_STATE_CHANGED
        )
        assert events_state_changed.count() == 10
        events_keep = session.query(Events).filter(Events.event_type == "EVENT_KEEP")
        assert events_keep.count() == 1

        states_sensor_excluded = (
            session.query(States)
            .outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
            .filter(StatesMeta.entity_id == "sensor.excluded")
        )
        assert states_sensor_excluded.count() == 0

        assert (
            session.query(States).filter(States.state_id == 72).first().old_state_id
            is None
        )
        assert (
            session.query(States).filter(States.state_id == 72).first().attributes_id
            == 71
        )
        assert (
            session.query(States).filter(States.state_id == 73).first().old_state_id
            is None
        )
        assert (
            session.query(States).filter(States.state_id == 73).first().attributes_id
            == 71
        )

        final_keep_state = session.query(States).filter(States.state_id == 74).first()
        assert final_keep_state.old_state_id == 62  # should have been kept
        assert final_keep_state.attributes_id == 71

        assert session.query(StateAttributes).count() == 11

    # Do it again to make sure nothing changes
    await hass.services.async_call(recorder.DOMAIN, SERVICE_PURGE, service_data)
    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        final_keep_state = session.query(States).filter(States.state_id == 74).first()
        assert final_keep_state.old_state_id == 62  # should have been kept
        assert final_keep_state.attributes_id == 71

        assert session.query(StateAttributes).count() == 11

    service_data = {"keep_days": 0}
    await hass.services.async_call(recorder.DOMAIN, SERVICE_PURGE, service_data)
    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        remaining = list(session.query(States))
        for state in remaining:
            assert state.event_id is None
        assert len(remaining) == 0
        assert session.query(StateAttributes).count() == 0


@pytest.mark.parametrize("use_sqlite", (True, False), indirect=True)
async def test_purge_filtered_states_to_empty(
    async_setup_recorder_instance: RecorderInstanceGenerator,
    hass: HomeAssistant,
    use_sqlite: bool,
) -> None:
    """Test filtered states are purged all the way to an empty db."""
    config: ConfigType = {"exclude": {"entities": ["sensor.excluded"]}}
    instance = await async_setup_recorder_instance(hass, config)
    assert instance.entity_filter("sensor.excluded") is False

    def _add_db_entries(hass: HomeAssistant) -> None:
        with session_scope(hass=hass) as session:
            # Add states and state_changed events that should be purged
            for days in range(1, 4):
                timestamp = dt_util.utcnow() - timedelta(days=days)
                for event_id in range(1000, 1020):
                    _add_state_and_state_changed_event(
                        session,
                        "sensor.excluded",
                        "purgeme",
                        timestamp,
                        event_id * days,
                    )
            _convert_pending_states_to_meta(instance, session)

    service_data = {"keep_days": 10}
    _add_db_entries(hass)

    with session_scope(hass=hass) as session:
        states = session.query(States)
        state_attributes = session.query(StateAttributes)
        assert states.count() == 60
        assert state_attributes.count() == 60

    # Test with 'apply_filter' = True
    service_data["apply_filter"] = True
    await hass.services.async_call(recorder.DOMAIN, SERVICE_PURGE, service_data)
    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        states = session.query(States)
        state_attributes = session.query(StateAttributes)
        assert states.count() == 0
        assert state_attributes.count() == 0

    # Do it again to make sure nothing changes
    # Why do we do this? Should we check the end result?
    await hass.services.async_call(recorder.DOMAIN, SERVICE_PURGE, service_data)
    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)


@pytest.mark.parametrize("use_sqlite", (True, False), indirect=True)
async def test_purge_without_state_attributes_filtered_states_to_empty(
    async_setup_recorder_instance: RecorderInstanceGenerator,
    hass: HomeAssistant,
    use_sqlite: bool,
) -> None:
    """Test filtered legacy states without state attributes are purged all the way to an empty db."""
    config: ConfigType = {"exclude": {"entities": ["sensor.old_format"]}}
    instance = await async_setup_recorder_instance(hass, config)
    assert instance.entity_filter("sensor.old_format") is False

    def _add_db_entries(hass: HomeAssistant) -> None:
        with session_scope(hass=hass) as session:
            # Add states and state_changed events that should be purged
            # in the legacy format
            timestamp = dt_util.utcnow() - timedelta(days=5)
            event_id = 1021
            session.add(
                States(
                    entity_id="sensor.old_format",
                    state=STATE_ON,
                    attributes=json.dumps({"old": "not_using_state_attributes"}),
                    last_changed_ts=dt_util.utc_to_timestamp(timestamp),
                    last_updated_ts=dt_util.utc_to_timestamp(timestamp),
                    event_id=event_id,
                    state_attributes=None,
                )
            )
            session.add(
                Events(
                    event_id=event_id,
                    event_type=EVENT_STATE_CHANGED,
                    event_data="{}",
                    origin="LOCAL",
                    time_fired_ts=dt_util.utc_to_timestamp(timestamp),
                )
            )
            session.add(
                Events(
                    event_id=event_id + 1,
                    event_type=EVENT_THEMES_UPDATED,
                    event_data="{}",
                    origin="LOCAL",
                    time_fired_ts=dt_util.utc_to_timestamp(timestamp),
                )
            )
            _convert_pending_states_to_meta(instance, session)

    service_data = {"keep_days": 10}
    _add_db_entries(hass)

    with session_scope(hass=hass) as session:
        states = session.query(States)
        state_attributes = session.query(StateAttributes)
        assert states.count() == 1
        assert state_attributes.count() == 0

    # Test with 'apply_filter' = True
    service_data["apply_filter"] = True
    await hass.services.async_call(recorder.DOMAIN, SERVICE_PURGE, service_data)
    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        states = session.query(States)
        state_attributes = session.query(StateAttributes)
        assert states.count() == 0
        assert state_attributes.count() == 0

    # Do it again to make sure nothing changes
    # Why do we do this? Should we check the end result?
    await hass.services.async_call(recorder.DOMAIN, SERVICE_PURGE, service_data)
    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)


async def test_purge_filtered_events(
    async_setup_recorder_instance: RecorderInstanceGenerator,
    hass: HomeAssistant,
) -> None:
    """Test filtered events are purged."""
    config: ConfigType = {"exclude": {"event_types": ["EVENT_PURGE"]}}
    instance = await async_setup_recorder_instance(hass, config)

    def _add_db_entries(hass: HomeAssistant) -> None:
        with session_scope(hass=hass) as session:
            # Add events that should be purged
            for days in range(1, 4):
                timestamp = dt_util.utcnow() - timedelta(days=days)
                for event_id in range(1000, 1020):
                    session.add(
                        Events(
                            event_id=event_id * days,
                            event_type="EVENT_PURGE",
                            event_data="{}",
                            origin="LOCAL",
                            time_fired_ts=dt_util.utc_to_timestamp(timestamp),
                        )
                    )

            # Add states and state_changed events that should be kept
            timestamp = dt_util.utcnow() - timedelta(days=1)
            for event_id in range(200, 210):
                _add_state_and_state_changed_event(
                    session,
                    "sensor.keep",
                    "keep",
                    timestamp,
                    event_id,
                )
            _convert_pending_events_to_event_types(instance, session)

    service_data = {"keep_days": 10}
    _add_db_entries(hass)

    with session_scope(hass=hass) as session:
        events_purge = session.query(Events).filter(
            Events.event_type_id.in_(select_event_type_ids(("EVENT_PURGE",)))
        )
        events_keep = session.query(Events).filter(
            Events.event_type_id.in_(select_event_type_ids((EVENT_STATE_CHANGED,)))
        )
        states = session.query(States)

        assert events_purge.count() == 60
        assert events_keep.count() == 10
        assert states.count() == 10

    # Normal purge doesn't remove excluded events
    await hass.services.async_call(recorder.DOMAIN, SERVICE_PURGE, service_data)
    await hass.async_block_till_done()

    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        events_purge = session.query(Events).filter(
            Events.event_type_id.in_(select_event_type_ids(("EVENT_PURGE",)))
        )
        events_keep = session.query(Events).filter(
            Events.event_type_id.in_(select_event_type_ids((EVENT_STATE_CHANGED,)))
        )
        states = session.query(States)
        assert events_purge.count() == 60
        assert events_keep.count() == 10
        assert states.count() == 10

    # Test with 'apply_filter' = True
    service_data["apply_filter"] = True
    await hass.services.async_call(recorder.DOMAIN, SERVICE_PURGE, service_data)
    await hass.async_block_till_done()

    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        events_purge = session.query(Events).filter(
            Events.event_type_id.in_(select_event_type_ids(("EVENT_PURGE",)))
        )
        events_keep = session.query(Events).filter(
            Events.event_type_id.in_(select_event_type_ids((EVENT_STATE_CHANGED,)))
        )
        states = session.query(States)
        assert events_purge.count() == 0
        assert events_keep.count() == 10
        assert states.count() == 10


async def test_purge_filtered_events_state_changed(
    async_setup_recorder_instance: RecorderInstanceGenerator,
    hass: HomeAssistant,
) -> None:
    """Test filtered state_changed events are purged. This should also remove all states."""
    config: ConfigType = {
        "exclude": {
            "event_types": ["excluded_event"],
            "entities": ["sensor.excluded", "sensor.old_format"],
        }
    }
    instance = await async_setup_recorder_instance(hass, config)
    # Assert the excluded entities and event types are filtered out
    assert instance.entity_filter("sensor.excluded") is False
    assert instance.entity_filter("sensor.old_format") is False
    assert instance.entity_filter("sensor.keep") is True
    assert "excluded_event" in instance.exclude_event_types

    def _add_db_entries(hass: HomeAssistant) -> None:
        with session_scope(hass=hass) as session:
            # Add states and state_changed events that should be purged
            for days in range(1, 4):
                timestamp = dt_util.utcnow() - timedelta(days=days)
                for event_id in range(1000, 1020):
                    _add_state_and_state_changed_event(
                        session,
                        "sensor.excluded",
                        "purgeme",
                        timestamp,
                        event_id * days,
                    )
            # Add events that should be kept
            timestamp = dt_util.utcnow() - timedelta(days=1)
            for event_id in range(200, 210):
                session.add(
                    Events(
                        event_id=event_id,
                        event_type="EVENT_KEEP",
                        event_data="{}",
                        origin="LOCAL",
                        time_fired_ts=dt_util.utc_to_timestamp(timestamp),
                    )
                )
            # Add states with linked old_state_ids that need to be handled
            timestamp = dt_util.utcnow() - timedelta(days=0)
            state_1 = States(
                entity_id="sensor.linked_old_state_id",
                state="keep",
                attributes="{}",
                last_changed_ts=dt_util.utc_to_timestamp(timestamp),
                last_updated_ts=dt_util.utc_to_timestamp(timestamp),
                old_state_id=1,
            )
            timestamp = dt_util.utcnow() - timedelta(days=4)
            state_2 = States(
                entity_id="sensor.linked_old_state_id",
                state="keep",
                attributes="{}",
                last_changed_ts=dt_util.utc_to_timestamp(timestamp),
                last_updated_ts=dt_util.utc_to_timestamp(timestamp),
                old_state_id=2,
            )
            state_3 = States(
                entity_id="sensor.linked_old_state_id",
                state="keep",
                attributes="{}",
                last_changed_ts=dt_util.utc_to_timestamp(timestamp),
                last_updated_ts=dt_util.utc_to_timestamp(timestamp),
                old_state_id=62,  # keep
            )
            session.add_all((state_1, state_2, state_3))
            session.add(
                Events(
                    event_id=231,
                    event_type="excluded_event",
                    event_data="{}",
                    origin="LOCAL",
                    time_fired_ts=dt_util.utc_to_timestamp(timestamp),
                )
            )
            session.add(
                States(
                    entity_id="sensor.old_format",
                    state="remove",
                    attributes="{}",
                    last_changed_ts=dt_util.utc_to_timestamp(timestamp),
                    last_updated_ts=dt_util.utc_to_timestamp(timestamp),
                )
            )
            _convert_pending_events_to_event_types(instance, session)
            _convert_pending_states_to_meta(instance, session)

    service_data = {"keep_days": 10, "apply_filter": True}
    _add_db_entries(hass)

    with session_scope(hass=hass) as session:
        events_keep = session.query(Events).filter(
            Events.event_type_id.in_(select_event_type_ids(("EVENT_KEEP",)))
        )
        events_purge = session.query(Events).filter(
            Events.event_type_id.in_(select_event_type_ids(("excluded_event",)))
        )
        states = session.query(States)

        assert events_keep.count() == 10
        assert events_purge.count() == 1
        assert states.count() == 64

    await hass.services.async_call(recorder.DOMAIN, SERVICE_PURGE, service_data)
    await hass.async_block_till_done()

    for _ in range(4):
        await async_recorder_block_till_done(hass)
        await async_wait_purge_done(hass)

    with session_scope(hass=hass) as session:
        events_keep = session.query(Events).filter(
            Events.event_type_id.in_(select_event_type_ids(("EVENT_KEEP",)))
        )
        events_purge = session.query(Events).filter(
            Events.event_type_id.in_(select_event_type_ids(("excluded_event",)))
        )
        states = session.query(States)

        assert events_keep.count() == 10
        assert events_purge.count() == 0
        assert states.count() == 3

        assert (
            session.query(States).filter(States.state_id == 61).first().old_state_id
            is None
        )
        assert (
            session.query(States).filter(States.state_id == 62).first().old_state_id
            is None
        )
        assert (
            session.query(States).filter(States.state_id == 63).first().old_state_id
            == 62
        )  # should have been kept


async def test_purge_entities(
    async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
    """Test purging of specific entities."""
    instance = await async_setup_recorder_instance(hass)

    async def _purge_entities(hass, entity_ids, domains, entity_globs):
        service_data = {
            "entity_id": entity_ids,
            "domains": domains,
            "entity_globs": entity_globs,
        }

        await hass.services.async_call(
            recorder.DOMAIN, SERVICE_PURGE_ENTITIES, service_data
        )
        await hass.async_block_till_done()

        await async_recorder_block_till_done(hass)
        await async_wait_purge_done(hass)

    def _add_purge_records(hass: HomeAssistant) -> None:
        with session_scope(hass=hass) as session:
            # Add states and state_changed events that should be purged
            for days in range(1, 4):
                timestamp = dt_util.utcnow() - timedelta(days=days)
                for event_id in range(1000, 1020):
                    _add_state_and_state_changed_event(
                        session,
                        "sensor.purge_entity",
                        "purgeme",
                        timestamp,
                        event_id * days,
                    )
                timestamp = dt_util.utcnow() - timedelta(days=days)
                for event_id in range(10000, 10020):
                    _add_state_and_state_changed_event(
                        session,
                        "purge_domain.entity",
                        "purgeme",
                        timestamp,
                        event_id * days,
                    )
                timestamp = dt_util.utcnow() - timedelta(days=days)
                for event_id in range(100000, 100020):
                    _add_state_and_state_changed_event(
                        session,
                        "binary_sensor.purge_glob",
                        "purgeme",
                        timestamp,
                        event_id * days,
                    )
            _convert_pending_states_to_meta(instance, session)

    def _add_keep_records(hass: HomeAssistant) -> None:
        with session_scope(hass=hass) as session:
            # Add states and state_changed events that should be kept
            timestamp = dt_util.utcnow() - timedelta(days=2)
            for event_id in range(200, 210):
                _add_state_and_state_changed_event(
                    session,
                    "sensor.keep",
                    "keep",
                    timestamp,
                    event_id,
                )
            _convert_pending_states_to_meta(instance, session)

    _add_purge_records(hass)
    _add_keep_records(hass)

    # Confirm standard service call
    with session_scope(hass=hass) as session:
        states = session.query(States)
        assert states.count() == 190

    await _purge_entities(hass, "sensor.purge_entity", "purge_domain", "*purge_glob")

    with session_scope(hass=hass) as session:
        states = session.query(States)
        assert states.count() == 10

        states_sensor_kept = (
            session.query(States)
            .outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
            .filter(StatesMeta.entity_id == "sensor.keep")
        )
        assert states_sensor_kept.count() == 10

    _add_purge_records(hass)

    # Confirm each parameter purges only the associated records
    with session_scope(hass=hass) as session:
        states = session.query(States)
        assert states.count() == 190

    await _purge_entities(hass, "sensor.purge_entity", [], [])

    with session_scope(hass=hass) as session:
        states = session.query(States)
        assert states.count() == 130

    await _purge_entities(hass, [], "purge_domain", [])

    with session_scope(hass=hass) as session:
        states = session.query(States)
        assert states.count() == 70

    await _purge_entities(hass, [], [], "*purge_glob")

    with session_scope(hass=hass) as session:
        states = session.query(States)
        assert states.count() == 10

        states_sensor_kept = (
            session.query(States)
            .outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
            .filter(StatesMeta.entity_id == "sensor.keep")
        )
        assert states_sensor_kept.count() == 10

    _add_purge_records(hass)

    # Confirm calling service without arguments matches all records (default filter behavior)
    with session_scope(hass=hass) as session:
        states = session.query(States)
        assert states.count() == 190

    await _purge_entities(hass, [], [], [])

    with session_scope(hass=hass) as session:
        states = session.query(States)
        assert states.count() == 0


async def _add_test_states(hass: HomeAssistant):
    """Add multiple states to the db for testing."""
    utcnow = dt_util.utcnow()
    five_days_ago = utcnow - timedelta(days=5)
    eleven_days_ago = utcnow - timedelta(days=11)
    base_attributes = {"test_attr": 5, "test_attr_10": "nice"}

    async def set_state(entity_id, state, **kwargs):
        """Set the state."""
        hass.states.async_set(entity_id, state, **kwargs)
        await hass.async_block_till_done()
        await async_wait_recording_done(hass)

    for event_id in range(6):
        if event_id < 2:
            timestamp = eleven_days_ago
            state = f"autopurgeme_{event_id}"
            attributes = {"autopurgeme": True, **base_attributes}
        elif event_id < 4:
            timestamp = five_days_ago
            state = f"purgeme_{event_id}"
            attributes = {"purgeme": True, **base_attributes}
        else:
            timestamp = utcnow
            state = f"dontpurgeme_{event_id}"
            attributes = {"dontpurgeme": True, **base_attributes}

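        # Patch the recorder's clock so the state is recorded with the
        # backdated timestamp chosen above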
        with patch(
            "homeassistant.components.recorder.core.dt_util.utcnow",
            return_value=timestamp,
        ):
            await set_state("test.recorder2", state, attributes=attributes)


async def _add_test_events(hass: HomeAssistant, iterations: int = 1):
    """Add a few events for testing."""
    utcnow = dt_util.utcnow()
    five_days_ago = utcnow - timedelta(days=5)
    eleven_days_ago = utcnow - timedelta(days=11)
    event_data = {"test_attr": 5, "test_attr_10": "nice"}

    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    with session_scope(hass=hass) as session:
        for _ in range(iterations):
            for event_id in range(6):
                if event_id < 2:
                    timestamp = eleven_days_ago
                    event_type = "EVENT_TEST_AUTOPURGE"
                elif event_id < 4:
                    timestamp = five_days_ago
                    event_type = "EVENT_TEST_PURGE"
                else:
                    timestamp = utcnow
                    event_type = "EVENT_TEST"

                session.add(
                    Events(
                        event_type=event_type,
                        event_data=json.dumps(event_data),
                        origin="LOCAL",
                        time_fired_ts=dt_util.utc_to_timestamp(timestamp),
                    )
                )


async def _add_events_with_event_data(hass: HomeAssistant, iterations: int = 1):
    """Add a few events with linked event_data for testing."""
    utcnow = dt_util.utcnow()
    five_days_ago = utcnow - timedelta(days=5)
    eleven_days_ago = utcnow - timedelta(days=11)
    event_data = {"test_attr": 5, "test_attr_10": "nice"}

    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    with session_scope(hass=hass) as session:
        for _ in range(iterations):
            for event_id in range(6):
                if event_id < 2:
                    timestamp = eleven_days_ago
                    event_type = "EVENT_TEST_AUTOPURGE_WITH_EVENT_DATA"
                    shared_data = '{"type":{"EVENT_TEST_AUTOPURGE_WITH_EVENT_DATA"}'
                elif event_id < 4:
                    timestamp = five_days_ago
                    event_type = "EVENT_TEST_PURGE_WITH_EVENT_DATA"
                    shared_data = '{"type":{"EVENT_TEST_PURGE_WITH_EVENT_DATA"}'
                else:
                    timestamp = utcnow
                    event_type = "EVENT_TEST_WITH_EVENT_DATA"
                    shared_data = '{"type":{"EVENT_TEST_WITH_EVENT_DATA"}'

                event_data = EventData(hash=1234, shared_data=shared_data)

                session.add(
                    Events(
                        event_type=event_type,
                        origin="LOCAL",
                        time_fired_ts=dt_util.utc_to_timestamp(timestamp),
                        event_data_rel=event_data,
                    )
                )


async def _add_test_statistics(hass: HomeAssistant):
    """Add multiple statistics to the db for testing."""
    utcnow = dt_util.utcnow()
    five_days_ago = utcnow - timedelta(days=5)
    eleven_days_ago = utcnow - timedelta(days=11)

    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    with session_scope(hass=hass) as session:
        for event_id in range(6):
            if event_id < 2:
                timestamp = eleven_days_ago
                state = "-11"
            elif event_id < 4:
                timestamp = five_days_ago
                state = "-5"
            else:
                timestamp = utcnow
                state = "0"

            session.add(
                StatisticsShortTerm(
                    start_ts=timestamp.timestamp(),
                    state=state,
                )
            )


async def _add_test_recorder_runs(hass: HomeAssistant):
    """Add a few recorder_runs for testing."""
    utcnow = dt_util.utcnow()
    five_days_ago = utcnow - timedelta(days=5)
    eleven_days_ago = utcnow - timedelta(days=11)

    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    with session_scope(hass=hass) as session:
        for rec_id in range(6):
            if rec_id < 2:
                timestamp = eleven_days_ago
            elif rec_id < 4:
                timestamp = five_days_ago
            else:
                timestamp = utcnow

            session.add(
                RecorderRuns(
                    start=timestamp,
                    created=dt_util.utcnow(),
                    end=timestamp + timedelta(days=1),
                )
            )


async def _add_test_statistics_runs(hass: HomeAssistant):
    """Add a few statistics_runs for testing."""
    utcnow = dt_util.utcnow()
    five_days_ago = utcnow - timedelta(days=5)
    eleven_days_ago = utcnow - timedelta(days=11)

    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    with session_scope(hass=hass) as session:
        for rec_id in range(6):
            if rec_id < 2:
                timestamp = eleven_days_ago
            elif rec_id < 4:
                timestamp = five_days_ago
            else:
                timestamp = utcnow

            session.add(
                StatisticsRuns(
                    start=timestamp,
                )
            )


def _add_state_without_event_linkage(
    session: Session,
    entity_id: str,
    state: str,
    timestamp: datetime,
):
    """Add a state without a linked state_changed event for testing."""
    state_attrs = StateAttributes(
        hash=1234, shared_attrs=json.dumps({entity_id: entity_id})
    )
    session.add(state_attrs)
    session.add(
        States(
            entity_id=entity_id,
            state=state,
            attributes=None,
            last_changed_ts=dt_util.utc_to_timestamp(timestamp),
            last_updated_ts=dt_util.utc_to_timestamp(timestamp),
            event_id=None,
            state_attributes=state_attrs,
        )
    )


def _add_state_and_state_changed_event(
    session: Session,
    entity_id: str,
    state: str,
    timestamp: datetime,
    event_id: int,
) -> None:
    """Add state and state_changed event to database for testing."""
    state_attrs = StateAttributes(
        hash=event_id, shared_attrs=json.dumps({entity_id: entity_id})
    )
    session.add(state_attrs)
    session.add(
        States(
            entity_id=entity_id,
            state=state,
            attributes=None,
            last_changed_ts=dt_util.utc_to_timestamp(timestamp),
            last_updated_ts=dt_util.utc_to_timestamp(timestamp),
            event_id=event_id,
            state_attributes=state_attrs,
        )
    )
    session.add(
        Events(
            event_id=event_id,
            event_type=EVENT_STATE_CHANGED,
            event_data="{}",
            origin="LOCAL",
            time_fired_ts=dt_util.utc_to_timestamp(timestamp),
        )
    )


async def test_purge_many_old_events(
    async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
    """Test deleting many old events in batches."""
    instance = await async_setup_recorder_instance(hass)

    await _add_test_events(hass, SQLITE_MAX_BIND_VARS)

    with session_scope(hass=hass) as session:
        events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
        assert events.count() == SQLITE_MAX_BIND_VARS * 6

        purge_before = dt_util.utcnow() - timedelta(days=4)

        # run purge_old_data()
        finished = purge_old_data(
            instance,
            purge_before,
            repack=False,
            states_batch_size=3,
            events_batch_size=3,
        )
        assert not finished
        assert events.count() == SQLITE_MAX_BIND_VARS * 3

        # we should only have 2 groups of events left
        finished = purge_old_data(
            instance,
            purge_before,
            repack=False,
            states_batch_size=3,
            events_batch_size=3,
        )
        assert finished
        assert events.count() == SQLITE_MAX_BIND_VARS * 2

        # we should now purge everything
        finished = purge_old_data(
            instance,
            dt_util.utcnow(),
            repack=False,
            states_batch_size=20,
            events_batch_size=20,
        )
        assert finished
        assert events.count() == 0


async def test_purge_can_mix_legacy_and_new_format(
    async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
    """Test purging with a mix of legacy and new format events."""
    instance = await async_setup_recorder_instance(hass)
    await async_wait_recording_done(hass)
    # New databases are no longer created with the legacy events index
    assert instance.use_legacy_events_index is False

    def _recreate_legacy_events_index():
        """Recreate the legacy events index since it's no longer created on new instances."""
        migration._create_index(instance.get_session, "states", "ix_states_event_id")
        instance.use_legacy_events_index = True

    await instance.async_add_executor_job(_recreate_legacy_events_index)
    assert instance.use_legacy_events_index is True

    utcnow = dt_util.utcnow()
    eleven_days_ago = utcnow - timedelta(days=11)
    with session_scope(hass=hass) as session:
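        # A broken row with no timestamps; the purge is expected to skip it
        # rather than let it block future purges (asserted at the end)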
broken_state_no_time = States(
|
|
event_id=None,
|
|
entity_id="orphened.state",
|
|
last_updated_ts=None,
|
|
last_changed_ts=None,
|
|
)
|
|
session.add(broken_state_no_time)
|
|
start_id = 50000
|
|
for event_id in range(start_id, start_id + 50):
|
|
_add_state_and_state_changed_event(
|
|
session,
|
|
"sensor.excluded",
|
|
"purgeme",
|
|
eleven_days_ago,
|
|
event_id,
|
|
)
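    # 50 legacy rows (state + event linked via event_id) plus the new-format
    # rows added below give the purge both formats to work through.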
    await _add_test_events(hass, 50)
    await _add_events_with_event_data(hass, 50)
    with session_scope(hass=hass) as session:
        for _ in range(50):
            _add_state_without_event_linkage(
                session, "switch.random", "on", eleven_days_ago
            )
        states_with_event_id = session.query(States).filter(
            States.event_id.is_not(None)
        )
        states_without_event_id = session.query(States).filter(
            States.event_id.is_(None)
        )

        assert states_with_event_id.count() == 50
        assert states_without_event_id.count() == 51

        purge_before = dt_util.utcnow() - timedelta(days=4)
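        # The first purge runs the legacy path: every state linked to an
        # event via event_id should be removed in a single pass.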
        finished = purge_old_data(
            instance,
            purge_before,
            repack=False,
        )
        assert not finished
        assert states_with_event_id.count() == 0
        assert states_without_event_id.count() == 51
        # At this point all the legacy states are gone
        # and we switch methods
        purge_before = dt_util.utcnow() - timedelta(days=4)
        finished = purge_old_data(
            instance,
            purge_before,
            repack=False,
            events_batch_size=1,
            states_batch_size=1,
        )
        # Since we only allow one iteration, we won't
        # check if we are finished this loop, similar
        # to the legacy method
        assert not finished
        assert states_with_event_id.count() == 0
        assert states_without_event_id.count() == 1
        finished = purge_old_data(
            instance,
            purge_before,
            repack=False,
            events_batch_size=100,
            states_batch_size=100,
        )
        assert finished
        assert states_with_event_id.count() == 0
        assert states_without_event_id.count() == 1
        _add_state_without_event_linkage(
            session, "switch.random", "on", eleven_days_ago
        )
        assert states_with_event_id.count() == 0
        assert states_without_event_id.count() == 2
        finished = purge_old_data(
            instance,
            purge_before,
            repack=False,
        )
        assert finished
        # The broken state without a timestamp
        # does not prevent future purges. It is ignored.
        assert states_with_event_id.count() == 0
        assert states_without_event_id.count() == 1


async def test_purge_old_events_purges_the_event_type_ids(
    async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
    """Test deleting old events purges event type ids."""
    instance = await async_setup_recorder_instance(hass)
    assert instance.event_type_manager.active is True

    utcnow = dt_util.utcnow()
    five_days_ago = utcnow - timedelta(days=5)
    eleven_days_ago = utcnow - timedelta(days=11)
    far_past = utcnow - timedelta(days=1000)
    event_data = {"test_attr": 5, "test_attr_10": "nice"}

    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    def _insert_events():
        with session_scope(hass=hass) as session:
            event_type_test_auto_purge = EventTypes(event_type="EVENT_TEST_AUTOPURGE")
            event_type_test_purge = EventTypes(event_type="EVENT_TEST_PURGE")
            event_type_test = EventTypes(event_type="EVENT_TEST")
            event_type_unused = EventTypes(event_type="EVENT_TEST_UNUSED")
            session.add_all(
                (
                    event_type_test_auto_purge,
                    event_type_test_purge,
                    event_type_test,
                    event_type_unused,
                )
            )
            session.flush()
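            # Flush so the EventTypes rows get their primary keys assigned
            # before event_type_id is referenced below.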
            for _ in range(5):
                for event_id in range(6):
                    if event_id < 2:
                        timestamp = eleven_days_ago
                        event_type = event_type_test_auto_purge
                    elif event_id < 4:
                        timestamp = five_days_ago
                        event_type = event_type_test_purge
                    else:
                        timestamp = utcnow
                        event_type = event_type_test

                    session.add(
                        Events(
                            event_type=None,
                            event_type_id=event_type.event_type_id,
                            event_data=json_dumps(event_data),
                            origin="LOCAL",
                            time_fired_ts=dt_util.utc_to_timestamp(timestamp),
                        )
                    )
            return instance.event_type_manager.get_many(
                [
                    "EVENT_TEST_AUTOPURGE",
                    "EVENT_TEST_PURGE",
                    "EVENT_TEST",
                    "EVENT_TEST_UNUSED",
                ],
                session,
            )

    event_type_to_id = await instance.async_add_executor_job(_insert_events)
    test_event_type_ids = event_type_to_id.values()
    with session_scope(hass=hass) as session:
        events = session.query(Events).where(
            Events.event_type_id.in_(test_event_type_ids)
        )
        event_types = session.query(EventTypes).where(
            EventTypes.event_type_id.in_(test_event_type_ids)
        )

        assert events.count() == 30
        assert event_types.count() == 4
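
        # Purging far in the past deletes no events, but it should still
        # drop EventTypes rows that no events reference anymore.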

        # run purge_old_data()
        finished = purge_old_data(
            instance,
            far_past,
            repack=False,
        )
        assert finished
        assert events.count() == 30
        # We should remove the unused event type
        assert event_types.count() == 3

        assert "EVENT_TEST_UNUSED" not in instance.event_type_manager._id_map

        # we should only have 10 events left since
        # only one event type was recorded now
        finished = purge_old_data(
            instance,
            utcnow,
            repack=False,
        )
        assert finished
        assert events.count() == 10
        assert event_types.count() == 1

        # Purge everything
        finished = purge_old_data(
            instance,
            utcnow + timedelta(seconds=1),
            repack=False,
        )
        assert finished
        assert events.count() == 0
        assert event_types.count() == 0


async def test_purge_old_states_purges_the_state_metadata_ids(
    async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
    """Test deleting old states purges state metadata_ids."""
    instance = await async_setup_recorder_instance(hass)
    assert instance.states_meta_manager.active is True

    utcnow = dt_util.utcnow()
    five_days_ago = utcnow - timedelta(days=5)
    eleven_days_ago = utcnow - timedelta(days=11)
    far_past = utcnow - timedelta(days=1000)

    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    def _insert_states():
        with session_scope(hass=hass) as session:
            states_meta_sensor_one = StatesMeta(entity_id="sensor.one")
            states_meta_sensor_two = StatesMeta(entity_id="sensor.two")
            states_meta_sensor_three = StatesMeta(entity_id="sensor.three")
            states_meta_sensor_unused = StatesMeta(entity_id="sensor.unused")
            session.add_all(
                (
                    states_meta_sensor_one,
                    states_meta_sensor_two,
                    states_meta_sensor_three,
                    states_meta_sensor_unused,
                )
            )
            session.flush()
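            # Flush to assign metadata_ids before they are referenced below.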
            for _ in range(5):
                for event_id in range(6):
                    if event_id < 2:
                        timestamp = eleven_days_ago
                        metadata_id = states_meta_sensor_one.metadata_id
                    elif event_id < 4:
                        timestamp = five_days_ago
                        metadata_id = states_meta_sensor_two.metadata_id
                    else:
                        timestamp = utcnow
                        metadata_id = states_meta_sensor_three.metadata_id

                    session.add(
                        States(
                            metadata_id=metadata_id,
                            state="any",
                            last_updated_ts=dt_util.utc_to_timestamp(timestamp),
                        )
                    )
            return instance.states_meta_manager.get_many(
                ["sensor.one", "sensor.two", "sensor.three", "sensor.unused"],
                session,
                True,
            )

    entity_id_to_metadata_id = await instance.async_add_executor_job(_insert_states)
    test_metadata_ids = entity_id_to_metadata_id.values()
    with session_scope(hass=hass) as session:
        states = session.query(States).where(States.metadata_id.in_(test_metadata_ids))
        states_meta = session.query(StatesMeta).where(
            StatesMeta.metadata_id.in_(test_metadata_ids)
        )

        assert states.count() == 30
        assert states_meta.count() == 4
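
        # As with event types, a purge that deletes no states should still
        # remove StatesMeta rows that no states reference anymore.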

        # run purge_old_data()
        finished = purge_old_data(
            instance,
            far_past,
            repack=False,
        )
        assert finished
        assert states.count() == 30
        # We should remove the unused entity_id
        assert states_meta.count() == 3

        assert "sensor.unused" not in instance.states_meta_manager._id_map

        # we should only have 10 states left since
        # only one entity was recorded now
        finished = purge_old_data(
            instance,
            utcnow,
            repack=False,
        )
        assert finished
        assert states.count() == 10
        assert states_meta.count() == 1

        # Purge everything
        finished = purge_old_data(
            instance,
            utcnow + timedelta(seconds=1),
            repack=False,
        )
        assert finished
        assert states.count() == 0
        assert states_meta.count() == 0


async def test_purge_entities_keep_days(
    async_setup_recorder_instance: RecorderInstanceGenerator,
    hass: HomeAssistant,
) -> None:
    """Test purging states with an entity filter and keep_days."""
    instance = await async_setup_recorder_instance(hass, {})
    await hass.async_block_till_done()
    await async_wait_recording_done(hass)
    start = dt_util.utcnow()
    two_days_ago = start - timedelta(days=2)
    one_week_ago = start - timedelta(days=7)
    one_month_ago = start - timedelta(days=30)
    with freeze_time(one_week_ago):
        hass.states.async_set("sensor.keep", "initial")
        hass.states.async_set("sensor.purge", "initial")

    await async_wait_recording_done(hass)

    with freeze_time(two_days_ago):
        hass.states.async_set("sensor.purge", "two_days_ago")

    await async_wait_recording_done(hass)

    hass.states.async_set("sensor.purge", "now")
    hass.states.async_set("sensor.keep", "now")
    await async_recorder_block_till_done(hass)

    states = await instance.async_add_executor_job(
        get_significant_states, hass, one_month_ago
    )
    assert len(states["sensor.keep"]) == 2
    assert len(states["sensor.purge"]) == 3
    await hass.services.async_call(
        recorder.DOMAIN,
        SERVICE_PURGE_ENTITIES,
        {
            "entity_id": "sensor.purge",
            "keep_days": 1,
        },
    )
    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    states = await instance.async_add_executor_job(
        get_significant_states, hass, one_month_ago
    )
    assert len(states["sensor.keep"]) == 2
    assert len(states["sensor.purge"]) == 1
    await hass.services.async_call(
        recorder.DOMAIN,
        SERVICE_PURGE_ENTITIES,
        {
            "entity_id": "sensor.purge",
        },
    )
    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)

    states = await instance.async_add_executor_job(
        get_significant_states, hass, one_month_ago
    )
    assert len(states["sensor.keep"]) == 2
    assert "sensor.purge" not in states