From 4cf618334cb4fbcc03ef957e295221675227086d Mon Sep 17 00:00:00 2001 From: Johann Kellerman Date: Sat, 23 Jul 2016 20:25:17 +0200 Subject: [PATCH] Update recorder. (#2549) * Update recorder. models.py: - Use scoped_session in models.py to fix shutdown error __init__.py: - Session _commit & retry method - Single session var for purge_data - Ensure single _INSTANCE - repeat purge every 2 days - show correct time in log_error * _commit * Restore models to old functionality, swap purge, remove _INSTANCE cleanup from tests, typing ignore Base class * pylint * Remove recorder from model unit test --- homeassistant/components/recorder/__init__.py | 97 +++++++++++-------- homeassistant/components/recorder/models.py | 6 +- tests/components/recorder/test_init.py | 4 +- tests/components/recorder/test_models.py | 38 ++++---- 4 files changed, 80 insertions(+), 65 deletions(-) diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 413106c66b5..781736d3c6a 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -91,8 +91,12 @@ def run_information(point_in_time=None): def setup(hass, config): """Setup the recorder.""" # pylint: disable=global-statement - # pylint: disable=too-many-locals global _INSTANCE + + if _INSTANCE is not None: + _LOGGER.error('Only a single instance allowed.') + return False + purge_days = config.get(DOMAIN, {}).get(CONF_PURGE_DAYS) db_url = config.get(DOMAIN, {}).get(CONF_DB_URL, None) @@ -130,7 +134,7 @@ def log_error(e, retry_wait=0, rollback=True, if rollback: Session().rollback() if retry_wait: - _LOGGER.info("Retrying failed query in %s seconds", QUERY_RETRY_WAIT) + _LOGGER.info("Retrying in %s seconds", retry_wait) time.sleep(retry_wait) @@ -165,8 +169,6 @@ class Recorder(threading.Thread): from homeassistant.components.recorder.models import Events, States import sqlalchemy.exc - global _INSTANCE - while True: try: self._setup_connection() @@ -177,8 +179,12 @@ class Recorder(threading.Thread): message="Error during connection setup: %s") if self.purge_days is not None: - track_point_in_utc_time(self.hass, - lambda now: self._purge_old_data(), + def purge_ticker(event): + """Rerun purge every second day.""" + self._purge_old_data() + track_point_in_utc_time(self.hass, purge_ticker, + dt_util.utcnow() + timedelta(days=2)) + track_point_in_utc_time(self.hass, purge_ticker, dt_util.utcnow() + timedelta(minutes=5)) while True: @@ -187,42 +193,26 @@ class Recorder(threading.Thread): if event == self.quit_object: self._close_run() self._close_connection() + # pylint: disable=global-statement + global _INSTANCE _INSTANCE = None self.queue.task_done() return - elif event.event_type == EVENT_TIME_CHANGED: + if event.event_type == EVENT_TIME_CHANGED: self.queue.task_done() continue - session = Session() dbevent = Events.from_event(event) - session.add(dbevent) - - for _ in range(0, RETRIES): - try: - session.commit() - break - except sqlalchemy.exc.OperationalError as e: - log_error(e, retry_wait=QUERY_RETRY_WAIT, - rollback=True) + self._commit(dbevent) if event.event_type != EVENT_STATE_CHANGED: self.queue.task_done() continue - session = Session() dbstate = States.from_event(event) - - for _ in range(0, RETRIES): - try: - dbstate.event_id = dbevent.event_id - session.add(dbstate) - session.commit() - break - except sqlalchemy.exc.OperationalError as e: - log_error(e, retry_wait=QUERY_RETRY_WAIT, - rollback=True) + dbstate.event_id = dbevent.event_id + self._commit(dbstate) self.queue.task_done() @@ -269,6 +259,7 @@ class Recorder(threading.Thread): def _close_connection(self): """Close the connection.""" + # pylint: disable=global-statement global Session self.engine.dispose() self.engine = None @@ -290,16 +281,12 @@ class Recorder(threading.Thread): start=self.recording_start, created=dt_util.utcnow() ) - session = Session() - session.add(self._run) - session.commit() + self._commit(self._run) def _close_run(self): """Save end time for current run.""" self._run.end = dt_util.utcnow() - session = Session() - session.add(self._run) - session.commit() + self._commit(self._run) self._run = None def _purge_old_data(self): @@ -313,17 +300,24 @@ class Recorder(threading.Thread): purge_before = dt_util.utcnow() - timedelta(days=self.purge_days) - _LOGGER.info("Purging events created before %s", purge_before) - deleted_rows = Session().query(Events).filter( - (Events.created < purge_before)).delete(synchronize_session=False) - _LOGGER.debug("Deleted %s events", deleted_rows) + def _purge_states(session): + deleted_rows = session.query(States) \ + .filter((States.created < purge_before)) \ + .delete(synchronize_session=False) + _LOGGER.debug("Deleted %s states", deleted_rows) - _LOGGER.info("Purging states created before %s", purge_before) - deleted_rows = Session().query(States).filter( - (States.created < purge_before)).delete(synchronize_session=False) - _LOGGER.debug("Deleted %s states", deleted_rows) + if self._commit(_purge_states): + _LOGGER.info("Purged states created before %s", purge_before) + + def _purge_events(session): + deleted_rows = session.query(Events) \ + .filter((Events.created < purge_before)) \ + .delete(synchronize_session=False) + _LOGGER.debug("Deleted %s events", deleted_rows) + + if self._commit(_purge_events): + _LOGGER.info("Purged events created before %s", purge_before) - Session().commit() Session().expire_all() # Execute sqlite vacuum command to free up space on disk @@ -331,6 +325,23 @@ class Recorder(threading.Thread): _LOGGER.info("Vacuuming SQLite to free space") self.engine.execute("VACUUM") + @staticmethod + def _commit(work): + """Commit & retry work: Either a model or in a function.""" + import sqlalchemy.exc + session = Session() + for _ in range(0, RETRIES): + try: + if callable(work): + work(session) + else: + session.add(work) + session.commit() + return True + except sqlalchemy.exc.OperationalError as e: + log_error(e, retry_wait=QUERY_RETRY_WAIT, rollback=True) + return False + def _verify_instance(): """Throw error if recorder not initialized.""" diff --git a/homeassistant/components/recorder/models.py b/homeassistant/components/recorder/models.py index 554e2f47d08..fdb5642562f 100644 --- a/homeassistant/components/recorder/models.py +++ b/homeassistant/components/recorder/models.py @@ -20,7 +20,7 @@ Base = declarative_base() _LOGGER = logging.getLogger(__name__) -class Events(Base): +class Events(Base): # type: ignore # pylint: disable=too-few-public-methods """Event history data.""" @@ -55,7 +55,7 @@ class Events(Base): return None -class States(Base): +class States(Base): # type: ignore # pylint: disable=too-few-public-methods """State change history.""" @@ -114,7 +114,7 @@ class States(Base): return None -class RecorderRuns(Base): +class RecorderRuns(Base): # type: ignore # pylint: disable=too-few-public-methods """Representation of recorder run.""" diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index 7519443f1e4..5ebd951a682 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -1,8 +1,8 @@ """The tests for the Recorder component.""" -# pylint: disable=too-many-public-methods,protected-access -import unittest +# pylint: disable=protected-access import json from datetime import datetime, timedelta +import unittest from unittest.mock import patch from homeassistant.const import MATCH_ALL diff --git a/tests/components/recorder/test_models.py b/tests/components/recorder/test_models.py index 55c3e019f15..c616f3d0af1 100644 --- a/tests/components/recorder/test_models.py +++ b/tests/components/recorder/test_models.py @@ -1,5 +1,4 @@ """The tests for the Recorder component.""" -# pylint: disable=too-many-public-methods,protected-access import unittest from datetime import datetime @@ -12,32 +11,35 @@ from homeassistant.util import dt from homeassistant.components.recorder.models import ( Base, Events, States, RecorderRuns) -engine = None -Session = None +ENGINE = None +SESSION = None -def setUpModule(): +def setUpModule(): # pylint: disable=invalid-name """Set up a database to use.""" - global engine, Session + global ENGINE + global SESSION - engine = create_engine("sqlite://") - Base.metadata.create_all(engine) - session_factory = sessionmaker(bind=engine) - Session = scoped_session(session_factory) + ENGINE = create_engine("sqlite://") + Base.metadata.create_all(ENGINE) + session_factory = sessionmaker(bind=ENGINE) + SESSION = scoped_session(session_factory) -def tearDownModule(): +def tearDownModule(): # pylint: disable=invalid-name """Close database.""" - global engine, Session + global ENGINE + global SESSION - engine.dispose() - engine = None - Session = None + ENGINE.dispose() + ENGINE = None + SESSION = None class TestEvents(unittest.TestCase): """Test Events model.""" + # pylint: disable=no-self-use def test_from_event(self): """Test converting event to db event.""" event = ha.Event('test_event', { @@ -49,6 +51,8 @@ class TestEvents(unittest.TestCase): class TestStates(unittest.TestCase): """Test States model.""" + # pylint: disable=no-self-use + def test_from_event(self): """Test converting event to db state.""" state = ha.State('sensor.temperature', '18') @@ -78,14 +82,14 @@ class TestStates(unittest.TestCase): class TestRecorderRuns(unittest.TestCase): """Test recorder run model.""" - def setUp(self): + def setUp(self): # pylint: disable=invalid-name """Set up recorder runs.""" - self.session = session = Session() + self.session = session = SESSION() session.query(Events).delete() session.query(States).delete() session.query(RecorderRuns).delete() - def tearDown(self): + def tearDown(self): # pylint: disable=invalid-name """Clean up.""" self.session.rollback()