diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 52dabad1faf..aadc8e61fa1 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -335,7 +335,7 @@ class Recorder(threading.Thread): self.event_session = self.get_session() # Use a session for the event read loop # with a commit every time the event time - # has changed. This reduces the disk io. + # has changed. This reduces the disk io. while True: event = self.queue.get() if event is None: @@ -344,7 +344,9 @@ class Recorder(threading.Thread): self.queue.task_done() return if isinstance(event, PurgeTask): - purge.purge_old_data(self, event.keep_days, event.repack) + # Schedule a new purge task if this one didn't finish + if not purge.purge_old_data(self, event.keep_days, event.repack): + self.queue.put(PurgeTask(event.keep_days, event.repack)) self.queue.task_done() continue if event.event_type == EVENT_TIME_CHANGED: diff --git a/homeassistant/components/recorder/models.py b/homeassistant/components/recorder/models.py index 03c81726310..0566faf1c4d 100644 --- a/homeassistant/components/recorder/models.py +++ b/homeassistant/components/recorder/models.py @@ -64,7 +64,7 @@ class Events(Base): # type: ignore context_parent_id=event.context.parent_id, ) - def to_native(self): + def to_native(self, validate_entity_id=True): """Convert to a natve HA Event.""" context = Context( id=self.context_id, @@ -183,7 +183,7 @@ class RecorderRuns(Base): # type: ignore return [row[0] for row in query] - def to_native(self): + def to_native(self, validate_entity_id=True): """Return self, native format is this model.""" return self diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py index 78d92b8b65e..833926af219 100644 --- a/homeassistant/components/recorder/purge.py +++ b/homeassistant/components/recorder/purge.py @@ -7,32 +7,62 @@ from sqlalchemy.exc import 
SQLAlchemyError import homeassistant.util.dt as dt_util from .models import Events, RecorderRuns, States -from .util import session_scope +from .util import execute, session_scope _LOGGER = logging.getLogger(__name__) -def purge_old_data(instance, purge_days, repack): - """Purge events and states older than purge_days ago.""" +def purge_old_data(instance, purge_days: int, repack: bool) -> bool: + """Purge events and states older than purge_days ago. + + Cleans up a timeframe of an hour, based on the oldest record. + """ purge_before = dt_util.utcnow() - timedelta(days=purge_days) _LOGGER.debug("Purging events before %s", purge_before) try: with session_scope(session=instance.get_session()) as session: - deleted_rows = ( + query = session.query(States).order_by(States.last_updated.asc()).limit(1) + states = execute(query, to_native=True, validate_entity_ids=False) + + states_purge_before = purge_before + if states: + states_purge_before = min( + purge_before, states[0].last_updated + timedelta(hours=1) + ) + + deleted_rows_states = ( session.query(States) - .filter(States.last_updated < purge_before) + .filter(States.last_updated < states_purge_before) .delete(synchronize_session=False) ) - _LOGGER.debug("Deleted %s states", deleted_rows) + _LOGGER.debug("Deleted %s states", deleted_rows_states) - deleted_rows = ( + query = session.query(Events).order_by(Events.time_fired.asc()).limit(1) + events = execute(query, to_native=True) + + events_purge_before = purge_before + if events: + events_purge_before = min( + purge_before, events[0].time_fired + timedelta(hours=1) + ) + + deleted_rows_events = ( session.query(Events) - .filter(Events.time_fired < purge_before) + .filter(Events.time_fired < events_purge_before) .delete(synchronize_session=False) ) - _LOGGER.debug("Deleted %s events", deleted_rows) + _LOGGER.debug("Deleted %s events", deleted_rows_events) + # If states or events purging isn't processing the purge_before yet, + # return False, as we are not done yet. 
+ if (states_purge_before and states_purge_before != purge_before) or ( + events_purge_before and events_purge_before != purge_before + ): + _LOGGER.debug("Purging hasn't fully completed yet.") + return False + + # Recorder runs is small, no need to batch run it deleted_rows = ( session.query(RecorderRuns) .filter(RecorderRuns.start < purge_before) @@ -52,3 +82,5 @@ def purge_old_data(instance, purge_days, repack): except SQLAlchemyError as err: _LOGGER.warning("Error purging history: %s.", err) + + return True diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index d4dfa0ecc1e..843609cf308 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -317,7 +317,7 @@ def test_auto_purge(hass_recorder): test_time = tz.localize(datetime(2020, 1, 1, 4, 12, 0)) with patch( - "homeassistant.components.recorder.purge.purge_old_data" + "homeassistant.components.recorder.purge.purge_old_data", return_value=True ) as purge_old_data: for delta in (-1, 0, 1): hass.bus.fire( diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index 05a184a8608..afcb1b2818f 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -130,9 +130,16 @@ class TestRecorderPurge(unittest.TestCase): assert states.count() == 6 # run purge_old_data() - purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False) + finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False) + assert not finished + assert states.count() == 4 - # we should only have 2 states left after purging + finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False) + assert not finished + assert states.count() == 2 + + finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False) + assert finished assert states.count() == 2 def test_purge_old_events(self): @@ -144,9 +151,17 @@ class TestRecorderPurge(unittest.TestCase): assert 
events.count() == 6 # run purge_old_data() - purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False) + finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False) + assert not finished + assert events.count() == 4 + + finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False) + assert not finished + assert events.count() == 2 # we should only have 2 events left + finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False) + assert finished assert events.count() == 2 def test_purge_method(self):