diff --git a/homeassistant/components/history.py b/homeassistant/components/history.py index ee3b05927ff..d5568201309 100644 --- a/homeassistant/components/history.py +++ b/homeassistant/components/history.py @@ -1,3 +1,9 @@ +""" +homeassistant.components.history +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Provide pre-made queries on top of the recorder component. +""" import re import homeassistant.components.recorder as recorder @@ -8,45 +14,29 @@ DEPENDENCIES = ['recorder', 'http'] def last_5_states(entity_id): """ Return the last 5 states for entity_id. """ - return recorder.query_states( - "SELECT * FROM states WHERE entity_id=? AND " - "last_changed=last_updated " - "ORDER BY last_changed DESC LIMIT 0, 5", (entity_id, )) + query = """ + SELECT * FROM states WHERE entity_id=? AND + last_changed=last_updated AND {} + ORDER BY last_changed DESC LIMIT 0, 5 + """.format(recorder.limit_to_run()) - -def last_5_events(): - """ Return the last 5 events (dev method). """ - return recorder.query_events( - "SELECT * FROM events ORDER BY created DESC LIMIT 0, 5") - - -def states_history(point_in_time): - """ Return states at a specific point in time. """ - # Find homeassistant.start before point_in_time - # Find last state for each entity after homeassistant.start - # Ignore all states where state == '' - pass + return recorder.query_states(query, (entity_id, )) def setup(hass, config): """ Setup history hooks. """ hass.http.register_path( 'GET', - re.compile('/api/component/recorder/(?P[a-zA-Z\._0-9]+)/last_5_states'), - _api_last_5_states), - hass.http.register_path( - 'GET', - re.compile('/api/component/recorder/last_5_events'), - _api_last_5_events), + re.compile( + r'/api/history/(?P[a-zA-Z\._0-9]+)/recent_states'), + _api_last_5_states) + + return True # pylint: disable=invalid-name def _api_last_5_states(handler, path_match, data): + """ Return the last 5 states for an entity id as JSON. """ entity_id = path_match.group('entity_id') handler.write_json(list(last_5_states(entity_id))) - - -# pylint: disable=invalid-name -def _api_last_5_events(handler, path_match, data): - handler.write_json(list(last_5_events)) diff --git a/homeassistant/components/recorder.py b/homeassistant/components/recorder.py index f202ffb5346..5480e634384 100644 --- a/homeassistant/components/recorder.py +++ b/homeassistant/components/recorder.py @@ -1,3 +1,10 @@ +""" +homeassistant.components.recorder +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Component that records all events and state changes. +Allows other components to query this database. +""" import logging import threading import queue @@ -5,6 +12,7 @@ import sqlite3 from datetime import datetime import time import json +import atexit from homeassistant import Event, EventOrigin, State from homeassistant.remote import JSONEncoder @@ -16,25 +24,36 @@ DOMAIN = "recorder" DEPENDENCIES = [] DB_FILE = 'home-assistant.db' + +RETURN_ROWCOUNT = "rowcount" +RETURN_LASTROWID = "lastrowid" +RETURN_ONE_ROW = "one_row" + _INSTANCE = None _LOGGER = logging.getLogger(__name__) -def query(query, arguments): +def query(sql_query, arguments=None): """ Query the database. """ - verify_instance() + _verify_instance() - return _INSTANCE.query(query, arguments) + return _INSTANCE.query(sql_query, arguments) -def query_states(state_query, arguments): +def query_states(state_query, arguments=None): """ Query the database and return a list of states. """ - return filter(None, (row_to_state(row) for row in query(state_query, arguments))) + return ( + row for row in + (row_to_state(row) for row in query(state_query, arguments)) + if row is not None) -def query_events(event_query, arguments): +def query_events(event_query, arguments=None): """ Query the database and return a list of states. """ - return filter(None, (row_to_event(row) for row in query(event_query, arguments))) + return ( + row for row in + (row_to_event(row) for row in query(event_query, arguments)) + if row is not None) def row_to_state(row): @@ -44,6 +63,7 @@ def row_to_state(row): row[1], row[2], json.loads(row[3]), datetime.fromtimestamp(row[4])) except ValueError: # When json.loads fails + _LOGGER.exception("Error converting row to state: %s", row) return None @@ -53,17 +73,48 @@ def row_to_event(row): return Event(row[1], json.loads(row[2]), EventOrigin[row[3].lower()]) except ValueError: # When json.oads fails + _LOGGER.exception("Error converting row to event: %s", row) return None -def verify_instance(): - """ Raise error if recorder is not setup. """ - if _INSTANCE is None: - raise RuntimeError("Recorder not initialized.") +def limit_to_run(point_in_time=None): + """ + Returns a WHERE partial that will limit query to a run. + A run starts when Home Assistant starts and ends when it stops. + """ + _verify_instance() + + end_event = None + + # Targetting current run + if point_in_time is None: + return "created >= {}".format( + _adapt_datetime(_INSTANCE.recording_start)) + + start_event = query( + ("SELECT * FROM events WHERE event_type = ? AND created < ? " + "ORDER BY created DESC LIMIT 0, 1"), + (EVENT_HOMEASSISTANT_START, point_in_time))[0] + + end_query = query( + ("SELECT * FROM events WHERE event_type = ? AND created > ? " + "ORDER BY created ASC LIMIT 0, 1"), + (EVENT_HOMEASSISTANT_START, point_in_time)) + + if end_query: + end_event = end_query[0] + + where_part = "created >= {}".format(start_event['created']) + + if end_event is None: + return where_part + else: + return "{} and created < {}".format(where_part, end_event['created']) def setup(hass, config): """ Setup the recorder. """ + # pylint: disable=global-statement global _INSTANCE _INSTANCE = Recorder(hass) @@ -83,11 +134,11 @@ class Recorder(threading.Thread): self.queue = queue.Queue() self.quit_object = object() self.lock = threading.Lock() + self.recording_start = datetime.now() def start_recording(event): """ Start recording. """ self.start() - hass.states.set('paulus.held', 'juist', {'nou en': 'bier'}) hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_recording) hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, self.shutdown) @@ -96,11 +147,13 @@ class Recorder(threading.Thread): def run(self): """ Start processing events to save. """ self._setup_connection() + self._setup_run() while True: event = self.queue.get() if event == self.quit_object: + self._close_run() self._close_connection() return @@ -150,70 +203,132 @@ class Recorder(threading.Thread): "event_type, event_data, origin, created" ") values (?, ?, ?, ?)", info) - def query(self, query, data=None): + def query(self, sql_query, data=None, return_value=None): """ Query the database. """ try: with self.conn, self.lock: + _LOGGER.info("Running query %s", sql_query) + cur = self.conn.cursor() if data is not None: - cur.execute(query, data) + cur.execute(sql_query, data) else: - cur.execute(query) + cur.execute(sql_query) - return cur.fetchall() + if return_value == RETURN_ROWCOUNT: + return cur.rowcount + elif return_value == RETURN_LASTROWID: + return cur.lastrowid + elif return_value == RETURN_ONE_ROW: + return cur.fetchone() + else: + return cur.fetchall() except sqlite3.IntegrityError: - _LOGGER.exception("Error querying the database using: %s", query) + _LOGGER.exception( + "Error querying the database using: %s", sql_query) return [] def _setup_connection(self): """ Ensure database is ready to fly. """ db_path = self.hass.get_config_path(DB_FILE) self.conn = sqlite3.connect(db_path, check_same_thread=False) + self.conn.row_factory = sqlite3.Row + + # Make sure the database is closed whenever Python exits + # without the STOP event being fired. + atexit.register(self._close_connection) # Have datetime objects be saved as integers - sqlite3.register_adapter(datetime, adapt_datetime) + sqlite3.register_adapter(datetime, _adapt_datetime) # Validate we are on the correct schema or that we have to migrate - c = self.conn.cursor() + cur = self.conn.cursor() def save_migration(migration_id): - c.execute('INSERT INTO schema_version VALUES (?, ?)', - (migration_id, datetime.now())) + """ Save and commit a migration to the database. """ + cur.execute('INSERT INTO schema_version VALUES (?, ?)', + (migration_id, datetime.now())) self.conn.commit() _LOGGER.info("Database migrated to version %d", migration_id) try: - c.execute('SELECT max(migration_id) FROM schema_version;') - migration_id = c.fetchone()[0] or 0 + cur.execute('SELECT max(migration_id) FROM schema_version;') + migration_id = cur.fetchone()[0] or 0 except sqlite3.OperationalError: # The table does not exist - c.execute('CREATE TABLE schema_version ' - '(migration_id integer primary key, performed integer)') + cur.execute('CREATE TABLE schema_version (' + 'migration_id integer primary key, performed integer)') migration_id = 0 if migration_id < 1: - c.execute( - 'CREATE TABLE events (event_id integer primary key, ' - 'event_type text, event_data text, origin text, ' - 'created integer)') - c.execute('CREATE INDEX events__event_type ON events(event_type)') + cur.execute(""" + CREATE TABLE recorder_runs ( + run_id integer primary key, + start integer, + end integer, + closed_incorrect integer default 0, + created integer) + """) - c.execute( - 'CREATE TABLE states (state_id integer primary key, ' - 'entity_id text, state text, attributes text, ' - 'last_changed integer, last_updated integer, created integer)') - c.execute('CREATE INDEX states__entity_id ON states(entity_id)') + cur.execute(""" + CREATE TABLE events ( + event_id integer primary key, + event_type text, + event_data text, + origin text, + created integer) + """) + cur.execute( + 'CREATE INDEX events__event_type ON events(event_type)') + + cur.execute(""" + CREATE TABLE states ( + state_id integer primary key, + entity_id text, + state text, + attributes text, + last_changed integer, + last_updated integer, + created integer) + """) + cur.execute('CREATE INDEX states__entity_id ON states(entity_id)') save_migration(1) def _close_connection(self): + """ Close connection to the database. """ _LOGGER.info("Closing database") + atexit.unregister(self._close_connection) self.conn.close() + def _setup_run(self): + """ Log the start of the current run. """ + if self.query("""UPDATE recorder_runs SET end=?, closed_incorrect=1 + WHERE end IS NULL""", (self.recording_start, ), + return_value=RETURN_ROWCOUNT): -# Setup datetime to save as a integer -def adapt_datetime(ts): - return time.mktime(ts.timetuple()) + _LOGGER.warning("Found unfinished sessions") + + self.query( + "INSERT INTO recorder_runs (start, created) VALUES (?, ?)", + (self.recording_start, datetime.now())) + + def _close_run(self): + """ Save end time for current run. """ + self.query( + "UPDATE recorder_runs SET end=? WHERE start=?", + (datetime.now(), self.recording_start)) + + +def _adapt_datetime(datetimestamp): + """ Turn a datetime into an integer for in the DB. """ + return time.mktime(datetimestamp.timetuple()) + + +def _verify_instance(): + """ throws error if recorder not initialized. """ + if _INSTANCE is None: + raise RuntimeError("Recorder not initialized.")