Fix recorder purging by batch processing purges (#37140)
parent 748f1c3607
commit 976cbdd2aa

5 changed files with 66 additions and 17 deletions
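
Previously, a PurgeTask tried to delete every expired row in a single pass. With this change, purge_old_data works in batches: each call removes at most a one-hour window of the oldest states and events, returns True only once everything older than the cutoff is gone, and the recorder's event loop requeues the task until it reports completion.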
homeassistant/components/recorder/__init__.py

@@ -335,7 +335,7 @@ class Recorder(threading.Thread):
         self.event_session = self.get_session()
         # Use a session for the event read loop
         # with a commit every time the event time
-        # has changed. This reduces the disk io.
+        # has changed. This reduces the disk io.
         while True:
             event = self.queue.get()
             if event is None:
@@ -344,7 +344,9 @@ class Recorder(threading.Thread):
                 self.queue.task_done()
                 return
             if isinstance(event, PurgeTask):
-                purge.purge_old_data(self, event.keep_days, event.repack)
+                # Schedule a new purge task if this one didn't finish
+                if not purge.purge_old_data(self, event.keep_days, event.repack):
+                    self.queue.put(PurgeTask(event.keep_days, event.repack))
                 self.queue.task_done()
                 continue
             if event.event_type == EVENT_TIME_CHANGED:
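
The event-loop change above is the heart of the fix: purge_old_data now reports whether it finished, and an unfinished purge puts a fresh PurgeTask back on the queue, so queued events are processed between batches instead of waiting behind one long-running delete. A minimal standalone sketch of the requeue pattern (simplified stand-ins, not the recorder's actual classes):

    import queue
    from collections import namedtuple

    # Hypothetical stand-ins for the recorder's task type and purge function.
    PurgeTask = namedtuple("PurgeTask", ["keep_days", "repack"])
    _batches = iter([False, False, True])  # pretend purge finishes on the third pass

    def purge_old_data(keep_days, repack):
        """Stand-in for purge.purge_old_data: True means fully purged."""
        return next(_batches)

    tasks = queue.Queue()
    tasks.put(PurgeTask(keep_days=4, repack=False))

    while not tasks.empty():
        event = tasks.get()
        if isinstance(event, PurgeTask):
            # Requeue if this batch didn't finish the purge, so other
            # queued work gets a turn between batches.
            if not purge_old_data(event.keep_days, event.repack):
                tasks.put(PurgeTask(event.keep_days, event.repack))
            tasks.task_done()

    print("purge complete after three batches")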
homeassistant/components/recorder/models.py

@@ -64,7 +64,7 @@ class Events(Base):  # type: ignore
             context_parent_id=event.context.parent_id,
         )

-    def to_native(self):
+    def to_native(self, validate_entity_id=True):
         """Convert to a native HA Event."""
         context = Context(
             id=self.context_id,
@@ -183,7 +183,7 @@ class RecorderRuns(Base):  # type: ignore
         return [row[0] for row in query]

-    def to_native(self):
+    def to_native(self, validate_entity_id=True):
         """Return self, native format is this model."""
         return self
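
Both to_native overrides gain a validate_entity_id flag so callers can opt out of per-row validation; the purge code below passes validate_entity_ids=False through execute() when it fetches the single oldest state row, presumably so one malformed row cannot derail a purge batch.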
homeassistant/components/recorder/purge.py

@@ -7,32 +7,62 @@ from sqlalchemy.exc import SQLAlchemyError
 import homeassistant.util.dt as dt_util

 from .models import Events, RecorderRuns, States
-from .util import session_scope
+from .util import execute, session_scope

 _LOGGER = logging.getLogger(__name__)


-def purge_old_data(instance, purge_days, repack):
-    """Purge events and states older than purge_days ago."""
+def purge_old_data(instance, purge_days: int, repack: bool) -> bool:
+    """Purge events and states older than purge_days ago.
+
+    Cleans up a timeframe of an hour, based on the oldest record.
+    """
     purge_before = dt_util.utcnow() - timedelta(days=purge_days)
     _LOGGER.debug("Purging events before %s", purge_before)

     try:
         with session_scope(session=instance.get_session()) as session:
-            deleted_rows = (
+            query = session.query(States).order_by(States.last_updated.asc()).limit(1)
+            states = execute(query, to_native=True, validate_entity_ids=False)
+
+            states_purge_before = purge_before
+            if states:
+                states_purge_before = min(
+                    purge_before, states[0].last_updated + timedelta(hours=1)
+                )
+
+            deleted_rows_states = (
                 session.query(States)
-                .filter(States.last_updated < purge_before)
+                .filter(States.last_updated < states_purge_before)
                 .delete(synchronize_session=False)
             )
-            _LOGGER.debug("Deleted %s states", deleted_rows)
+            _LOGGER.debug("Deleted %s states", deleted_rows_states)

-            deleted_rows = (
+            query = session.query(Events).order_by(Events.time_fired.asc()).limit(1)
+            events = execute(query, to_native=True)
+
+            events_purge_before = purge_before
+            if events:
+                events_purge_before = min(
+                    purge_before, events[0].time_fired + timedelta(hours=1)
+                )
+
+            deleted_rows_events = (
                 session.query(Events)
-                .filter(Events.time_fired < purge_before)
+                .filter(Events.time_fired < events_purge_before)
                 .delete(synchronize_session=False)
             )
-            _LOGGER.debug("Deleted %s events", deleted_rows)
+            _LOGGER.debug("Deleted %s events", deleted_rows_events)
+
+            # If states or events purging isn't processing the purge_before yet,
+            # return false, as we are not done yet.
+            if (states_purge_before and states_purge_before != purge_before) or (
+                events_purge_before and events_purge_before != purge_before
+            ):
+                _LOGGER.debug("Purging hasn't fully completed yet.")
+                return False
+
+            # Recorder runs is small, no need to batch run it
             deleted_rows = (
                 session.query(RecorderRuns)
                 .filter(RecorderRuns.start < purge_before)
@@ -52,3 +82,5 @@ def purge_old_data(instance, purge_days, repack):

     except SQLAlchemyError as err:
         _LOGGER.warning("Error purging history: %s.", err)
+
+    return True
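
The batching itself is a cutoff calculation: each pass deletes rows older than min(purge_before, oldest_record + 1 hour), and any pass whose effective cutoff fell short of purge_before returns False so the caller knows another pass is needed. A self-contained sketch of that window arithmetic (illustrative names, not the module's API):

    from datetime import datetime, timedelta
    from typing import Optional

    def next_purge_window(purge_before: datetime, oldest: Optional[datetime]) -> datetime:
        """Upper bound for one purge batch: at most one hour past the oldest row."""
        if oldest is None:
            return purge_before  # empty table: nothing limits the window
        return min(purge_before, oldest + timedelta(hours=1))

    purge_before = datetime(2020, 1, 10)
    oldest = datetime(2020, 1, 1, 0, 30)

    cutoff = next_purge_window(purge_before, oldest)
    print(cutoff)                  # 2020-01-01 01:30:00, one hour past the oldest row
    print(cutoff != purge_before)  # True -> this batch will not finish the purge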
tests/components/recorder/test_init.py

@@ -317,7 +317,7 @@ def test_auto_purge(hass_recorder):
     test_time = tz.localize(datetime(2020, 1, 1, 4, 12, 0))

     with patch(
-        "homeassistant.components.recorder.purge.purge_old_data"
+        "homeassistant.components.recorder.purge.purge_old_data", return_value=True
     ) as purge_old_data:
         for delta in (-1, 0, 1):
             hass.bus.fire(
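
The patched purge_old_data in test_auto_purge now explicitly returns True, so the mocked purge reports completion and the recorder does not keep scheduling follow-up PurgeTasks.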
tests/components/recorder/test_purge.py

@@ -130,9 +130,16 @@ class TestRecorderPurge(unittest.TestCase):
             assert states.count() == 6

             # run purge_old_data()
-            purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
+            finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
+            assert not finished
+            assert states.count() == 4

             # we should only have 2 states left after purging
+            finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
+            assert not finished
             assert states.count() == 2

+            finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
+            assert finished
+            assert states.count() == 2
+
     def test_purge_old_events(self):
@@ -144,9 +151,17 @@ class TestRecorderPurge(unittest.TestCase):
             assert events.count() == 6

             # run purge_old_data()
-            purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
+            finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
+            assert not finished
+            assert events.count() == 4
+
+            finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
+            assert not finished
+            assert events.count() == 2

             # we should only have 2 events left
+            finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
+            assert finished
             assert events.count() == 2

     def test_purge_method(self):
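
The purge tests now exercise the batching: the fixture data is spread out enough that a full purge takes three calls, with counts dropping 6 to 4 to 2 while purge_old_data returns False, and a final call that deletes nothing more and returns True.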