Fix purging legacy rows and improve performance (#71916)
parent 089eb9960a
commit a70e2a33dc
4 changed files with 530 additions and 81 deletions
@@ -130,10 +130,10 @@ class Events(Base):  # type: ignore[misc,valid-type]
         {"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
     )
     __tablename__ = TABLE_EVENTS
-    event_id = Column(Integer, Identity(), primary_key=True)  # no longer used
+    event_id = Column(Integer, Identity(), primary_key=True)
     event_type = Column(String(MAX_LENGTH_EVENT_EVENT_TYPE))
     event_data = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
-    origin = Column(String(MAX_LENGTH_EVENT_ORIGIN))  # no longer used
+    origin = Column(String(MAX_LENGTH_EVENT_ORIGIN))  # no longer used for new rows
     origin_idx = Column(SmallInteger)
     time_fired = Column(DATETIME_TYPE, index=True)
     context_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
@@ -247,8 +247,10 @@ class States(Base):  # type: ignore[misc,valid-type]
     state_id = Column(Integer, Identity(), primary_key=True)
     entity_id = Column(String(MAX_LENGTH_STATE_ENTITY_ID))
     state = Column(String(MAX_LENGTH_STATE_STATE))
-    attributes = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
-    event_id = Column(
+    attributes = Column(
+        Text().with_variant(mysql.LONGTEXT, "mysql")
+    )  # no longer used for new rows
+    event_id = Column(  # no longer used for new rows
         Integer, ForeignKey("events.event_id", ondelete="CASCADE"), index=True
     )
     last_changed = Column(DATETIME_TYPE)
@@ -3,9 +3,10 @@ from __future__ import annotations

 from collections.abc import Callable, Iterable
 from datetime import datetime
-from itertools import zip_longest
+from functools import partial
+from itertools import islice, zip_longest
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any

 from sqlalchemy.orm.session import Session
 from sqlalchemy.sql.expression import distinct
@@ -29,6 +30,8 @@ from .queries import (
     disconnect_states_rows,
     find_events_to_purge,
     find_latest_statistics_runs_run_id,
+    find_legacy_event_state_and_attributes_and_data_ids_to_purge,
+    find_legacy_row,
     find_short_term_statistics_to_purge,
     find_states_to_purge,
     find_statistics_runs_to_purge,
@@ -42,9 +45,34 @@ if TYPE_CHECKING:
 _LOGGER = logging.getLogger(__name__)


+DEFAULT_STATES_BATCHES_PER_PURGE = 20  # We expect ~95% de-dupe rate
+DEFAULT_EVENTS_BATCHES_PER_PURGE = 15  # We expect ~92% de-dupe rate
+
+
+def take(take_num: int, iterable: Iterable) -> list[Any]:
+    """Return first n items of the iterable as a list.
+
+    From itertools recipes
+    """
+    return list(islice(iterable, take_num))
+
+
+def chunked(iterable: Iterable, chunked_num: int) -> Iterable[Any]:
+    """Break *iterable* into lists of length *n*.
+
+    From more-itertools
+    """
+    return iter(partial(take, chunked_num, iter(iterable)), [])
+
+
 @retryable_database_job("purge")
 def purge_old_data(
-    instance: Recorder, purge_before: datetime, repack: bool, apply_filter: bool = False
+    instance: Recorder,
+    purge_before: datetime,
+    repack: bool,
+    apply_filter: bool = False,
+    events_batch_size: int = DEFAULT_EVENTS_BATCHES_PER_PURGE,
+    states_batch_size: int = DEFAULT_STATES_BATCHES_PER_PURGE,
 ) -> bool:
     """Purge events and states older than purge_before.

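The two helpers added above are generic batching utilities (the itertools/more-itertools recipes they cite). Below is a minimal standalone sketch of how they behave, restating the definitions from this hunk; the assert lines are illustrative and not part of the commit. Later hunks use chunked(attributes_ids, MAX_ROWS_TO_PURGE) and chunked(data_ids, MAX_ROWS_TO_PURGE) so each DELETE touches a bounded number of rows.

from collections.abc import Iterable
from functools import partial
from itertools import islice
from typing import Any


def take(take_num: int, iterable: Iterable) -> list[Any]:
    """Return first n items of the iterable as a list (itertools recipe)."""
    return list(islice(iterable, take_num))


def chunked(iterable: Iterable, chunked_num: int) -> Iterable[Any]:
    """Break *iterable* into lists of length chunked_num (from more-itertools)."""
    # iter(callable, sentinel) keeps calling take() until it returns [] (the sentinel).
    return iter(partial(take, chunked_num, iter(iterable)), [])


# chunked() yields lazily, so a large id set can be consumed in bounded slices:
assert list(chunked(range(7), 3)) == [[0, 1, 2], [3, 4, 5], [6]]
assert list(chunked([], 3)) == []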
@@ -58,40 +86,37 @@ def purge_old_data(
     with session_scope(session=instance.get_session()) as session:
         # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
-        (
-            event_ids,
-            state_ids,
-            attributes_ids,
-            data_ids,
-        ) = _select_event_state_attributes_ids_data_ids_to_purge(session, purge_before)
+        has_more_to_purge = False
+        if _purging_legacy_format(session):
+            _LOGGER.debug(
+                "Purge running in legacy format as there are states with event_id remaining"
+            )
+            has_more_to_purge |= _purge_legacy_format(
+                instance, session, purge_before, using_sqlite
+            )
+        else:
+            _LOGGER.debug(
+                "Purge running in new format as there are NO states with event_id remaining"
+            )
+            # Once we are done purging legacy rows, we use the new method
+            has_more_to_purge |= _purge_states_and_attributes_ids(
+                instance, session, states_batch_size, purge_before, using_sqlite
+            )
+            has_more_to_purge |= _purge_events_and_data_ids(
+                instance, session, events_batch_size, purge_before, using_sqlite
+            )
+
         statistics_runs = _select_statistics_runs_to_purge(session, purge_before)
         short_term_statistics = _select_short_term_statistics_to_purge(
             session, purge_before
         )
-
-        if state_ids:
-            _purge_state_ids(instance, session, state_ids)
-
-        if unused_attribute_ids_set := _select_unused_attributes_ids(
-            session, attributes_ids, using_sqlite
-        ):
-            _purge_attributes_ids(instance, session, unused_attribute_ids_set)
-
-        if event_ids:
-            _purge_event_ids(session, event_ids)
-
-        if unused_data_ids_set := _select_unused_event_data_ids(
-            session, data_ids, using_sqlite
-        ):
-            _purge_event_data_ids(instance, session, unused_data_ids_set)

         if statistics_runs:
             _purge_statistics_runs(session, statistics_runs)

         if short_term_statistics:
             _purge_short_term_statistics(session, short_term_statistics)

-        if state_ids or event_ids or statistics_runs or short_term_statistics:
+        if has_more_to_purge or statistics_runs or short_term_statistics:
             # Return false, as we might not be done yet.
             _LOGGER.debug("Purging hasn't fully completed yet")
             return False
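purge_old_data keeps its existing contract: it returns False while rows older than purge_before may remain (legacy pass, batched states pass, batched events pass, or outstanding statistics), and True once a run removes nothing more. A minimal sketch of a caller honouring that contract follows; the purge_until_done wrapper is hypothetical and only for illustration (the recorder schedules follow-up purge runs itself rather than looping inline like this).

from datetime import datetime

# purge_old_data is the function modified in this diff
from homeassistant.components.recorder.purge import purge_old_data


def purge_until_done(instance, purge_before: datetime) -> None:
    """Illustrative only: call purge_old_data until it reports completion."""
    while not purge_old_data(
        instance,
        purge_before,
        repack=False,
        events_batch_size=1,
        states_batch_size=1,
    ):
        # False means at least one batch was purged and more work may remain.
        pass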
@@ -106,27 +131,132 @@ def purge_old_data(
     return True


-def _select_event_state_attributes_ids_data_ids_to_purge(
+def _purging_legacy_format(session: Session) -> bool:
+    """Check if there are any legacy event_id linked states rows remaining."""
+    return bool(session.execute(find_legacy_row()).scalar())
+
+
+def _purge_legacy_format(
+    instance: Recorder, session: Session, purge_before: datetime, using_sqlite: bool
+) -> bool:
+    """Purge rows that are still linked by the event_ids."""
+    (
+        event_ids,
+        state_ids,
+        attributes_ids,
+        data_ids,
+    ) = _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
+        session, purge_before
+    )
+    if state_ids:
+        _purge_state_ids(instance, session, state_ids)
+    _purge_unused_attributes_ids(instance, session, attributes_ids, using_sqlite)
+    if event_ids:
+        _purge_event_ids(session, event_ids)
+    _purge_unused_data_ids(instance, session, data_ids, using_sqlite)
+    return bool(event_ids or state_ids or attributes_ids or data_ids)
+
+
+def _purge_states_and_attributes_ids(
+    instance: Recorder,
+    session: Session,
+    states_batch_size: int,
+    purge_before: datetime,
+    using_sqlite: bool,
+) -> bool:
+    """Purge states and linked attributes id in a batch.
+
+    Returns true if there are more states to purge.
+    """
+    has_remaining_state_ids_to_purge = True
+    # There are more states relative to attributes_ids so
+    # we purge enough state_ids to try to generate a full
+    # size batch of attributes_ids that will be around the size
+    # MAX_ROWS_TO_PURGE
+    attributes_ids_batch: set[int] = set()
+    for _ in range(states_batch_size):
+        state_ids, attributes_ids = _select_state_attributes_ids_to_purge(
+            session, purge_before
+        )
+        if not state_ids:
+            has_remaining_state_ids_to_purge = False
+            break
+        _purge_state_ids(instance, session, state_ids)
+        attributes_ids_batch = attributes_ids_batch | attributes_ids
+
+    _purge_unused_attributes_ids(instance, session, attributes_ids_batch, using_sqlite)
+    _LOGGER.debug(
+        "After purging states and attributes_ids remaining=%s",
+        has_remaining_state_ids_to_purge,
+    )
+    return has_remaining_state_ids_to_purge
+
+
+def _purge_events_and_data_ids(
+    instance: Recorder,
+    session: Session,
+    events_batch_size: int,
+    purge_before: datetime,
+    using_sqlite: bool,
+) -> bool:
+    """Purge states and linked attributes id in a batch.
+
+    Returns true if there are more states to purge.
+    """
+    has_remaining_event_ids_to_purge = True
+    # There are more events relative to data_ids so
+    # we purge enough event_ids to try to generate a full
+    # size batch of data_ids that will be around the size
+    # MAX_ROWS_TO_PURGE
+    data_ids_batch: set[int] = set()
+    for _ in range(events_batch_size):
+        event_ids, data_ids = _select_event_data_ids_to_purge(session, purge_before)
+        if not event_ids:
+            has_remaining_event_ids_to_purge = False
+            break
+        _purge_event_ids(session, event_ids)
+        data_ids_batch = data_ids_batch | data_ids
+
+    _purge_unused_data_ids(instance, session, data_ids_batch, using_sqlite)
+    _LOGGER.debug(
+        "After purging event and data_ids remaining=%s",
+        has_remaining_event_ids_to_purge,
+    )
+    return has_remaining_event_ids_to_purge
+
+
+def _select_state_attributes_ids_to_purge(
     session: Session, purge_before: datetime
-) -> tuple[set[int], set[int], set[int], set[int]]:
-    """Return a list of event, state, and attribute ids to purge."""
-    events = session.execute(find_events_to_purge(purge_before)).all()
-    _LOGGER.debug("Selected %s event ids to remove", len(events))
-    states = session.execute(find_states_to_purge(purge_before)).all()
-    _LOGGER.debug("Selected %s state ids to remove", len(states))
-    event_ids = set()
+) -> tuple[set[int], set[int]]:
+    """Return sets of state and attribute ids to purge."""
     state_ids = set()
     attributes_ids = set()
-    data_ids = set()
-    for event in events:
-        event_ids.add(event.event_id)
-        if event.data_id:
-            data_ids.add(event.data_id)
-    for state in states:
+    for state in session.execute(find_states_to_purge(purge_before)).all():
         state_ids.add(state.state_id)
         if state.attributes_id:
             attributes_ids.add(state.attributes_id)
-    return event_ids, state_ids, attributes_ids, data_ids
+    _LOGGER.debug(
+        "Selected %s state ids and %s attributes_ids to remove",
+        len(state_ids),
+        len(attributes_ids),
+    )
+    return state_ids, attributes_ids
+
+
+def _select_event_data_ids_to_purge(
+    session: Session, purge_before: datetime
+) -> tuple[set[int], set[int]]:
+    """Return sets of event and data ids to purge."""
+    event_ids = set()
+    data_ids = set()
+    for event in session.execute(find_events_to_purge(purge_before)).all():
+        event_ids.add(event.event_id)
+        if event.data_id:
+            data_ids.add(event.data_id)
+    _LOGGER.debug(
+        "Selected %s event ids and %s data_ids to remove", len(event_ids), len(data_ids)
+    )
+    return event_ids, data_ids


 def _select_unused_attributes_ids(
@@ -197,6 +327,18 @@ def _select_unused_attributes_ids(
     return to_remove


+def _purge_unused_attributes_ids(
+    instance: Recorder,
+    session: Session,
+    attributes_ids_batch: set[int],
+    using_sqlite: bool,
+) -> None:
+    if unused_attribute_ids_set := _select_unused_attributes_ids(
+        session, attributes_ids_batch, using_sqlite
+    ):
+        _purge_batch_attributes_ids(instance, session, unused_attribute_ids_set)
+
+
 def _select_unused_event_data_ids(
     session: Session, data_ids: set[int], using_sqlite: bool
 ) -> set[int]:
@@ -229,6 +371,16 @@ def _select_unused_event_data_ids(
     return to_remove


+def _purge_unused_data_ids(
+    instance: Recorder, session: Session, data_ids_batch: set[int], using_sqlite: bool
+) -> None:
+
+    if unused_data_ids_set := _select_unused_event_data_ids(
+        session, data_ids_batch, using_sqlite
+    ):
+        _purge_batch_data_ids(instance, session, unused_data_ids_set)
+
+
 def _select_statistics_runs_to_purge(
     session: Session, purge_before: datetime
 ) -> list[int]:
@@ -256,6 +408,34 @@ def _select_short_term_statistics_to_purge(
     return [statistic.id for statistic in statistics]


+def _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
+    session: Session, purge_before: datetime
+) -> tuple[set[int], set[int], set[int], set[int]]:
+    """Return a list of event, state, and attribute ids to purge that are linked by the event_id.
+
+    We do not link these anymore since state_change events
+    do not exist in the events table anymore, however we
+    still need to be able to purge them.
+    """
+    events = session.execute(
+        find_legacy_event_state_and_attributes_and_data_ids_to_purge(purge_before)
+    ).all()
+    _LOGGER.debug("Selected %s event ids to remove", len(events))
+    event_ids = set()
+    state_ids = set()
+    attributes_ids = set()
+    data_ids = set()
+    for event in events:
+        event_ids.add(event.event_id)
+        if event.state_id:
+            state_ids.add(event.state_id)
+        if event.attributes_id:
+            attributes_ids.add(event.attributes_id)
+        if event.data_id:
+            data_ids.add(event.data_id)
+    return event_ids, state_ids, attributes_ids, data_ids
+
+
 def _purge_state_ids(instance: Recorder, session: Session, state_ids: set[int]) -> None:
     """Disconnect states and delete by state id."""

@@ -327,24 +507,27 @@ def _evict_purged_attributes_from_attributes_cache(
     )


-def _purge_attributes_ids(
+def _purge_batch_attributes_ids(
     instance: Recorder, session: Session, attributes_ids: set[int]
 ) -> None:
-    """Delete old attributes ids."""
-    deleted_rows = session.execute(delete_states_attributes_rows(attributes_ids))
-    _LOGGER.debug("Deleted %s attribute states", deleted_rows)
+    """Delete old attributes ids in batches of MAX_ROWS_TO_PURGE."""
+    for attributes_ids_chunk in chunked(attributes_ids, MAX_ROWS_TO_PURGE):
+        deleted_rows = session.execute(
+            delete_states_attributes_rows(attributes_ids_chunk)
+        )
+        _LOGGER.debug("Deleted %s attribute states", deleted_rows)

     # Evict any entries in the state_attributes_ids cache referring to a purged state
     _evict_purged_attributes_from_attributes_cache(instance, attributes_ids)


-def _purge_event_data_ids(
+def _purge_batch_data_ids(
     instance: Recorder, session: Session, data_ids: set[int]
 ) -> None:
-    """Delete old event data ids."""
-    deleted_rows = session.execute(delete_event_data_rows(data_ids))
-    _LOGGER.debug("Deleted %s data events", deleted_rows)
+    """Delete old event data ids in batches of MAX_ROWS_TO_PURGE."""
+    for data_ids_chunk in chunked(data_ids, MAX_ROWS_TO_PURGE):
+        deleted_rows = session.execute(delete_event_data_rows(data_ids_chunk))
+        _LOGGER.debug("Deleted %s data events", deleted_rows)

     # Evict any entries in the event_data_ids cache referring to a purged state
     _evict_purged_data_from_data_cache(instance, data_ids)
@@ -438,7 +621,7 @@ def _purge_filtered_states(
     unused_attribute_ids_set = _select_unused_attributes_ids(
         session, {id_ for id_ in attributes_ids if id_ is not None}, using_sqlite
     )
-    _purge_attributes_ids(instance, session, unused_attribute_ids_set)
+    _purge_batch_attributes_ids(instance, session, unused_attribute_ids_set)


 def _purge_filtered_events(
@@ -466,7 +649,7 @@ def _purge_filtered_events(
     if unused_data_ids_set := _select_unused_event_data_ids(
         session, set(data_ids), using_sqlite
     ):
-        _purge_event_data_ids(instance, session, unused_data_ids_set)
+        _purge_batch_data_ids(instance, session, unused_data_ids_set)
     if EVENT_STATE_CHANGED in excluded_event_types:
         session.query(StateAttributes).delete(synchronize_session=False)
         instance._state_attributes_ids = {}  # pylint: disable=protected-access
@@ -621,3 +621,22 @@ def find_statistics_runs_to_purge(
 def find_latest_statistics_runs_run_id() -> StatementLambdaElement:
     """Find the latest statistics_runs run_id."""
     return lambda_stmt(lambda: select(func.max(StatisticsRuns.run_id)))
+
+
+def find_legacy_event_state_and_attributes_and_data_ids_to_purge(
+    purge_before: datetime,
+) -> StatementLambdaElement:
+    """Find the latest row in the legacy format to purge."""
+    return lambda_stmt(
+        lambda: select(
+            Events.event_id, Events.data_id, States.state_id, States.attributes_id
+        )
+        .join(States, Events.event_id == States.event_id)
+        .filter(Events.time_fired < purge_before)
+        .limit(MAX_ROWS_TO_PURGE)
+    )
+
+
+def find_legacy_row() -> StatementLambdaElement:
+    """Check if there are still states in the table with an event_id."""
+    return lambda_stmt(lambda: select(func.max(States.event_id)))
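Both additions return StatementLambdaElement objects built with lambda_stmt, so SQLAlchemy can cache the compiled statements; the purge code executes them directly against the session, as in the earlier hunks. A condensed sketch of that call pattern is below; the helper names here are hypothetical stand-ins for _purging_legacy_format and _select_legacy_event_state_and_attributes_and_data_ids_to_purge shown above.

from datetime import datetime

from sqlalchemy.orm.session import Session

# The two query builders added in this hunk
from homeassistant.components.recorder.queries import (
    find_legacy_event_state_and_attributes_and_data_ids_to_purge,
    find_legacy_row,
)


def has_legacy_rows(session: Session) -> bool:
    # max(States.event_id) is non-NULL only while event_id linked rows remain
    return bool(session.execute(find_legacy_row()).scalar())


def legacy_ids(session: Session, purge_before: datetime):
    rows = session.execute(
        find_legacy_event_state_and_attributes_and_data_ids_to_purge(purge_before)
    ).all()
    # Each row carries event_id, data_id, state_id and attributes_id columns
    return (
        {row.event_id for row in rows},
        {row.state_id for row in rows if row.state_id},
        {row.attributes_id for row in rows if row.attributes_id},
        {row.data_id for row in rows if row.data_id},
    )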
@@ -11,6 +11,7 @@ from sqlalchemy.orm.session import Session
 from homeassistant.components import recorder
 from homeassistant.components.recorder.const import MAX_ROWS_TO_PURGE, SupportedDialect
 from homeassistant.components.recorder.models import (
+    EventData,
     Events,
     RecorderRuns,
     StateAttributes,
@@ -76,7 +77,13 @@ async def test_purge_old_states(
         purge_before = dt_util.utcnow() - timedelta(days=4)

         # run purge_old_data()
-        finished = purge_old_data(instance, purge_before, repack=False)
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            states_batch_size=1,
+            events_batch_size=1,
+            repack=False,
+        )
         assert not finished
         assert states.count() == 2
         assert state_attributes.count() == 1
@@ -96,7 +103,13 @@ async def test_purge_old_states(

         # run purge_old_data again
         purge_before = dt_util.utcnow()
-        finished = purge_old_data(instance, purge_before, repack=False)
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            states_batch_size=1,
+            events_batch_size=1,
+            repack=False,
+        )
         assert not finished
         assert states.count() == 0
         assert state_attributes.count() == 0
@@ -223,12 +236,24 @@ async def test_purge_old_events(
         purge_before = dt_util.utcnow() - timedelta(days=4)

         # run purge_old_data()
-        finished = purge_old_data(instance, purge_before, repack=False)
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+            events_batch_size=1,
+            states_batch_size=1,
+        )
         assert not finished
         assert events.count() == 2

         # we should only have 2 events left
-        finished = purge_old_data(instance, purge_before, repack=False)
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+            events_batch_size=1,
+            states_batch_size=1,
+        )
         assert finished
         assert events.count() == 2
@@ -249,10 +274,22 @@ async def test_purge_old_recorder_runs(
         purge_before = dt_util.utcnow()

         # run purge_old_data()
-        finished = purge_old_data(instance, purge_before, repack=False)
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+            events_batch_size=1,
+            states_batch_size=1,
+        )
         assert not finished

-        finished = purge_old_data(instance, purge_before, repack=False)
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+            events_batch_size=1,
+            states_batch_size=1,
+        )
         assert finished
         assert recorder_runs.count() == 1
@@ -1271,7 +1308,7 @@ async def _add_test_states(hass: HomeAssistant):
         await set_state("test.recorder2", state, attributes=attributes)


-async def _add_test_events(hass: HomeAssistant):
+async def _add_test_events(hass: HomeAssistant, iterations: int = 1):
     """Add a few events for testing."""
     utcnow = dt_util.utcnow()
     five_days_ago = utcnow - timedelta(days=5)
@@ -1282,25 +1319,64 @@ async def _add_test_events(hass: HomeAssistant):
     await async_wait_recording_done(hass)

     with session_scope(hass=hass) as session:
-        for event_id in range(6):
-            if event_id < 2:
-                timestamp = eleven_days_ago
-                event_type = "EVENT_TEST_AUTOPURGE"
-            elif event_id < 4:
-                timestamp = five_days_ago
-                event_type = "EVENT_TEST_PURGE"
-            else:
-                timestamp = utcnow
-                event_type = "EVENT_TEST"
-
-            session.add(
-                Events(
-                    event_type=event_type,
-                    event_data=json.dumps(event_data),
-                    origin="LOCAL",
-                    time_fired=timestamp,
+        for _ in range(iterations):
+            for event_id in range(6):
+                if event_id < 2:
+                    timestamp = eleven_days_ago
+                    event_type = "EVENT_TEST_AUTOPURGE"
+                elif event_id < 4:
+                    timestamp = five_days_ago
+                    event_type = "EVENT_TEST_PURGE"
+                else:
+                    timestamp = utcnow
+                    event_type = "EVENT_TEST"
+
+                session.add(
+                    Events(
+                        event_type=event_type,
+                        event_data=json.dumps(event_data),
+                        origin="LOCAL",
+                        time_fired=timestamp,
+                    )
                 )
-            )
+
+
+async def _add_events_with_event_data(hass: HomeAssistant, iterations: int = 1):
+    """Add a few events with linked event_data for testing."""
+    utcnow = dt_util.utcnow()
+    five_days_ago = utcnow - timedelta(days=5)
+    eleven_days_ago = utcnow - timedelta(days=11)
+    event_data = {"test_attr": 5, "test_attr_10": "nice"}
+
+    await hass.async_block_till_done()
+    await async_wait_recording_done(hass)
+
+    with session_scope(hass=hass) as session:
+        for _ in range(iterations):
+            for event_id in range(6):
+                if event_id < 2:
+                    timestamp = eleven_days_ago
+                    event_type = "EVENT_TEST_AUTOPURGE_WITH_EVENT_DATA"
+                    shared_data = '{"type":{"EVENT_TEST_AUTOPURGE_WITH_EVENT_DATA"}'
+                elif event_id < 4:
+                    timestamp = five_days_ago
+                    event_type = "EVENT_TEST_PURGE_WITH_EVENT_DATA"
+                    shared_data = '{"type":{"EVENT_TEST_PURGE_WITH_EVENT_DATA"}'
+                else:
+                    timestamp = utcnow
+                    event_type = "EVENT_TEST_WITH_EVENT_DATA"
+                    shared_data = '{"type":{"EVENT_TEST_WITH_EVENT_DATA"}'
+
+                event_data = EventData(hash=1234, shared_data=shared_data)
+
+                session.add(
+                    Events(
+                        event_type=event_type,
+                        origin="LOCAL",
+                        time_fired=timestamp,
+                        event_data_rel=event_data,
+                    )
+                )


 async def _add_test_statistics(hass: HomeAssistant):
@@ -1384,6 +1460,29 @@ async def _add_test_statistics_runs(hass: HomeAssistant):
         )


+def _add_state_without_event_linkage(
+    session: Session,
+    entity_id: str,
+    state: str,
+    timestamp: datetime,
+):
+    state_attrs = StateAttributes(
+        hash=1234, shared_attrs=json.dumps({entity_id: entity_id})
+    )
+    session.add(state_attrs)
+    session.add(
+        States(
+            entity_id=entity_id,
+            state=state,
+            attributes=None,
+            last_changed=timestamp,
+            last_updated=timestamp,
+            event_id=None,
+            state_attributes=state_attrs,
+        )
+    )
+
+
 def _add_state_and_state_changed_event(
     session: Session,
     entity_id: str,
@@ -1416,3 +1515,149 @@ def _add_state_and_state_changed_event(
             time_fired=timestamp,
         )
     )
+
+
+async def test_purge_many_old_events(
+    hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT
+):
+    """Test deleting old events."""
+    instance = await async_setup_recorder_instance(hass)
+
+    await _add_test_events(hass, MAX_ROWS_TO_PURGE)
+
+    with session_scope(hass=hass) as session:
+        events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
+        event_datas = session.query(EventData)
+        assert events.count() == MAX_ROWS_TO_PURGE * 6
+        assert event_datas.count() == 5
+
+        purge_before = dt_util.utcnow() - timedelta(days=4)
+
+        # run purge_old_data()
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+            states_batch_size=3,
+            events_batch_size=3,
+        )
+        assert not finished
+        assert events.count() == MAX_ROWS_TO_PURGE * 3
+        assert event_datas.count() == 5
+
+        # we should only have 2 groups of events left
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+            states_batch_size=3,
+            events_batch_size=3,
+        )
+        assert finished
+        assert events.count() == MAX_ROWS_TO_PURGE * 2
+        assert event_datas.count() == 5
+
+        # we should now purge everything
+        finished = purge_old_data(
+            instance,
+            dt_util.utcnow(),
+            repack=False,
+            states_batch_size=20,
+            events_batch_size=20,
+        )
+        assert finished
+        assert events.count() == 0
+        assert event_datas.count() == 0
+
+
+async def test_purge_can_mix_legacy_and_new_format(
+    hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT
+):
+    """Test purging with legacy a new events."""
+    instance = await async_setup_recorder_instance(hass)
+    utcnow = dt_util.utcnow()
+    eleven_days_ago = utcnow - timedelta(days=11)
+    with session_scope(hass=hass) as session:
+        broken_state_no_time = States(
+            event_id=None,
+            entity_id="orphened.state",
+            last_updated=None,
+            last_changed=None,
+        )
+        session.add(broken_state_no_time)
+        start_id = 50000
+        for event_id in range(start_id, start_id + 50):
+            _add_state_and_state_changed_event(
+                session,
+                "sensor.excluded",
+                "purgeme",
+                eleven_days_ago,
+                event_id,
+            )
+    await _add_test_events(hass, 50)
+    await _add_events_with_event_data(hass, 50)
+    with session_scope(hass=hass) as session:
+        for _ in range(50):
+            _add_state_without_event_linkage(
+                session, "switch.random", "on", eleven_days_ago
+            )
+        states_with_event_id = session.query(States).filter(
+            States.event_id.is_not(None)
+        )
+        states_without_event_id = session.query(States).filter(
+            States.event_id.is_(None)
+        )
+
+        assert states_with_event_id.count() == 50
+        assert states_without_event_id.count() == 51
+
+        purge_before = dt_util.utcnow() - timedelta(days=4)
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+        )
+        assert not finished
+        assert states_with_event_id.count() == 0
+        assert states_without_event_id.count() == 51
+        # At this point all the legacy states are gone
+        # and we switch methods
+        purge_before = dt_util.utcnow() - timedelta(days=4)
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+            events_batch_size=1,
+            states_batch_size=1,
+        )
+        # Since we only allow one iteration, we won't
+        # check if we are finished this loop similar
+        # to the legacy method
+        assert not finished
+        assert states_with_event_id.count() == 0
+        assert states_without_event_id.count() == 1
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+            events_batch_size=100,
+            states_batch_size=100,
+        )
+        assert finished
+        assert states_with_event_id.count() == 0
+        assert states_without_event_id.count() == 1
+        _add_state_without_event_linkage(
+            session, "switch.random", "on", eleven_days_ago
+        )
+        assert states_with_event_id.count() == 0
+        assert states_without_event_id.count() == 2
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+        )
+        assert finished
+        # The broken state without a timestamp
+        # does not prevent future purges. Its ignored.
+        assert states_with_event_id.count() == 0
+        assert states_without_event_id.count() == 1