From 3f9d052218d2c07e4d03849757c0c109d3800f6f Mon Sep 17 00:00:00 2001 From: milanvo Date: Wed, 4 Oct 2017 14:07:42 +0200 Subject: [PATCH] Add recorder purge service, rework purge timer (#9523) * Add recorder purge service * Recorder test to match purge config * Removed purge timer, move service handler to setup, add service description file * Tests for recorder purge service * Recorder purge timer rework, add purge service parameter, tests * Purge service schema change * Service description change value range * First cleanup * Fix name of config --- homeassistant/components/recorder/__init__.py | 72 ++++++++++++++----- .../components/recorder/services.yaml | 9 +++ tests/components/recorder/test_init.py | 3 +- tests/components/recorder/test_purge.py | 48 ++++++++++++- 4 files changed, 110 insertions(+), 22 deletions(-) create mode 100644 homeassistant/components/recorder/services.yaml diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 5d3ca270399..5959165779b 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -10,10 +10,11 @@ https://home-assistant.io/components/recorder/ import asyncio import concurrent.futures import logging +from os import path import queue import threading import time -from datetime import timedelta, datetime +from datetime import datetime, timedelta from typing import Optional, Dict import voluptuous as vol @@ -28,6 +29,7 @@ import homeassistant.helpers.config_validation as cv from homeassistant.helpers.event import async_track_time_interval from homeassistant.helpers.typing import ConfigType import homeassistant.util.dt as dt_util +from homeassistant import config as conf_util from . import purge, migration from .const import DATA_INSTANCE @@ -39,11 +41,21 @@ _LOGGER = logging.getLogger(__name__) DOMAIN = 'recorder' +SERVICE_PURGE = 'purge' + +ATTR_KEEP_DAYS = 'keep_days' + +SERVICE_PURGE_SCHEMA = vol.Schema({ + vol.Required(ATTR_KEEP_DAYS): + vol.All(vol.Coerce(int), vol.Range(min=0)) +}) + DEFAULT_URL = 'sqlite:///{hass_config_path}' DEFAULT_DB_FILE = 'home-assistant_v2.db' CONF_DB_URL = 'db_url' -CONF_PURGE_DAYS = 'purge_days' +CONF_PURGE_KEEP_DAYS = 'purge_keep_days' +CONF_PURGE_INTERVAL = 'purge_interval' CONF_EVENT_TYPES = 'event_types' CONNECT_RETRY_WAIT = 3 @@ -65,7 +77,9 @@ FILTER_SCHEMA = vol.Schema({ CONFIG_SCHEMA = vol.Schema({ DOMAIN: FILTER_SCHEMA.extend({ - vol.Optional(CONF_PURGE_DAYS): + vol.Inclusive(CONF_PURGE_KEEP_DAYS, 'purge'): + vol.All(vol.Coerce(int), vol.Range(min=1)), + vol.Inclusive(CONF_PURGE_INTERVAL, 'purge'): vol.All(vol.Coerce(int), vol.Range(min=1)), vol.Optional(CONF_DB_URL): cv.string, }) @@ -106,7 +120,8 @@ def run_information(hass, point_in_time: Optional[datetime]=None): def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up the recorder.""" conf = config.get(DOMAIN, {}) - purge_days = conf.get(CONF_PURGE_DAYS) + purge_days = conf.get(CONF_PURGE_KEEP_DAYS) + purge_interval = conf.get(CONF_PURGE_INTERVAL) db_url = conf.get(CONF_DB_URL, None) if not db_url: @@ -116,24 +131,46 @@ def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: include = conf.get(CONF_INCLUDE, {}) exclude = conf.get(CONF_EXCLUDE, {}) instance = hass.data[DATA_INSTANCE] = Recorder( - hass, purge_days=purge_days, uri=db_url, include=include, - exclude=exclude) + hass, uri=db_url, include=include, exclude=exclude) instance.async_initialize() instance.start() + @asyncio.coroutine + def async_handle_purge_interval(now): + """Handle purge interval.""" + instance.do_purge(purge_days) + + @asyncio.coroutine + def async_handle_purge_service(service): + """Handle calls to the purge service.""" + instance.do_purge(service.data[ATTR_KEEP_DAYS]) + + descriptions = yield from hass.async_add_job( + conf_util.load_yaml_config_file, path.join( + path.dirname(__file__), 'services.yaml')) + + if purge_interval and purge_days: + async_track_time_interval(hass, async_handle_purge_interval, + timedelta(days=purge_interval)) + + hass.services.async_register(DOMAIN, SERVICE_PURGE, + async_handle_purge_service, + descriptions.get(SERVICE_PURGE), + schema=SERVICE_PURGE_SCHEMA) + return (yield from instance.async_db_ready) class Recorder(threading.Thread): """A threaded recorder class.""" - def __init__(self, hass: HomeAssistant, purge_days: int, uri: str, + def __init__(self, hass: HomeAssistant, uri: str, include: Dict, exclude: Dict) -> None: """Initialize the recorder.""" threading.Thread.__init__(self, name='Recorder') self.hass = hass - self.purge_days = purge_days + self.purge_days = None self.queue = queue.Queue() # type: Any self.recording_start = dt_util.utcnow() self.db_url = uri @@ -148,12 +185,19 @@ class Recorder(threading.Thread): self.exclude_t = exclude.get(CONF_EVENT_TYPES, []) self.get_session = None + self.purge_task = object() @callback def async_initialize(self): """Initialize the recorder.""" self.hass.bus.async_listen(MATCH_ALL, self.event_listener) + def do_purge(self, purge_days=None): + """Event listener for purging data.""" + if purge_days is not None: + self.purge_days = purge_days + self.queue.put(self.purge_task) + def run(self): """Start processing events to save.""" from .models import States, Events @@ -190,7 +234,6 @@ class Recorder(threading.Thread): self.hass.add_job(connection_failed) return - purge_task = object() shutdown_task = object() hass_started = concurrent.futures.Future() @@ -220,15 +263,6 @@ class Recorder(threading.Thread): self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, notify_hass_started) - if self.purge_days is not None: - @callback - def do_purge(now): - """Event listener for purging data.""" - self.queue.put(purge_task) - - async_track_time_interval(self.hass, do_purge, - timedelta(days=2)) - self.hass.add_job(register) result = hass_started.result() @@ -244,7 +278,7 @@ class Recorder(threading.Thread): self._close_connection() self.queue.task_done() return - elif event is purge_task: + elif event is self.purge_task: purge.purge_old_data(self, self.purge_days) continue elif event.event_type == EVENT_TIME_CHANGED: diff --git a/homeassistant/components/recorder/services.yaml b/homeassistant/components/recorder/services.yaml new file mode 100644 index 00000000000..fa57e8fc07f --- /dev/null +++ b/homeassistant/components/recorder/services.yaml @@ -0,0 +1,9 @@ +# Describes the format for available recorder services + +purge: + description: Start purge task - delete events and states older than x days, according to keep_days service data. + + fields: + keep_days: + description: Number of history days to keep in database after purge. Value >= 0 + example: 2 diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index 539b80f50d0..ed04e96a43c 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -195,8 +195,7 @@ def test_recorder_setup_failure(): with patch.object(Recorder, '_setup_connection') as setup, \ patch('homeassistant.components.recorder.time.sleep'): setup.side_effect = ImportError("driver not found") - rec = Recorder( - hass, purge_days=0, uri='sqlite://', include={}, exclude={}) + rec = Recorder(hass, uri='sqlite://', include={}, exclude={}) rec.start() rec.join() diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index 1a52e0503bb..5db710882d9 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -1,6 +1,7 @@ """Test data purging.""" import json from datetime import datetime, timedelta +from time import sleep import unittest from homeassistant.components import recorder @@ -16,8 +17,9 @@ class TestRecorderPurge(unittest.TestCase): def setUp(self): # pylint: disable=invalid-name """Setup things to be run when tests are started.""" + config = {'purge_keep_days': 4, 'purge_interval': 2} self.hass = get_test_home_assistant() - init_recorder_component(self.hass) + init_recorder_component(self.hass, config) self.hass.start() def tearDown(self): # pylint: disable=invalid-name @@ -107,3 +109,47 @@ class TestRecorderPurge(unittest.TestCase): # now we should only have 3 events left self.assertEqual(events.count(), 3) + + def test_purge_method(self): + """Test purge method.""" + service_data = {'keep_days': 4} + self._add_test_states() + self._add_test_events() + + # make sure we start with 5 states + with session_scope(hass=self.hass) as session: + states = session.query(States) + self.assertEqual(states.count(), 5) + + events = session.query(Events).filter( + Events.event_type.like("EVENT_TEST%")) + self.assertEqual(events.count(), 5) + + self.hass.data[DATA_INSTANCE].block_till_done() + + # run purge method - no service data, should not work + self.hass.services.call('recorder', 'purge') + self.hass.async_block_till_done() + + # Small wait for recorder thread + sleep(0.1) + + # we should only have 2 states left after purging + self.assertEqual(states.count(), 5) + + # now we should only have 3 events left + self.assertEqual(events.count(), 5) + + # run purge method - correct service data + self.hass.services.call('recorder', 'purge', + service_data=service_data) + self.hass.async_block_till_done() + + # Small wait for recorder thread + sleep(0.1) + + # we should only have 2 states left after purging + self.assertEqual(states.count(), 2) + + # now we should only have 3 events left + self.assertEqual(events.count(), 3)