Now that python 3.7 is the minimum supported version, we can use the more efficient SimpleQueue in the recorder as it does not have to use threading.Lock
238 lines
8.5 KiB
Python
238 lines
8.5 KiB
Python
"""Test data purging."""
|
|
from datetime import datetime, timedelta
|
|
import json
|
|
import unittest
|
|
|
|
from homeassistant.components import recorder
|
|
from homeassistant.components.recorder.const import DATA_INSTANCE
|
|
from homeassistant.components.recorder.models import Events, RecorderRuns, States
|
|
from homeassistant.components.recorder.purge import purge_old_data
|
|
from homeassistant.components.recorder.util import session_scope
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
from .common import wait_recording_done
|
|
|
|
from tests.async_mock import patch
|
|
from tests.common import get_test_home_assistant, init_recorder_component
|
|
|
|
|
|
class TestRecorderPurge(unittest.TestCase):
|
|
"""Base class for common recorder tests."""
|
|
|
|
def setUp(self): # pylint: disable=invalid-name
|
|
"""Set up things to be run when tests are started."""
|
|
self.hass = get_test_home_assistant()
|
|
init_recorder_component(self.hass)
|
|
self.hass.start()
|
|
self.addCleanup(self.tear_down_cleanup)
|
|
|
|
def tear_down_cleanup(self):
|
|
"""Stop everything that was started."""
|
|
self.hass.stop()
|
|
|
|
def _add_test_states(self):
|
|
"""Add multiple states to the db for testing."""
|
|
now = datetime.now()
|
|
five_days_ago = now - timedelta(days=5)
|
|
eleven_days_ago = now - timedelta(days=11)
|
|
attributes = {"test_attr": 5, "test_attr_10": "nice"}
|
|
|
|
self.hass.block_till_done()
|
|
self.hass.data[DATA_INSTANCE].block_till_done()
|
|
wait_recording_done(self.hass)
|
|
|
|
with recorder.session_scope(hass=self.hass) as session:
|
|
for event_id in range(6):
|
|
if event_id < 2:
|
|
timestamp = eleven_days_ago
|
|
state = "autopurgeme"
|
|
elif event_id < 4:
|
|
timestamp = five_days_ago
|
|
state = "purgeme"
|
|
else:
|
|
timestamp = now
|
|
state = "dontpurgeme"
|
|
|
|
session.add(
|
|
States(
|
|
entity_id="test.recorder2",
|
|
domain="sensor",
|
|
state=state,
|
|
attributes=json.dumps(attributes),
|
|
last_changed=timestamp,
|
|
last_updated=timestamp,
|
|
created=timestamp,
|
|
event_id=event_id + 1000,
|
|
)
|
|
)
|
|
|
|
def _add_test_events(self):
|
|
"""Add a few events for testing."""
|
|
now = datetime.now()
|
|
five_days_ago = now - timedelta(days=5)
|
|
eleven_days_ago = now - timedelta(days=11)
|
|
event_data = {"test_attr": 5, "test_attr_10": "nice"}
|
|
|
|
self.hass.block_till_done()
|
|
self.hass.data[DATA_INSTANCE].block_till_done()
|
|
wait_recording_done(self.hass)
|
|
|
|
with recorder.session_scope(hass=self.hass) as session:
|
|
for event_id in range(6):
|
|
if event_id < 2:
|
|
timestamp = eleven_days_ago
|
|
event_type = "EVENT_TEST_AUTOPURGE"
|
|
elif event_id < 4:
|
|
timestamp = five_days_ago
|
|
event_type = "EVENT_TEST_PURGE"
|
|
else:
|
|
timestamp = now
|
|
event_type = "EVENT_TEST"
|
|
|
|
session.add(
|
|
Events(
|
|
event_type=event_type,
|
|
event_data=json.dumps(event_data),
|
|
origin="LOCAL",
|
|
created=timestamp,
|
|
time_fired=timestamp,
|
|
)
|
|
)
|
|
|
|
def _add_test_recorder_runs(self):
|
|
"""Add a few recorder_runs for testing."""
|
|
now = datetime.now()
|
|
five_days_ago = now - timedelta(days=5)
|
|
eleven_days_ago = now - timedelta(days=11)
|
|
|
|
self.hass.block_till_done()
|
|
self.hass.data[DATA_INSTANCE].block_till_done()
|
|
wait_recording_done(self.hass)
|
|
|
|
with recorder.session_scope(hass=self.hass) as session:
|
|
for rec_id in range(6):
|
|
if rec_id < 2:
|
|
timestamp = eleven_days_ago
|
|
elif rec_id < 4:
|
|
timestamp = five_days_ago
|
|
else:
|
|
timestamp = now
|
|
|
|
session.add(
|
|
RecorderRuns(
|
|
start=timestamp,
|
|
created=dt_util.utcnow(),
|
|
end=timestamp + timedelta(days=1),
|
|
)
|
|
)
|
|
|
|
def test_purge_old_states(self):
|
|
"""Test deleting old states."""
|
|
self._add_test_states()
|
|
# make sure we start with 6 states
|
|
with session_scope(hass=self.hass) as session:
|
|
states = session.query(States)
|
|
assert states.count() == 6
|
|
|
|
# run purge_old_data()
|
|
finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
|
|
assert not finished
|
|
assert states.count() == 4
|
|
|
|
finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
|
|
assert not finished
|
|
assert states.count() == 2
|
|
|
|
finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
|
|
assert finished
|
|
assert states.count() == 2
|
|
|
|
def test_purge_old_events(self):
|
|
"""Test deleting old events."""
|
|
self._add_test_events()
|
|
|
|
with session_scope(hass=self.hass) as session:
|
|
events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
|
|
assert events.count() == 6
|
|
|
|
# run purge_old_data()
|
|
finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
|
|
assert not finished
|
|
assert events.count() == 4
|
|
|
|
finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
|
|
assert not finished
|
|
assert events.count() == 2
|
|
|
|
# we should only have 2 events left
|
|
finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
|
|
assert finished
|
|
assert events.count() == 2
|
|
|
|
def test_purge_method(self):
|
|
"""Test purge method."""
|
|
service_data = {"keep_days": 4}
|
|
self._add_test_events()
|
|
self._add_test_states()
|
|
self._add_test_recorder_runs()
|
|
|
|
# make sure we start with 6 states
|
|
with session_scope(hass=self.hass) as session:
|
|
states = session.query(States)
|
|
assert states.count() == 6
|
|
|
|
events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
|
|
assert events.count() == 6
|
|
|
|
recorder_runs = session.query(RecorderRuns)
|
|
assert recorder_runs.count() == 7
|
|
|
|
self.hass.data[DATA_INSTANCE].block_till_done()
|
|
wait_recording_done(self.hass)
|
|
|
|
# run purge method - no service data, use defaults
|
|
self.hass.services.call("recorder", "purge")
|
|
self.hass.block_till_done()
|
|
|
|
# Small wait for recorder thread
|
|
self.hass.data[DATA_INSTANCE].block_till_done()
|
|
wait_recording_done(self.hass)
|
|
|
|
# only purged old events
|
|
assert states.count() == 4
|
|
assert events.count() == 4
|
|
|
|
# run purge method - correct service data
|
|
self.hass.services.call("recorder", "purge", service_data=service_data)
|
|
self.hass.block_till_done()
|
|
|
|
# Small wait for recorder thread
|
|
self.hass.data[DATA_INSTANCE].block_till_done()
|
|
wait_recording_done(self.hass)
|
|
|
|
# we should only have 2 states left after purging
|
|
assert states.count() == 2
|
|
|
|
# now we should only have 2 events left
|
|
assert events.count() == 2
|
|
|
|
# now we should only have 3 recorder runs left
|
|
assert recorder_runs.count() == 3
|
|
|
|
assert not (
|
|
"EVENT_TEST_PURGE" in (event.event_type for event in events.all())
|
|
)
|
|
|
|
# run purge method - correct service data, with repack
|
|
with patch(
|
|
"homeassistant.components.recorder.purge._LOGGER"
|
|
) as mock_logger:
|
|
service_data["repack"] = True
|
|
self.hass.services.call("recorder", "purge", service_data=service_data)
|
|
self.hass.block_till_done()
|
|
self.hass.data[DATA_INSTANCE].block_till_done()
|
|
wait_recording_done(self.hass)
|
|
assert (
|
|
mock_logger.debug.mock_calls[5][1][0]
|
|
== "Vacuuming SQL DB to free space"
|
|
)
|