diff --git a/homeassistant/components/recorder/models.py b/homeassistant/components/recorder/models.py index f5498e941d3..d7bb59bdeb1 100644 --- a/homeassistant/components/recorder/models.py +++ b/homeassistant/components/recorder/models.py @@ -130,10 +130,10 @@ class Events(Base): # type: ignore[misc,valid-type] {"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"}, ) __tablename__ = TABLE_EVENTS - event_id = Column(Integer, Identity(), primary_key=True) # no longer used + event_id = Column(Integer, Identity(), primary_key=True) event_type = Column(String(MAX_LENGTH_EVENT_EVENT_TYPE)) event_data = Column(Text().with_variant(mysql.LONGTEXT, "mysql")) - origin = Column(String(MAX_LENGTH_EVENT_ORIGIN)) # no longer used + origin = Column(String(MAX_LENGTH_EVENT_ORIGIN)) # no longer used for new rows origin_idx = Column(SmallInteger) time_fired = Column(DATETIME_TYPE, index=True) context_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True) @@ -247,8 +247,10 @@ class States(Base): # type: ignore[misc,valid-type] state_id = Column(Integer, Identity(), primary_key=True) entity_id = Column(String(MAX_LENGTH_STATE_ENTITY_ID)) state = Column(String(MAX_LENGTH_STATE_STATE)) - attributes = Column(Text().with_variant(mysql.LONGTEXT, "mysql")) - event_id = Column( + attributes = Column( + Text().with_variant(mysql.LONGTEXT, "mysql") + ) # no longer used for new rows + event_id = Column( # no longer used for new rows Integer, ForeignKey("events.event_id", ondelete="CASCADE"), index=True ) last_changed = Column(DATETIME_TYPE) diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py index f94f3d3d641..432e3993add 100644 --- a/homeassistant/components/recorder/purge.py +++ b/homeassistant/components/recorder/purge.py @@ -3,9 +3,10 @@ from __future__ import annotations from collections.abc import Callable, Iterable from datetime import datetime -from itertools import zip_longest +from functools import partial +from itertools import islice, zip_longest import logging -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from sqlalchemy.orm.session import Session from sqlalchemy.sql.expression import distinct @@ -29,6 +30,8 @@ from .queries import ( disconnect_states_rows, find_events_to_purge, find_latest_statistics_runs_run_id, + find_legacy_event_state_and_attributes_and_data_ids_to_purge, + find_legacy_row, find_short_term_statistics_to_purge, find_states_to_purge, find_statistics_runs_to_purge, @@ -42,9 +45,34 @@ if TYPE_CHECKING: _LOGGER = logging.getLogger(__name__) +DEFAULT_STATES_BATCHES_PER_PURGE = 20 # We expect ~95% de-dupe rate +DEFAULT_EVENTS_BATCHES_PER_PURGE = 15 # We expect ~92% de-dupe rate + + +def take(take_num: int, iterable: Iterable) -> list[Any]: + """Return first n items of the iterable as a list. + + From itertools recipes + """ + return list(islice(iterable, take_num)) + + +def chunked(iterable: Iterable, chunked_num: int) -> Iterable[Any]: + """Break *iterable* into lists of length *n*. + + From more-itertools + """ + return iter(partial(take, chunked_num, iter(iterable)), []) + + @retryable_database_job("purge") def purge_old_data( - instance: Recorder, purge_before: datetime, repack: bool, apply_filter: bool = False + instance: Recorder, + purge_before: datetime, + repack: bool, + apply_filter: bool = False, + events_batch_size: int = DEFAULT_EVENTS_BATCHES_PER_PURGE, + states_batch_size: int = DEFAULT_STATES_BATCHES_PER_PURGE, ) -> bool: """Purge events and states older than purge_before. 
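The take() and chunked() helpers added in the hunk above follow the itertools and more-itertools recipes. A minimal, self-contained sketch of the batching behaviour they provide (the sample ids are illustrative only, not taken from the patch):

    from collections.abc import Iterable
    from functools import partial
    from itertools import islice
    from typing import Any

    def take(take_num: int, iterable: Iterable) -> list[Any]:
        """Return the first take_num items of the iterable as a list."""
        return list(islice(iterable, take_num))

    def chunked(iterable: Iterable, chunked_num: int) -> Iterable[Any]:
        """Yield successive lists of at most chunked_num items until exhausted."""
        return iter(partial(take, chunked_num, iter(iterable)), [])

    # Ids get deleted in bounded batches rather than one huge IN (...) clause.
    assert list(chunked(range(7), 3)) == [[0, 1, 2], [3, 4, 5], [6]]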
@@ -58,40 +86,37 @@ def purge_old_data( with session_scope(session=instance.get_session()) as session: # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record - ( - event_ids, - state_ids, - attributes_ids, - data_ids, - ) = _select_event_state_attributes_ids_data_ids_to_purge(session, purge_before) + has_more_to_purge = False + if _purging_legacy_format(session): + _LOGGER.debug( + "Purge running in legacy format as there are states with event_id remaining" + ) + has_more_to_purge |= _purge_legacy_format( + instance, session, purge_before, using_sqlite + ) + else: + _LOGGER.debug( + "Purge running in new format as there are NO states with event_id remaining" + ) + # Once we are done purging legacy rows, we use the new method + has_more_to_purge |= _purge_states_and_attributes_ids( + instance, session, states_batch_size, purge_before, using_sqlite + ) + has_more_to_purge |= _purge_events_and_data_ids( + instance, session, events_batch_size, purge_before, using_sqlite + ) + statistics_runs = _select_statistics_runs_to_purge(session, purge_before) short_term_statistics = _select_short_term_statistics_to_purge( session, purge_before ) - - if state_ids: - _purge_state_ids(instance, session, state_ids) - - if unused_attribute_ids_set := _select_unused_attributes_ids( - session, attributes_ids, using_sqlite - ): - _purge_attributes_ids(instance, session, unused_attribute_ids_set) - - if event_ids: - _purge_event_ids(session, event_ids) - - if unused_data_ids_set := _select_unused_event_data_ids( - session, data_ids, using_sqlite - ): - _purge_event_data_ids(instance, session, unused_data_ids_set) - if statistics_runs: _purge_statistics_runs(session, statistics_runs) if short_term_statistics: _purge_short_term_statistics(session, short_term_statistics) - if state_ids or event_ids or statistics_runs or short_term_statistics: + if has_more_to_purge or statistics_runs or short_term_statistics: # Return false, as we might not be done yet. _LOGGER.debug("Purging hasn't fully completed yet") return False @@ -106,27 +131,132 @@ def purge_old_data( return True -def _select_event_state_attributes_ids_data_ids_to_purge( +def _purging_legacy_format(session: Session) -> bool: + """Check if there are any legacy event_id linked states rows remaining.""" + return bool(session.execute(find_legacy_row()).scalar()) + + +def _purge_legacy_format( + instance: Recorder, session: Session, purge_before: datetime, using_sqlite: bool +) -> bool: + """Purge rows that are still linked by the event_ids.""" + ( + event_ids, + state_ids, + attributes_ids, + data_ids, + ) = _select_legacy_event_state_and_attributes_and_data_ids_to_purge( + session, purge_before + ) + if state_ids: + _purge_state_ids(instance, session, state_ids) + _purge_unused_attributes_ids(instance, session, attributes_ids, using_sqlite) + if event_ids: + _purge_event_ids(session, event_ids) + _purge_unused_data_ids(instance, session, data_ids, using_sqlite) + return bool(event_ids or state_ids or attributes_ids or data_ids) + + +def _purge_states_and_attributes_ids( + instance: Recorder, + session: Session, + states_batch_size: int, + purge_before: datetime, + using_sqlite: bool, +) -> bool: + """Purge states and linked attributes id in a batch. + + Returns true if there are more states to purge. 
+    """
+    has_remaining_state_ids_to_purge = True
+    # There are more states relative to attributes_ids so
+    # we purge enough state_ids to try to generate a full
+    # size batch of attributes_ids that will be around the size
+    # MAX_ROWS_TO_PURGE
+    attributes_ids_batch: set[int] = set()
+    for _ in range(states_batch_size):
+        state_ids, attributes_ids = _select_state_attributes_ids_to_purge(
+            session, purge_before
+        )
+        if not state_ids:
+            has_remaining_state_ids_to_purge = False
+            break
+        _purge_state_ids(instance, session, state_ids)
+        attributes_ids_batch = attributes_ids_batch | attributes_ids
+
+    _purge_unused_attributes_ids(instance, session, attributes_ids_batch, using_sqlite)
+    _LOGGER.debug(
+        "After purging states and attributes_ids remaining=%s",
+        has_remaining_state_ids_to_purge,
+    )
+    return has_remaining_state_ids_to_purge
+
+
+def _purge_events_and_data_ids(
+    instance: Recorder,
+    session: Session,
+    events_batch_size: int,
+    purge_before: datetime,
+    using_sqlite: bool,
+) -> bool:
+    """Purge events and linked data ids in a batch.
+
+    Returns true if there are more events to purge.
+    """
+    has_remaining_event_ids_to_purge = True
+    # There are more events relative to data_ids so
+    # we purge enough event_ids to try to generate a full
+    # size batch of data_ids that will be around the size
+    # MAX_ROWS_TO_PURGE
+    data_ids_batch: set[int] = set()
+    for _ in range(events_batch_size):
+        event_ids, data_ids = _select_event_data_ids_to_purge(session, purge_before)
+        if not event_ids:
+            has_remaining_event_ids_to_purge = False
+            break
+        _purge_event_ids(session, event_ids)
+        data_ids_batch = data_ids_batch | data_ids
+
+    _purge_unused_data_ids(instance, session, data_ids_batch, using_sqlite)
+    _LOGGER.debug(
+        "After purging event and data_ids remaining=%s",
+        has_remaining_event_ids_to_purge,
+    )
+    return has_remaining_event_ids_to_purge
+
+
+def _select_state_attributes_ids_to_purge(
     session: Session, purge_before: datetime
-) -> tuple[set[int], set[int], set[int], set[int]]:
-    """Return a list of event, state, and attribute ids to purge."""
-    events = session.execute(find_events_to_purge(purge_before)).all()
-    _LOGGER.debug("Selected %s event ids to remove", len(events))
-    states = session.execute(find_states_to_purge(purge_before)).all()
-    _LOGGER.debug("Selected %s state ids to remove", len(states))
-    event_ids = set()
+) -> tuple[set[int], set[int]]:
+    """Return sets of state and attribute ids to purge."""
     state_ids = set()
     attributes_ids = set()
-    data_ids = set()
-    for event in events:
-        event_ids.add(event.event_id)
-        if event.data_id:
-            data_ids.add(event.data_id)
-    for state in states:
+    for state in session.execute(find_states_to_purge(purge_before)).all():
         state_ids.add(state.state_id)
         if state.attributes_id:
             attributes_ids.add(state.attributes_id)
-    return event_ids, state_ids, attributes_ids, data_ids
+    _LOGGER.debug(
+        "Selected %s state ids and %s attributes_ids to remove",
+        len(state_ids),
+        len(attributes_ids),
+    )
+    return state_ids, attributes_ids
+
+
+def _select_event_data_ids_to_purge(
+    session: Session, purge_before: datetime
+) -> tuple[set[int], set[int]]:
+    """Return sets of event and data ids to purge."""
+    event_ids = set()
+    data_ids = set()
+    for event in session.execute(find_events_to_purge(purge_before)).all():
+        event_ids.add(event.event_id)
+        if event.data_id:
+            data_ids.add(event.data_id)
+    _LOGGER.debug(
+        "Selected %s event ids and %s data_ids to remove", len(event_ids), len(data_ids)
+    )
+    return event_ids, data_ids
 
 
 def 
_select_unused_attributes_ids( @@ -197,6 +327,18 @@ def _select_unused_attributes_ids( return to_remove +def _purge_unused_attributes_ids( + instance: Recorder, + session: Session, + attributes_ids_batch: set[int], + using_sqlite: bool, +) -> None: + if unused_attribute_ids_set := _select_unused_attributes_ids( + session, attributes_ids_batch, using_sqlite + ): + _purge_batch_attributes_ids(instance, session, unused_attribute_ids_set) + + def _select_unused_event_data_ids( session: Session, data_ids: set[int], using_sqlite: bool ) -> set[int]: @@ -229,6 +371,16 @@ def _select_unused_event_data_ids( return to_remove +def _purge_unused_data_ids( + instance: Recorder, session: Session, data_ids_batch: set[int], using_sqlite: bool +) -> None: + + if unused_data_ids_set := _select_unused_event_data_ids( + session, data_ids_batch, using_sqlite + ): + _purge_batch_data_ids(instance, session, unused_data_ids_set) + + def _select_statistics_runs_to_purge( session: Session, purge_before: datetime ) -> list[int]: @@ -256,6 +408,34 @@ def _select_short_term_statistics_to_purge( return [statistic.id for statistic in statistics] +def _select_legacy_event_state_and_attributes_and_data_ids_to_purge( + session: Session, purge_before: datetime +) -> tuple[set[int], set[int], set[int], set[int]]: + """Return a list of event, state, and attribute ids to purge that are linked by the event_id. + + We do not link these anymore since state_change events + do not exist in the events table anymore, however we + still need to be able to purge them. + """ + events = session.execute( + find_legacy_event_state_and_attributes_and_data_ids_to_purge(purge_before) + ).all() + _LOGGER.debug("Selected %s event ids to remove", len(events)) + event_ids = set() + state_ids = set() + attributes_ids = set() + data_ids = set() + for event in events: + event_ids.add(event.event_id) + if event.state_id: + state_ids.add(event.state_id) + if event.attributes_id: + attributes_ids.add(event.attributes_id) + if event.data_id: + data_ids.add(event.data_id) + return event_ids, state_ids, attributes_ids, data_ids + + def _purge_state_ids(instance: Recorder, session: Session, state_ids: set[int]) -> None: """Disconnect states and delete by state id.""" @@ -327,24 +507,27 @@ def _evict_purged_attributes_from_attributes_cache( ) -def _purge_attributes_ids( +def _purge_batch_attributes_ids( instance: Recorder, session: Session, attributes_ids: set[int] ) -> None: - """Delete old attributes ids.""" - deleted_rows = session.execute(delete_states_attributes_rows(attributes_ids)) - _LOGGER.debug("Deleted %s attribute states", deleted_rows) + """Delete old attributes ids in batches of MAX_ROWS_TO_PURGE.""" + for attributes_ids_chunk in chunked(attributes_ids, MAX_ROWS_TO_PURGE): + deleted_rows = session.execute( + delete_states_attributes_rows(attributes_ids_chunk) + ) + _LOGGER.debug("Deleted %s attribute states", deleted_rows) # Evict any entries in the state_attributes_ids cache referring to a purged state _evict_purged_attributes_from_attributes_cache(instance, attributes_ids) -def _purge_event_data_ids( +def _purge_batch_data_ids( instance: Recorder, session: Session, data_ids: set[int] ) -> None: - """Delete old event data ids.""" - - deleted_rows = session.execute(delete_event_data_rows(data_ids)) - _LOGGER.debug("Deleted %s data events", deleted_rows) + """Delete old event data ids in batches of MAX_ROWS_TO_PURGE.""" + for data_ids_chunk in chunked(data_ids, MAX_ROWS_TO_PURGE): + deleted_rows = 
session.execute(delete_event_data_rows(data_ids_chunk)) + _LOGGER.debug("Deleted %s data events", deleted_rows) # Evict any entries in the event_data_ids cache referring to a purged state _evict_purged_data_from_data_cache(instance, data_ids) @@ -438,7 +621,7 @@ def _purge_filtered_states( unused_attribute_ids_set = _select_unused_attributes_ids( session, {id_ for id_ in attributes_ids if id_ is not None}, using_sqlite ) - _purge_attributes_ids(instance, session, unused_attribute_ids_set) + _purge_batch_attributes_ids(instance, session, unused_attribute_ids_set) def _purge_filtered_events( @@ -466,7 +649,7 @@ def _purge_filtered_events( if unused_data_ids_set := _select_unused_event_data_ids( session, set(data_ids), using_sqlite ): - _purge_event_data_ids(instance, session, unused_data_ids_set) + _purge_batch_data_ids(instance, session, unused_data_ids_set) if EVENT_STATE_CHANGED in excluded_event_types: session.query(StateAttributes).delete(synchronize_session=False) instance._state_attributes_ids = {} # pylint: disable=protected-access diff --git a/homeassistant/components/recorder/queries.py b/homeassistant/components/recorder/queries.py index 76098663bd7..5532c5c0703 100644 --- a/homeassistant/components/recorder/queries.py +++ b/homeassistant/components/recorder/queries.py @@ -621,3 +621,22 @@ def find_statistics_runs_to_purge( def find_latest_statistics_runs_run_id() -> StatementLambdaElement: """Find the latest statistics_runs run_id.""" return lambda_stmt(lambda: select(func.max(StatisticsRuns.run_id))) + + +def find_legacy_event_state_and_attributes_and_data_ids_to_purge( + purge_before: datetime, +) -> StatementLambdaElement: + """Find the latest row in the legacy format to purge.""" + return lambda_stmt( + lambda: select( + Events.event_id, Events.data_id, States.state_id, States.attributes_id + ) + .join(States, Events.event_id == States.event_id) + .filter(Events.time_fired < purge_before) + .limit(MAX_ROWS_TO_PURGE) + ) + + +def find_legacy_row() -> StatementLambdaElement: + """Check if there are still states in the table with an event_id.""" + return lambda_stmt(lambda: select(func.max(States.event_id))) diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index de9db9d5014..c8ba5e9d076 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -11,6 +11,7 @@ from sqlalchemy.orm.session import Session from homeassistant.components import recorder from homeassistant.components.recorder.const import MAX_ROWS_TO_PURGE, SupportedDialect from homeassistant.components.recorder.models import ( + EventData, Events, RecorderRuns, StateAttributes, @@ -76,7 +77,13 @@ async def test_purge_old_states( purge_before = dt_util.utcnow() - timedelta(days=4) # run purge_old_data() - finished = purge_old_data(instance, purge_before, repack=False) + finished = purge_old_data( + instance, + purge_before, + states_batch_size=1, + events_batch_size=1, + repack=False, + ) assert not finished assert states.count() == 2 assert state_attributes.count() == 1 @@ -96,7 +103,13 @@ async def test_purge_old_states( # run purge_old_data again purge_before = dt_util.utcnow() - finished = purge_old_data(instance, purge_before, repack=False) + finished = purge_old_data( + instance, + purge_before, + states_batch_size=1, + events_batch_size=1, + repack=False, + ) assert not finished assert states.count() == 0 assert state_attributes.count() == 0 @@ -223,12 +236,24 @@ async def test_purge_old_events( purge_before = 
dt_util.utcnow() - timedelta(days=4) # run purge_old_data() - finished = purge_old_data(instance, purge_before, repack=False) + finished = purge_old_data( + instance, + purge_before, + repack=False, + events_batch_size=1, + states_batch_size=1, + ) assert not finished assert events.count() == 2 # we should only have 2 events left - finished = purge_old_data(instance, purge_before, repack=False) + finished = purge_old_data( + instance, + purge_before, + repack=False, + events_batch_size=1, + states_batch_size=1, + ) assert finished assert events.count() == 2 @@ -249,10 +274,22 @@ async def test_purge_old_recorder_runs( purge_before = dt_util.utcnow() # run purge_old_data() - finished = purge_old_data(instance, purge_before, repack=False) + finished = purge_old_data( + instance, + purge_before, + repack=False, + events_batch_size=1, + states_batch_size=1, + ) assert not finished - finished = purge_old_data(instance, purge_before, repack=False) + finished = purge_old_data( + instance, + purge_before, + repack=False, + events_batch_size=1, + states_batch_size=1, + ) assert finished assert recorder_runs.count() == 1 @@ -1271,7 +1308,7 @@ async def _add_test_states(hass: HomeAssistant): await set_state("test.recorder2", state, attributes=attributes) -async def _add_test_events(hass: HomeAssistant): +async def _add_test_events(hass: HomeAssistant, iterations: int = 1): """Add a few events for testing.""" utcnow = dt_util.utcnow() five_days_ago = utcnow - timedelta(days=5) @@ -1282,25 +1319,64 @@ async def _add_test_events(hass: HomeAssistant): await async_wait_recording_done(hass) with session_scope(hass=hass) as session: - for event_id in range(6): - if event_id < 2: - timestamp = eleven_days_ago - event_type = "EVENT_TEST_AUTOPURGE" - elif event_id < 4: - timestamp = five_days_ago - event_type = "EVENT_TEST_PURGE" - else: - timestamp = utcnow - event_type = "EVENT_TEST" + for _ in range(iterations): + for event_id in range(6): + if event_id < 2: + timestamp = eleven_days_ago + event_type = "EVENT_TEST_AUTOPURGE" + elif event_id < 4: + timestamp = five_days_ago + event_type = "EVENT_TEST_PURGE" + else: + timestamp = utcnow + event_type = "EVENT_TEST" - session.add( - Events( - event_type=event_type, - event_data=json.dumps(event_data), - origin="LOCAL", - time_fired=timestamp, + session.add( + Events( + event_type=event_type, + event_data=json.dumps(event_data), + origin="LOCAL", + time_fired=timestamp, + ) + ) + + +async def _add_events_with_event_data(hass: HomeAssistant, iterations: int = 1): + """Add a few events with linked event_data for testing.""" + utcnow = dt_util.utcnow() + five_days_ago = utcnow - timedelta(days=5) + eleven_days_ago = utcnow - timedelta(days=11) + event_data = {"test_attr": 5, "test_attr_10": "nice"} + + await hass.async_block_till_done() + await async_wait_recording_done(hass) + + with session_scope(hass=hass) as session: + for _ in range(iterations): + for event_id in range(6): + if event_id < 2: + timestamp = eleven_days_ago + event_type = "EVENT_TEST_AUTOPURGE_WITH_EVENT_DATA" + shared_data = '{"type":{"EVENT_TEST_AUTOPURGE_WITH_EVENT_DATA"}' + elif event_id < 4: + timestamp = five_days_ago + event_type = "EVENT_TEST_PURGE_WITH_EVENT_DATA" + shared_data = '{"type":{"EVENT_TEST_PURGE_WITH_EVENT_DATA"}' + else: + timestamp = utcnow + event_type = "EVENT_TEST_WITH_EVENT_DATA" + shared_data = '{"type":{"EVENT_TEST_WITH_EVENT_DATA"}' + + event_data = EventData(hash=1234, shared_data=shared_data) + + session.add( + Events( + event_type=event_type, + origin="LOCAL", + 
time_fired=timestamp,
+                        event_data_rel=event_data,
+                    )
+                )
-            )
 
 
 async def _add_test_statistics(hass: HomeAssistant):
@@ -1384,6 +1460,29 @@ async def _add_test_statistics_runs(hass: HomeAssistant):
             )
 
 
+def _add_state_without_event_linkage(
+    session: Session,
+    entity_id: str,
+    state: str,
+    timestamp: datetime,
+):
+    state_attrs = StateAttributes(
+        hash=1234, shared_attrs=json.dumps({entity_id: entity_id})
+    )
+    session.add(state_attrs)
+    session.add(
+        States(
+            entity_id=entity_id,
+            state=state,
+            attributes=None,
+            last_changed=timestamp,
+            last_updated=timestamp,
+            event_id=None,
+            state_attributes=state_attrs,
+        )
+    )
+
+
 def _add_state_and_state_changed_event(
     session: Session,
     entity_id: str,
@@ -1416,3 +1515,149 @@ def _add_state_and_state_changed_event(
             time_fired=timestamp,
         )
     )
+
+
+async def test_purge_many_old_events(
+    hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT
+):
+    """Test deleting old events in batches."""
+    instance = await async_setup_recorder_instance(hass)
+
+    await _add_test_events(hass, MAX_ROWS_TO_PURGE)
+
+    with session_scope(hass=hass) as session:
+        events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
+        event_datas = session.query(EventData)
+        assert events.count() == MAX_ROWS_TO_PURGE * 6
+        assert event_datas.count() == 5
+
+        purge_before = dt_util.utcnow() - timedelta(days=4)
+
+        # run purge_old_data()
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+            states_batch_size=3,
+            events_batch_size=3,
+        )
+        assert not finished
+        assert events.count() == MAX_ROWS_TO_PURGE * 3
+        assert event_datas.count() == 5
+
+        # we should only have 2 groups of events left
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+            states_batch_size=3,
+            events_batch_size=3,
+        )
+        assert finished
+        assert events.count() == MAX_ROWS_TO_PURGE * 2
+        assert event_datas.count() == 5
+
+        # we should now purge everything
+        finished = purge_old_data(
+            instance,
+            dt_util.utcnow(),
+            repack=False,
+            states_batch_size=20,
+            events_batch_size=20,
+        )
+        assert finished
+        assert events.count() == 0
+        assert event_datas.count() == 0
+
+
+async def test_purge_can_mix_legacy_and_new_format(
+    hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT
+):
+    """Test purging with a mix of legacy and new format events."""
+    instance = await async_setup_recorder_instance(hass)
+    utcnow = dt_util.utcnow()
+    eleven_days_ago = utcnow - timedelta(days=11)
+    with session_scope(hass=hass) as session:
+        broken_state_no_time = States(
+            event_id=None,
+            entity_id="orphaned.state",
+            last_updated=None,
+            last_changed=None,
+        )
+        session.add(broken_state_no_time)
+        start_id = 50000
+        for event_id in range(start_id, start_id + 50):
+            _add_state_and_state_changed_event(
+                session,
+                "sensor.excluded",
+                "purgeme",
+                eleven_days_ago,
+                event_id,
+            )
+    await _add_test_events(hass, 50)
+    await _add_events_with_event_data(hass, 50)
+    with session_scope(hass=hass) as session:
+        for _ in range(50):
+            _add_state_without_event_linkage(
+                session, "switch.random", "on", eleven_days_ago
+            )
+        states_with_event_id = session.query(States).filter(
+            States.event_id.is_not(None)
+        )
+        states_without_event_id = session.query(States).filter(
+            States.event_id.is_(None)
+        )
+
+        assert states_with_event_id.count() == 50
+        assert states_without_event_id.count() == 51
+
+        purge_before = dt_util.utcnow() - timedelta(days=4)
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+        )
+        assert not finished
+        assert states_with_event_id.count() == 0
+        assert states_without_event_id.count() == 51
+        # At this point all the legacy states are gone
+        # and we switch methods
+        purge_before = dt_util.utcnow() - timedelta(days=4)
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+            events_batch_size=1,
+            states_batch_size=1,
+        )
+        # Since we only allow one iteration, we won't
+        # check if we are finished in this loop, similar
+        # to the legacy method
+        assert not finished
+        assert states_with_event_id.count() == 0
+        assert states_without_event_id.count() == 1
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+            events_batch_size=100,
+            states_batch_size=100,
+        )
+        assert finished
+        assert states_with_event_id.count() == 0
+        assert states_without_event_id.count() == 1
+        _add_state_without_event_linkage(
+            session, "switch.random", "on", eleven_days_ago
+        )
+        assert states_with_event_id.count() == 0
+        assert states_without_event_id.count() == 2
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+        )
+        assert finished
+        # The broken state without a timestamp
+        # does not prevent future purges. It is ignored.
+        assert states_with_event_id.count() == 0
+        assert states_without_event_id.count() == 1
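The contract these tests exercise: purge_old_data returns False while rows may still remain and True once a pass completes with nothing left to purge, so callers simply run another pass. A rough sketch of that calling pattern, assuming only the names introduced in this patch (purge_until_done is a hypothetical helper; in Home Assistant itself the recorder re-queues a purge task rather than looping):

    from datetime import timedelta

    from homeassistant.components.recorder.purge import (
        DEFAULT_EVENTS_BATCHES_PER_PURGE,
        DEFAULT_STATES_BATCHES_PER_PURGE,
        purge_old_data,
    )
    from homeassistant.util import dt as dt_util

    def purge_until_done(instance, keep_days: int) -> None:
        """Run purge_old_data on the recorder instance until it reports completion."""
        purge_before = dt_util.utcnow() - timedelta(days=keep_days)
        while not purge_old_data(
            instance,
            purge_before,
            repack=False,
            events_batch_size=DEFAULT_EVENTS_BATCHES_PER_PURGE,
            states_batch_size=DEFAULT_STATES_BATCHES_PER_PURGE,
        ):
            # Legacy event_id-linked rows are drained first; afterwards states/
            # attributes and events/data are purged in bounded batches, so each
            # pass touches a limited number of rows and large databases shrink
            # incrementally.
            pass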