Add recorder purge service, rework purge timer (#9523)
* Add recorder purge service * Recorder test to match purge config * Removed purge timer, move service handler to setup, add service description file * Tests for recorder purge service * Recorder purge timer rework, add purge service parameter, tests * Purge service schema change * Service description change value range * First cleanup * Fix name of config
This commit is contained in:
parent
4314dc251f
commit
3f9d052218
4 changed files with 110 additions and 22 deletions
|
@ -10,10 +10,11 @@ https://home-assistant.io/components/recorder/
|
|||
import asyncio
|
||||
import concurrent.futures
|
||||
import logging
|
||||
from os import path
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
from datetime import timedelta, datetime
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional, Dict
|
||||
|
||||
import voluptuous as vol
|
||||
|
@ -28,6 +29,7 @@ import homeassistant.helpers.config_validation as cv
|
|||
from homeassistant.helpers.event import async_track_time_interval
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
import homeassistant.util.dt as dt_util
|
||||
from homeassistant import config as conf_util
|
||||
|
||||
from . import purge, migration
|
||||
from .const import DATA_INSTANCE
|
||||
|
@ -39,11 +41,21 @@ _LOGGER = logging.getLogger(__name__)
|
|||
|
||||
DOMAIN = 'recorder'
|
||||
|
||||
SERVICE_PURGE = 'purge'
|
||||
|
||||
ATTR_KEEP_DAYS = 'keep_days'
|
||||
|
||||
SERVICE_PURGE_SCHEMA = vol.Schema({
|
||||
vol.Required(ATTR_KEEP_DAYS):
|
||||
vol.All(vol.Coerce(int), vol.Range(min=0))
|
||||
})
|
||||
|
||||
DEFAULT_URL = 'sqlite:///{hass_config_path}'
|
||||
DEFAULT_DB_FILE = 'home-assistant_v2.db'
|
||||
|
||||
CONF_DB_URL = 'db_url'
|
||||
CONF_PURGE_DAYS = 'purge_days'
|
||||
CONF_PURGE_KEEP_DAYS = 'purge_keep_days'
|
||||
CONF_PURGE_INTERVAL = 'purge_interval'
|
||||
CONF_EVENT_TYPES = 'event_types'
|
||||
|
||||
CONNECT_RETRY_WAIT = 3
|
||||
|
@ -65,7 +77,9 @@ FILTER_SCHEMA = vol.Schema({
|
|||
|
||||
CONFIG_SCHEMA = vol.Schema({
|
||||
DOMAIN: FILTER_SCHEMA.extend({
|
||||
vol.Optional(CONF_PURGE_DAYS):
|
||||
vol.Inclusive(CONF_PURGE_KEEP_DAYS, 'purge'):
|
||||
vol.All(vol.Coerce(int), vol.Range(min=1)),
|
||||
vol.Inclusive(CONF_PURGE_INTERVAL, 'purge'):
|
||||
vol.All(vol.Coerce(int), vol.Range(min=1)),
|
||||
vol.Optional(CONF_DB_URL): cv.string,
|
||||
})
|
||||
|
@ -106,7 +120,8 @@ def run_information(hass, point_in_time: Optional[datetime]=None):
|
|||
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
"""Set up the recorder."""
|
||||
conf = config.get(DOMAIN, {})
|
||||
purge_days = conf.get(CONF_PURGE_DAYS)
|
||||
purge_days = conf.get(CONF_PURGE_KEEP_DAYS)
|
||||
purge_interval = conf.get(CONF_PURGE_INTERVAL)
|
||||
|
||||
db_url = conf.get(CONF_DB_URL, None)
|
||||
if not db_url:
|
||||
|
@ -116,24 +131,46 @@ def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
|||
include = conf.get(CONF_INCLUDE, {})
|
||||
exclude = conf.get(CONF_EXCLUDE, {})
|
||||
instance = hass.data[DATA_INSTANCE] = Recorder(
|
||||
hass, purge_days=purge_days, uri=db_url, include=include,
|
||||
exclude=exclude)
|
||||
hass, uri=db_url, include=include, exclude=exclude)
|
||||
instance.async_initialize()
|
||||
instance.start()
|
||||
|
||||
@asyncio.coroutine
|
||||
def async_handle_purge_interval(now):
|
||||
"""Handle purge interval."""
|
||||
instance.do_purge(purge_days)
|
||||
|
||||
@asyncio.coroutine
|
||||
def async_handle_purge_service(service):
|
||||
"""Handle calls to the purge service."""
|
||||
instance.do_purge(service.data[ATTR_KEEP_DAYS])
|
||||
|
||||
descriptions = yield from hass.async_add_job(
|
||||
conf_util.load_yaml_config_file, path.join(
|
||||
path.dirname(__file__), 'services.yaml'))
|
||||
|
||||
if purge_interval and purge_days:
|
||||
async_track_time_interval(hass, async_handle_purge_interval,
|
||||
timedelta(days=purge_interval))
|
||||
|
||||
hass.services.async_register(DOMAIN, SERVICE_PURGE,
|
||||
async_handle_purge_service,
|
||||
descriptions.get(SERVICE_PURGE),
|
||||
schema=SERVICE_PURGE_SCHEMA)
|
||||
|
||||
return (yield from instance.async_db_ready)
|
||||
|
||||
|
||||
class Recorder(threading.Thread):
|
||||
"""A threaded recorder class."""
|
||||
|
||||
def __init__(self, hass: HomeAssistant, purge_days: int, uri: str,
|
||||
def __init__(self, hass: HomeAssistant, uri: str,
|
||||
include: Dict, exclude: Dict) -> None:
|
||||
"""Initialize the recorder."""
|
||||
threading.Thread.__init__(self, name='Recorder')
|
||||
|
||||
self.hass = hass
|
||||
self.purge_days = purge_days
|
||||
self.purge_days = None
|
||||
self.queue = queue.Queue() # type: Any
|
||||
self.recording_start = dt_util.utcnow()
|
||||
self.db_url = uri
|
||||
|
@ -148,12 +185,19 @@ class Recorder(threading.Thread):
|
|||
self.exclude_t = exclude.get(CONF_EVENT_TYPES, [])
|
||||
|
||||
self.get_session = None
|
||||
self.purge_task = object()
|
||||
|
||||
@callback
|
||||
def async_initialize(self):
|
||||
"""Initialize the recorder."""
|
||||
self.hass.bus.async_listen(MATCH_ALL, self.event_listener)
|
||||
|
||||
def do_purge(self, purge_days=None):
|
||||
"""Event listener for purging data."""
|
||||
if purge_days is not None:
|
||||
self.purge_days = purge_days
|
||||
self.queue.put(self.purge_task)
|
||||
|
||||
def run(self):
|
||||
"""Start processing events to save."""
|
||||
from .models import States, Events
|
||||
|
@ -190,7 +234,6 @@ class Recorder(threading.Thread):
|
|||
self.hass.add_job(connection_failed)
|
||||
return
|
||||
|
||||
purge_task = object()
|
||||
shutdown_task = object()
|
||||
hass_started = concurrent.futures.Future()
|
||||
|
||||
|
@ -220,15 +263,6 @@ class Recorder(threading.Thread):
|
|||
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START,
|
||||
notify_hass_started)
|
||||
|
||||
if self.purge_days is not None:
|
||||
@callback
|
||||
def do_purge(now):
|
||||
"""Event listener for purging data."""
|
||||
self.queue.put(purge_task)
|
||||
|
||||
async_track_time_interval(self.hass, do_purge,
|
||||
timedelta(days=2))
|
||||
|
||||
self.hass.add_job(register)
|
||||
result = hass_started.result()
|
||||
|
||||
|
@ -244,7 +278,7 @@ class Recorder(threading.Thread):
|
|||
self._close_connection()
|
||||
self.queue.task_done()
|
||||
return
|
||||
elif event is purge_task:
|
||||
elif event is self.purge_task:
|
||||
purge.purge_old_data(self, self.purge_days)
|
||||
continue
|
||||
elif event.event_type == EVENT_TIME_CHANGED:
|
||||
|
|
9
homeassistant/components/recorder/services.yaml
Normal file
9
homeassistant/components/recorder/services.yaml
Normal file
|
@ -0,0 +1,9 @@
|
|||
# Describes the format for available recorder services
|
||||
|
||||
purge:
|
||||
description: Start purge task - delete events and states older than x days, according to keep_days service data.
|
||||
|
||||
fields:
|
||||
keep_days:
|
||||
description: Number of history days to keep in database after purge. Value >= 0
|
||||
example: 2
|
|
@ -195,8 +195,7 @@ def test_recorder_setup_failure():
|
|||
with patch.object(Recorder, '_setup_connection') as setup, \
|
||||
patch('homeassistant.components.recorder.time.sleep'):
|
||||
setup.side_effect = ImportError("driver not found")
|
||||
rec = Recorder(
|
||||
hass, purge_days=0, uri='sqlite://', include={}, exclude={})
|
||||
rec = Recorder(hass, uri='sqlite://', include={}, exclude={})
|
||||
rec.start()
|
||||
rec.join()
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
"""Test data purging."""
|
||||
import json
|
||||
from datetime import datetime, timedelta
|
||||
from time import sleep
|
||||
import unittest
|
||||
|
||||
from homeassistant.components import recorder
|
||||
|
@ -16,8 +17,9 @@ class TestRecorderPurge(unittest.TestCase):
|
|||
|
||||
def setUp(self): # pylint: disable=invalid-name
|
||||
"""Setup things to be run when tests are started."""
|
||||
config = {'purge_keep_days': 4, 'purge_interval': 2}
|
||||
self.hass = get_test_home_assistant()
|
||||
init_recorder_component(self.hass)
|
||||
init_recorder_component(self.hass, config)
|
||||
self.hass.start()
|
||||
|
||||
def tearDown(self): # pylint: disable=invalid-name
|
||||
|
@ -107,3 +109,47 @@ class TestRecorderPurge(unittest.TestCase):
|
|||
|
||||
# now we should only have 3 events left
|
||||
self.assertEqual(events.count(), 3)
|
||||
|
||||
def test_purge_method(self):
|
||||
"""Test purge method."""
|
||||
service_data = {'keep_days': 4}
|
||||
self._add_test_states()
|
||||
self._add_test_events()
|
||||
|
||||
# make sure we start with 5 states
|
||||
with session_scope(hass=self.hass) as session:
|
||||
states = session.query(States)
|
||||
self.assertEqual(states.count(), 5)
|
||||
|
||||
events = session.query(Events).filter(
|
||||
Events.event_type.like("EVENT_TEST%"))
|
||||
self.assertEqual(events.count(), 5)
|
||||
|
||||
self.hass.data[DATA_INSTANCE].block_till_done()
|
||||
|
||||
# run purge method - no service data, should not work
|
||||
self.hass.services.call('recorder', 'purge')
|
||||
self.hass.async_block_till_done()
|
||||
|
||||
# Small wait for recorder thread
|
||||
sleep(0.1)
|
||||
|
||||
# we should only have 2 states left after purging
|
||||
self.assertEqual(states.count(), 5)
|
||||
|
||||
# now we should only have 3 events left
|
||||
self.assertEqual(events.count(), 5)
|
||||
|
||||
# run purge method - correct service data
|
||||
self.hass.services.call('recorder', 'purge',
|
||||
service_data=service_data)
|
||||
self.hass.async_block_till_done()
|
||||
|
||||
# Small wait for recorder thread
|
||||
sleep(0.1)
|
||||
|
||||
# we should only have 2 states left after purging
|
||||
self.assertEqual(states.count(), 2)
|
||||
|
||||
# now we should only have 3 events left
|
||||
self.assertEqual(events.count(), 3)
|
||||
|
|
Loading…
Add table
Reference in a new issue