Fix recorder purging by batch processing purges (#37140)

commit 7d74b74570
parent a4501b93c4

5 changed files with 66 additions and 17 deletions
diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py
--- a/homeassistant/components/recorder/__init__.py
+++ b/homeassistant/components/recorder/__init__.py
@@ -335,7 +335,7 @@ class Recorder(threading.Thread):
         self.event_session = self.get_session()
         # Use a session for the event read loop
         # with a commit every time the event time
         # has changed. This reduces the disk io.
         while True:
             event = self.queue.get()
             if event is None:
@@ -344,7 +344,9 @@ class Recorder(threading.Thread):
                 self.queue.task_done()
                 return
             if isinstance(event, PurgeTask):
-                purge.purge_old_data(self, event.keep_days, event.repack)
+                # Schedule a new purge task if this one didn't finish
+                if not purge.purge_old_data(self, event.keep_days, event.repack):
+                    self.queue.put(PurgeTask(event.keep_days, event.repack))
                 self.queue.task_done()
                 continue
             if event.event_type == EVENT_TIME_CHANGED:
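The event-loop change above is the behavioral core of the fix: purge_old_data() now reports whether it finished, and an unfinished purge re-enqueues a fresh PurgeTask, so state and event writes queued in the meantime are processed between purge batches. A minimal, self-contained sketch of that requeue-until-finished pattern (FakePurger and the three-batch backlog are illustrative stand-ins, not recorder code):

    import queue
    from dataclasses import dataclass


    @dataclass
    class PurgeTask:
        """Mirrors the recorder's task object: days to keep, repack flag."""

        keep_days: int
        repack: bool


    class FakePurger:
        """Stand-in for purge.purge_old_data: finishes after three batches."""

        def __init__(self, batches_left=3):
            self.batches_left = batches_left

        def purge_old_data(self, keep_days, repack):
            self.batches_left -= 1
            return self.batches_left <= 0  # True once the backlog is cleared


    purger = FakePurger()
    q = queue.Queue()
    q.put(PurgeTask(keep_days=10, repack=False))
    while not q.empty():
        task = q.get()
        # Schedule a new purge task if this one didn't finish; in the
        # recorder, other queued events are handled between iterations.
        if not purger.purge_old_data(task.keep_days, task.repack):
            q.put(PurgeTask(task.keep_days, task.repack))
        q.task_done()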
diff --git a/homeassistant/components/recorder/models.py b/homeassistant/components/recorder/models.py
--- a/homeassistant/components/recorder/models.py
+++ b/homeassistant/components/recorder/models.py
@@ -64,7 +64,7 @@ class Events(Base):  # type: ignore
             context_parent_id=event.context.parent_id,
         )
 
-    def to_native(self):
+    def to_native(self, validate_entity_id=True):
         """Convert to a native HA Event."""
         context = Context(
             id=self.context_id,
@@ -183,7 +183,7 @@ class RecorderRuns(Base):  # type: ignore
 
         return [row[0] for row in query]
 
-    def to_native(self):
+    def to_native(self, validate_entity_id=True):
         """Return self, native format is this model."""
         return self
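Both to_native() signatures gain a validate_entity_id parameter that Events and RecorderRuns simply ignore. Presumably this keeps the signature uniform across the recorder's models so that the execute() helper, invoked in purge.py as execute(query, to_native=True, validate_entity_ids=False), can forward the flag regardless of which model class the rows belong to; skipping entity-ID validation is reasonable there because the oldest rows are loaded only to anchor the purge window.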
diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py
--- a/homeassistant/components/recorder/purge.py
+++ b/homeassistant/components/recorder/purge.py
@@ -7,32 +7,62 @@ from sqlalchemy.exc import SQLAlchemyError
 import homeassistant.util.dt as dt_util
 
 from .models import Events, RecorderRuns, States
-from .util import session_scope
+from .util import execute, session_scope
 
 _LOGGER = logging.getLogger(__name__)
 
 
-def purge_old_data(instance, purge_days, repack):
-    """Purge events and states older than purge_days ago."""
+def purge_old_data(instance, purge_days: int, repack: bool) -> bool:
+    """Purge events and states older than purge_days ago.
+
+    Cleans up a timeframe of an hour, based on the oldest record.
+    """
     purge_before = dt_util.utcnow() - timedelta(days=purge_days)
     _LOGGER.debug("Purging events before %s", purge_before)
 
     try:
         with session_scope(session=instance.get_session()) as session:
-            deleted_rows = (
+            query = session.query(States).order_by(States.last_updated.asc()).limit(1)
+            states = execute(query, to_native=True, validate_entity_ids=False)
+
+            states_purge_before = purge_before
+            if states:
+                states_purge_before = min(
+                    purge_before, states[0].last_updated + timedelta(hours=1)
+                )
+
+            deleted_rows_states = (
                 session.query(States)
-                .filter(States.last_updated < purge_before)
+                .filter(States.last_updated < states_purge_before)
                 .delete(synchronize_session=False)
             )
-            _LOGGER.debug("Deleted %s states", deleted_rows)
+            _LOGGER.debug("Deleted %s states", deleted_rows_states)
 
-            deleted_rows = (
+            query = session.query(Events).order_by(Events.time_fired.asc()).limit(1)
+            events = execute(query, to_native=True)
+
+            events_purge_before = purge_before
+            if events:
+                events_purge_before = min(
+                    purge_before, events[0].time_fired + timedelta(hours=1)
+                )
+
+            deleted_rows_events = (
                 session.query(Events)
-                .filter(Events.time_fired < purge_before)
+                .filter(Events.time_fired < events_purge_before)
                 .delete(synchronize_session=False)
             )
-            _LOGGER.debug("Deleted %s events", deleted_rows)
+            _LOGGER.debug("Deleted %s events", deleted_rows_events)
+
+            # If states or events purging isn't processing the purge_before yet,
+            # return false, as we are not done yet.
+            if (states_purge_before and states_purge_before != purge_before) or (
+                events_purge_before and events_purge_before != purge_before
+            ):
+                _LOGGER.debug("Purging hasn't fully completed yet.")
+                return False
+
+            # Recorder runs is small, no need to batch run it
             deleted_rows = (
                 session.query(RecorderRuns)
                 .filter(RecorderRuns.start < purge_before)
@@ -52,3 +82,5 @@ def purge_old_data(instance, purge_days, repack):
 
     except SQLAlchemyError as err:
         _LOGGER.warning("Error purging history: %s.", err)
+
+    return True
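The heart of the batching is the clamped cut-off: each pass deletes at most one hour of history beyond the oldest surviving row instead of everything older than purge_before in a single transaction, which keeps each delete (and its disk I/O) bounded. The clamp in isolation (batch_cutoff is a hypothetical helper name for illustration; the diff inlines this logic separately for states and events):

    from datetime import datetime, timedelta


    def batch_cutoff(purge_before, oldest):
        """Return (cutoff, finished) for one purge pass.

        Mirrors the diff's min(purge_before, oldest + 1 hour) clamp;
        finished is True only when the pass reaches purge_before itself.
        """
        if oldest is None:
            return purge_before, True  # empty table: nothing to batch
        cutoff = min(purge_before, oldest + timedelta(hours=1))
        return cutoff, cutoff == purge_before


    # Oldest record is 30 days old and we keep 10 days: the first pass
    # deletes only up to one hour past that oldest record.
    now = datetime(2020, 7, 20)
    purge_before = now - timedelta(days=10)
    cutoff, finished = batch_cutoff(purge_before, now - timedelta(days=30))
    assert cutoff == now - timedelta(days=30) + timedelta(hours=1)
    assert not finished  # the recorder would re-enqueue the PurgeTask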
diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py
--- a/tests/components/recorder/test_init.py
+++ b/tests/components/recorder/test_init.py
@@ -317,7 +317,7 @@ def test_auto_purge(hass_recorder):
     test_time = tz.localize(datetime(2020, 1, 1, 4, 12, 0))
 
     with patch(
-        "homeassistant.components.recorder.purge.purge_old_data"
+        "homeassistant.components.recorder.purge.purge_old_data", return_value=True
    ) as purge_old_data:
         for delta in (-1, 0, 1):
             hass.bus.fire(
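Because purge_old_data() now returns a completion flag, the mock gets an explicit return_value=True: the patched purge reports itself as finished, so the recorder's event loop does not schedule a follow-up PurgeTask during the test.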
diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py
--- a/tests/components/recorder/test_purge.py
+++ b/tests/components/recorder/test_purge.py
@@ -130,9 +130,16 @@ class TestRecorderPurge(unittest.TestCase):
         assert states.count() == 6
 
         # run purge_old_data()
-        purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
-
-        # we should only have 2 states left after purging
+        finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
+        assert not finished
+        assert states.count() == 4
+
+        finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
+        assert not finished
+        assert states.count() == 2
+
+        finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
+        assert finished
         assert states.count() == 2
 
     def test_purge_old_events(self):
@@ -144,9 +151,17 @@ class TestRecorderPurge(unittest.TestCase):
         assert events.count() == 6
 
         # run purge_old_data()
-        purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
+        finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
+        assert not finished
+        assert events.count() == 4
+
+        finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
+        assert not finished
+        assert events.count() == 2
 
         # we should only have 2 events left
+        finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
+        assert finished
         assert events.count() == 2
 
     def test_purge_method(self):
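The updated purge tests pin down the batching contract: with six records on hand, each call trims one batch and reports itself unfinished (counts drop 6 to 4 to 2 while finished stays false), and a final call that finds nothing left beyond the cut-off returns true while the two records newer than the four-day threshold survive.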