Ensure purge can clean up old-format detached states in the database (#92145)
This commit is contained in:
parent
293fb374ed
commit
e156d3132e
3 changed files with 217 additions and 8 deletions
|
@ -34,6 +34,7 @@ from .queries import (
|
|||
find_event_types_to_purge,
|
||||
find_events_to_purge,
|
||||
find_latest_statistics_runs_run_id,
|
||||
find_legacy_detached_states_and_attributes_to_purge,
|
||||
find_legacy_event_state_and_attributes_and_data_ids_to_purge,
|
||||
find_legacy_row,
|
||||
find_short_term_statistics_to_purge,
|
||||
|
@ -146,7 +147,28 @@ def _purge_legacy_format(
|
|||
_purge_unused_attributes_ids(instance, session, attributes_ids)
|
||||
_purge_event_ids(session, event_ids)
|
||||
_purge_unused_data_ids(instance, session, data_ids)
|
||||
return bool(event_ids or state_ids or attributes_ids or data_ids)
|
||||
|
||||
# The database may still have some rows that have an event_id but are not
|
||||
# linked to any event. These rows are not linked to any event because the
|
||||
# event was deleted. We need to purge these rows as well or we will never
|
||||
# switch to the new format which will prevent us from purging any events
|
||||
# that happened after the detached states.
|
||||
(
|
||||
detached_state_ids,
|
||||
detached_attributes_ids,
|
||||
) = _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
|
||||
session, purge_before
|
||||
)
|
||||
_purge_state_ids(instance, session, detached_state_ids)
|
||||
_purge_unused_attributes_ids(instance, session, detached_attributes_ids)
|
||||
return bool(
|
||||
event_ids
|
||||
or state_ids
|
||||
or attributes_ids
|
||||
or data_ids
|
||||
or detached_state_ids
|
||||
or detached_attributes_ids
|
||||
)
|
||||
|
||||
|
||||
def _purge_states_and_attributes_ids(
|
||||
|
@ -412,6 +434,31 @@ def _select_short_term_statistics_to_purge(
|
|||
return [statistic.id for statistic in statistics]
|
||||
|
||||
|
||||
def _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
    session: Session, purge_before: datetime
) -> tuple[set[int], set[int]]:
    """Return the sets of state ids and attribute ids to purge.

    Targets "detached" legacy rows: states that still carry an event_id
    but whose event row no longer exists in the events table.

    We do not link these anymore since state_change events
    do not exist in the events table anymore, however we
    still need to be able to purge them.
    """
    # purge_before is converted to a Unix timestamp because the query
    # compares against States.last_updated_ts (a float column).
    states = session.execute(
        find_legacy_detached_states_and_attributes_to_purge(
            dt_util.utc_to_timestamp(purge_before)
        )
    ).all()
    _LOGGER.debug("Selected %s state ids to remove", len(states))
    state_ids = set()
    attributes_ids = set()
    for state in states:
        # Walrus checks skip falsy values, so rows with a NULL (or 0)
        # state_id/attributes_id are simply not collected.
        if state_id := state.state_id:
            state_ids.add(state_id)
        if attributes_id := state.attributes_id:
            attributes_ids.add(attributes_id)
    return state_ids, attributes_ids
|
||||
|
||||
|
||||
def _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
|
||||
session: Session, purge_before: datetime
|
||||
) -> tuple[set[int], set[int], set[int], set[int]]:
|
||||
|
@ -433,12 +480,12 @@ def _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
|
|||
data_ids = set()
|
||||
for event in events:
|
||||
event_ids.add(event.event_id)
|
||||
if event.state_id:
|
||||
state_ids.add(event.state_id)
|
||||
if event.attributes_id:
|
||||
attributes_ids.add(event.attributes_id)
|
||||
if event.data_id:
|
||||
data_ids.add(event.data_id)
|
||||
if state_id := event.state_id:
|
||||
state_ids.add(state_id)
|
||||
if attributes_id := event.attributes_id:
|
||||
attributes_ids.add(attributes_id)
|
||||
if data_id := event.data_id:
|
||||
data_ids.add(data_id)
|
||||
return event_ids, state_ids, attributes_ids, data_ids
|
||||
|
||||
|
||||
|
|
|
@ -678,6 +678,22 @@ def find_legacy_event_state_and_attributes_and_data_ids_to_purge(
|
|||
)
|
||||
|
||||
|
||||
def find_legacy_detached_states_and_attributes_to_purge(
    purge_before: float,
) -> StatementLambdaElement:
    """Find states rows with event_id set but not linked event_id in Events."""
    # Outer-join to events and keep only rows where the join found no
    # matching event (Events.event_id IS NULL) while the state still
    # carries an event_id: these are the detached legacy rows.
    return lambda_stmt(
        lambda: select(States.state_id, States.attributes_id)
        .outerjoin(Events, States.event_id == Events.event_id)
        .where(States.event_id.isnot(None))
        .where(Events.event_id.is_(None))
        .where(
            (States.last_updated_ts < purge_before) | States.last_updated_ts.is_(None)
        )
        .limit(SQLITE_MAX_BIND_VARS)
    )
|
||||
|
||||
|
||||
def find_legacy_row() -> StatementLambdaElement:
|
||||
"""Check if there are still states in the table with an event_id."""
|
||||
# https://github.com/sqlalchemy/sqlalchemy/issues/9189
|
||||
|
|
|
@ -8,6 +8,7 @@ from unittest.mock import MagicMock, patch
|
|||
|
||||
from freezegun import freeze_time
|
||||
import pytest
|
||||
from sqlalchemy import text, update
|
||||
from sqlalchemy.exc import DatabaseError, OperationalError
|
||||
from sqlalchemy.orm.session import Session
|
||||
|
||||
|
@ -1000,7 +1001,7 @@ async def test_purge_many_old_events(
|
|||
async def test_purge_can_mix_legacy_and_new_format(
|
||||
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""Test purging with legacy a new events."""
|
||||
"""Test purging with legacy and new events."""
|
||||
instance = await async_setup_recorder_instance(hass)
|
||||
await _async_attach_db_engine(hass)
|
||||
|
||||
|
@ -1018,6 +1019,7 @@ async def test_purge_can_mix_legacy_and_new_format(
|
|||
|
||||
utcnow = dt_util.utcnow()
|
||||
eleven_days_ago = utcnow - timedelta(days=11)
|
||||
|
||||
with session_scope(hass=hass) as session:
|
||||
broken_state_no_time = States(
|
||||
event_id=None,
|
||||
|
@ -1104,6 +1106,150 @@ async def test_purge_can_mix_legacy_and_new_format(
|
|||
assert states_without_event_id.count() == 1
|
||||
|
||||
|
||||
async def test_purge_can_mix_legacy_and_new_format_with_detached_state(
    async_setup_recorder_instance: RecorderInstanceGenerator,
    hass: HomeAssistant,
    recorder_db_url: str,
) -> None:
    """Test purging with legacy and new events with a detached state."""
    if recorder_db_url.startswith(("mysql://", "postgresql://")):
        return pytest.skip("This tests disables foreign key checks on SQLite")

    instance = await async_setup_recorder_instance(hass)
    await _async_attach_db_engine(hass)

    await async_wait_recording_done(hass)
    # New databases are no longer created with the legacy events index
    assert instance.use_legacy_events_index is False

    def _recreate_legacy_events_index():
        """Recreate the legacy events index since it's no longer created on new instances."""
        migration._create_index(instance.get_session, "states", "ix_states_event_id")
        instance.use_legacy_events_index = True

    await instance.async_add_executor_job(_recreate_legacy_events_index)
    assert instance.use_legacy_events_index is True

    with session_scope(hass=hass) as session:
        # FK checks must be off so we can insert states whose event_id
        # points at an event row that does not exist (detached states).
        session.execute(text("PRAGMA foreign_keys = OFF"))

    utcnow = dt_util.utcnow()
    eleven_days_ago = utcnow - timedelta(days=11)

    with session_scope(hass=hass) as session:
        # A state with neither event linkage nor timestamps: must never
        # block purging (asserted at the end of the test).
        broken_state_no_time = States(
            event_id=None,
            entity_id="orphened.state",
            last_updated_ts=None,
            last_changed_ts=None,
        )
        session.add(broken_state_no_time)
        # Detached state: event_id set but no such event exists.
        detached_state_deleted_event_id = States(
            event_id=99999999999,
            entity_id="event.deleted",
            last_updated_ts=1,
            last_changed_ts=None,
        )
        session.add(detached_state_deleted_event_id)
        detached_state_deleted_event_id.last_changed = None
        detached_state_deleted_event_id.last_changed_ts = None
        detached_state_deleted_event_id.last_updated = None
        # Second detached state, this one also without any timestamps.
        detached_state_deleted_event_id = States(
            event_id=99999999999,
            entity_id="event.deleted.no_time",
            last_updated_ts=None,
            last_changed_ts=None,
        )
        detached_state_deleted_event_id.last_changed = None
        detached_state_deleted_event_id.last_changed_ts = None
        detached_state_deleted_event_id.last_updated = None
        detached_state_deleted_event_id.last_updated_ts = None
        session.add(detached_state_deleted_event_id)
        # 50 legacy-format rows: states linked to state_changed events.
        start_id = 50000
        for event_id in range(start_id, start_id + 50):
            _add_state_and_state_changed_event(
                session,
                "sensor.excluded",
                "purgeme",
                eleven_days_ago,
                event_id,
            )
    with session_scope(hass=hass) as session:
        # Force the timestamp back to NULL in case the ORM/defaults
        # populated it on flush.
        session.execute(
            update(States)
            .where(States.entity_id == "event.deleted.no_time")
            .values(last_updated_ts=None)
        )

    await _add_test_events(hass, 50)
    await _add_events_with_event_data(hass, 50)
    with session_scope(hass=hass) as session:
        # 50 new-format rows: states with no event linkage at all.
        for _ in range(50):
            _add_state_without_event_linkage(
                session, "switch.random", "on", eleven_days_ago
            )
        states_with_event_id = session.query(States).filter(
            States.event_id.is_not(None)
        )
        states_without_event_id = session.query(States).filter(
            States.event_id.is_(None)
        )

        # 50 legacy + 2 detached = 52; 50 new-format + 1 broken = 51.
        assert states_with_event_id.count() == 52
        assert states_without_event_id.count() == 51

        purge_before = dt_util.utcnow() - timedelta(days=4)
        finished = purge_old_data(
            instance,
            purge_before,
            repack=False,
        )
        # First pass purges the legacy rows (including the detached
        # ones) but reports not finished.
        assert not finished
        assert states_with_event_id.count() == 0
        assert states_without_event_id.count() == 51
        # At this point all the legacy states are gone
        # and we switch methods
        purge_before = dt_util.utcnow() - timedelta(days=4)
        finished = purge_old_data(
            instance,
            purge_before,
            repack=False,
            events_batch_size=1,
            states_batch_size=1,
        )
        # Since we only allow one iteration, we won't
        # check if we are finished this loop similar
        # to the legacy method
        assert not finished
        assert states_with_event_id.count() == 0
        # Only the broken no-timestamp state remains.
        assert states_without_event_id.count() == 1
        finished = purge_old_data(
            instance,
            purge_before,
            repack=False,
            events_batch_size=100,
            states_batch_size=100,
        )
        assert finished
        assert states_with_event_id.count() == 0
        assert states_without_event_id.count() == 1
        # Add one more purgeable new-format state and verify a later
        # purge still completes despite the broken row.
        _add_state_without_event_linkage(
            session, "switch.random", "on", eleven_days_ago
        )
        assert states_with_event_id.count() == 0
        assert states_without_event_id.count() == 2
        finished = purge_old_data(
            instance,
            purge_before,
            repack=False,
        )
        assert finished
        # The broken state without a timestamp
        # does not prevent future purges. Its ignored.
        assert states_with_event_id.count() == 0
        assert states_without_event_id.count() == 1
|
||||
|
||||
|
||||
async def test_purge_entities_keep_days(
|
||||
async_setup_recorder_instance: RecorderInstanceGenerator,
|
||||
hass: HomeAssistant,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue