Switch to SQLAlchemy for the Recorder component. Gives the ability t… (#2377)

* Switch to SQLAlchemy for the Recorder component. Gives the ability to use MySQL or other databases.

* fixes for failed lint

* add conversion script

* code review fixes and refactor to use to_native() model methods and execute() helper

* move script to homeassistant.scripts module

* style fixes that my tox lint/flake8 missed

* move exclusion up
rhooper 2016-07-02 14:22:51 -04:00 committed by Paulus Schoutsen
parent a65f196d19
commit a2e45b8fdd
11 changed files with 779 additions and 639 deletions
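
The practical effect of the switch is that the recorder now builds its engine from a configurable db_url (any backend SQLAlchemy supports) instead of hard-wiring sqlite3. A minimal sketch of pointing it at MySQL, using the same config-dict shape as the updated tests below; the URL and the hass object are illustrative, not part of this diff:

    from homeassistant.components import recorder

    config = {
        'recorder': {
            'db_url': 'mysql://localhost/homeassistant',  # omit to use the SQLite default
            'purge_days': 14,  # optional: drop events/states older than two weeks
        }
    }
    recorder.setup(hass, config)  # hass: an already-created HomeAssistant instance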

View file

@@ -3,6 +3,7 @@ source = homeassistant
omit =
homeassistant/__main__.py
homeassistant/scripts/*.py
# omit pieces of code that rely on external devices being present
homeassistant/components/apcupsd.py

View file

@@ -9,8 +9,8 @@ from collections import defaultdict
from datetime import timedelta
from itertools import groupby
from homeassistant.components import recorder, script
import homeassistant.util.dt as dt_util
from homeassistant.components import recorder, script
from homeassistant.components.http import HomeAssistantView
DOMAIN = 'history'
@@ -27,13 +27,12 @@ def last_5_states(entity_id):
"""Return the last 5 states for entity_id."""
entity_id = entity_id.lower()
query = """
SELECT * FROM states WHERE entity_id=? AND
last_changed=last_updated
ORDER BY state_id DESC LIMIT 0, 5
"""
return recorder.query_states(query, (entity_id, ))
states = recorder.get_model('States')
return recorder.execute(
recorder.query('States').filter(
(states.entity_id == entity_id) &
(states.last_changed == states.last_updated)
).order_by(states.state_id.desc()).limit(5))
def get_significant_states(start_time, end_time=None, entity_id=None):
@@ -44,48 +43,42 @@ def get_significant_states(start_time, end_time=None, entity_id=None):
as well as all states from certain domains (for instance
thermostat so that we get current temperature in our graphs).
"""
where = """
(domain IN ({}) OR last_changed=last_updated)
AND domain NOT IN ({}) AND last_updated > ?
""".format(",".join("'%s'" % x for x in SIGNIFICANT_DOMAINS),
",".join("'%s'" % x for x in IGNORE_DOMAINS))
data = [start_time]
states = recorder.get_model('States')
query = recorder.query('States').filter(
(states.domain.in_(SIGNIFICANT_DOMAINS) |
(states.last_changed == states.last_updated)) &
((~states.domain.in_(IGNORE_DOMAINS)) &
(states.last_updated > start_time)))
if end_time is not None:
where += "AND last_updated < ? "
data.append(end_time)
query = query.filter(states.last_updated < end_time)
if entity_id is not None:
where += "AND entity_id = ? "
data.append(entity_id.lower())
query = query.filter_by(entity_id=entity_id.lower())
query = ("SELECT * FROM states WHERE {} "
"ORDER BY entity_id, last_updated ASC").format(where)
states = (state for state in recorder.query_states(query, data)
if _is_significant(state))
states = (
state for state in recorder.execute(
query.order_by(states.entity_id, states.last_updated))
if _is_significant(state))
return states_to_json(states, start_time, entity_id)
def state_changes_during_period(start_time, end_time=None, entity_id=None):
"""Return states changes during UTC period start_time - end_time."""
where = "last_changed=last_updated AND last_changed > ? "
data = [start_time]
states = recorder.get_model('States')
query = recorder.query('States').filter(
(states.last_changed == states.last_updated) &
(states.last_changed > start_time))
if end_time is not None:
where += "AND last_changed < ? "
data.append(end_time)
query = query.filter(states.last_updated < end_time)
if entity_id is not None:
where += "AND entity_id = ? "
data.append(entity_id.lower())
query = query.filter_by(entity_id=entity_id.lower())
query = ("SELECT * FROM states WHERE {} "
"ORDER BY entity_id, last_changed ASC").format(where)
states = recorder.query_states(query, data)
states = recorder.execute(
query.order_by(states.entity_id, states.last_updated))
return states_to_json(states, start_time, entity_id)
@@ -99,24 +92,27 @@ def get_states(utc_point_in_time, entity_ids=None, run=None):
if run is None:
return []
where = run.where_after_start_run + "AND created < ? "
where_data = [utc_point_in_time]
from sqlalchemy import and_, func
states = recorder.get_model('States')
most_recent_state_ids = recorder.query(
func.max(states.state_id).label('max_state_id')
).filter(
(states.created >= run.start) &
(states.created < utc_point_in_time)
)
if entity_ids is not None:
where += "AND entity_id IN ({}) ".format(
",".join(['?'] * len(entity_ids)))
where_data.extend(entity_ids)
most_recent_state_ids = most_recent_state_ids.filter(
states.entity_id.in_(entity_ids))
query = """
SELECT * FROM states
INNER JOIN (
SELECT max(state_id) AS max_state_id
FROM states WHERE {}
GROUP BY entity_id)
WHERE state_id = max_state_id
""".format(where)
most_recent_state_ids = most_recent_state_ids.group_by(
states.entity_id).subquery()
return recorder.query_states(query, where_data)
query = recorder.query('States').join(most_recent_state_ids, and_(
states.state_id == most_recent_state_ids.c.max_state_id))
return recorder.execute(query)
def states_to_json(states, start_time, entity_id):
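
Throughout this file the raw SQL strings give way to a single idiom: get_model() fetches the mapped class, query() opens a session-bound query, and execute() runs it with retries and converts rows via to_native(). A compact sketch of the pattern (the entity id is made up):

    states = recorder.get_model('States')
    recent = recorder.execute(
        recorder.query('States')
        .filter(states.entity_id == 'sensor.outside_temperature')
        .order_by(states.last_updated.desc())
        .limit(10))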

View file

@@ -11,27 +11,23 @@ from itertools import groupby
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
import homeassistant.util.dt as dt_util
from homeassistant.components import recorder, sun
from homeassistant.const import (
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED,
STATE_NOT_HOME, STATE_OFF, STATE_ON)
from homeassistant.components.http import HomeAssistantView
from homeassistant.const import (EVENT_HOMEASSISTANT_START,
EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED,
STATE_NOT_HOME, STATE_OFF, STATE_ON)
from homeassistant.core import DOMAIN as HA_DOMAIN
from homeassistant.core import State
from homeassistant.helpers.entity import split_entity_id
from homeassistant.helpers import template
import homeassistant.helpers.config_validation as cv
from homeassistant.components.http import HomeAssistantView
from homeassistant.helpers.entity import split_entity_id
DOMAIN = "logbook"
DEPENDENCIES = ['recorder', 'http']
URL_LOGBOOK = re.compile(r'/api/logbook(?:/(?P<date>\d{4}-\d{1,2}-\d{1,2})|)')
QUERY_EVENTS_BETWEEN = """
SELECT * FROM events WHERE time_fired > ? AND time_fired < ?
"""
_LOGGER = logging.getLogger(__name__)
EVENT_LOGBOOK_ENTRY = 'logbook_entry'
@@ -100,9 +96,11 @@ class LogbookView(HomeAssistantView):
end_day = start_day + timedelta(days=1)
events = recorder.query_events(
QUERY_EVENTS_BETWEEN,
(dt_util.as_utc(start_day), dt_util.as_utc(end_day)))
events = recorder.get_model('Events')
query = recorder.query('Events').filter(
(events.time_fired > start_day) &
(events.time_fired < end_day))
events = recorder.execute(query)
return self.json(humanify(events))

View file

@@ -1,529 +0,0 @@
"""
Support for recording details.
Component that records all events and state changes. Allows other components
to query this database.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/recorder/
"""
import atexit
import json
import logging
import queue
import sqlite3
import threading
from datetime import date, datetime, timedelta
import voluptuous as vol
import homeassistant.util.dt as dt_util
from homeassistant.const import (
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED,
EVENT_TIME_CHANGED, MATCH_ALL)
from homeassistant.core import Event, EventOrigin, State
from homeassistant.remote import JSONEncoder
from homeassistant.helpers.event import track_point_in_utc_time
DOMAIN = "recorder"
DB_FILE = 'home-assistant.db'
RETURN_ROWCOUNT = "rowcount"
RETURN_LASTROWID = "lastrowid"
RETURN_ONE_ROW = "one_row"
CONF_PURGE_DAYS = "purge_days"
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
vol.Optional(CONF_PURGE_DAYS): vol.All(vol.Coerce(int),
vol.Range(min=1)),
})
}, extra=vol.ALLOW_EXTRA)
_INSTANCE = None
_LOGGER = logging.getLogger(__name__)
def query(sql_query, arguments=None):
"""Query the database."""
_verify_instance()
return _INSTANCE.query(sql_query, arguments)
def query_states(state_query, arguments=None):
"""Query the database and return a list of states."""
return [
row for row in
(row_to_state(row) for row in query(state_query, arguments))
if row is not None]
def query_events(event_query, arguments=None):
"""Query the database and return a list of states."""
return [
row for row in
(row_to_event(row) for row in query(event_query, arguments))
if row is not None]
def row_to_state(row):
"""Convert a database row to a state."""
try:
return State(
row[1], row[2], json.loads(row[3]),
dt_util.utc_from_timestamp(row[4]),
dt_util.utc_from_timestamp(row[5]))
except ValueError:
# When json.loads fails
_LOGGER.exception("Error converting row to state: %s", row)
return None
def row_to_event(row):
"""Convert a databse row to an event."""
try:
return Event(row[1], json.loads(row[2]), EventOrigin(row[3]),
dt_util.utc_from_timestamp(row[5]))
except ValueError:
# When json.loads fails
_LOGGER.exception("Error converting row to event: %s", row)
return None
def run_information(point_in_time=None):
"""Return information about current run.
There is also the run that covers point_in_time.
"""
_verify_instance()
if point_in_time is None or point_in_time > _INSTANCE.recording_start:
return RecorderRun()
run = _INSTANCE.query(
"SELECT * FROM recorder_runs WHERE start<? AND END>?",
(point_in_time, point_in_time), return_value=RETURN_ONE_ROW)
return RecorderRun(run) if run else None
def setup(hass, config):
"""Setup the recorder."""
# pylint: disable=global-statement
global _INSTANCE
purge_days = config.get(DOMAIN, {}).get(CONF_PURGE_DAYS)
_INSTANCE = Recorder(hass, purge_days=purge_days)
return True
class RecorderRun(object):
"""Representation of a recorder run."""
def __init__(self, row=None):
"""Initialize the recorder run."""
self.end = None
if row is None:
self.start = _INSTANCE.recording_start
self.closed_incorrect = False
else:
self.start = dt_util.utc_from_timestamp(row[1])
if row[2] is not None:
self.end = dt_util.utc_from_timestamp(row[2])
self.closed_incorrect = bool(row[3])
def entity_ids(self, point_in_time=None):
"""Return the entity ids that existed in this run.
Specify point_in_time if you want to know which existed at that point
in time inside the run.
"""
where = self.where_after_start_run
where_data = []
if point_in_time is not None or self.end is not None:
where += "AND created < ? "
where_data.append(point_in_time or self.end)
return [row[0] for row in query(
"SELECT entity_id FROM states WHERE {}"
"GROUP BY entity_id".format(where), where_data)]
@property
def where_after_start_run(self):
"""Return SQL WHERE clause.
Selection of the rows created after the start of the run.
"""
return "created >= {} ".format(_adapt_datetime(self.start))
@property
def where_limit_to_run(self):
"""Return a SQL WHERE clause.
For limiting the results to this run.
"""
where = self.where_after_start_run
if self.end is not None:
where += "AND created < {} ".format(_adapt_datetime(self.end))
return where
class Recorder(threading.Thread):
"""A threaded recorder class."""
# pylint: disable=too-many-instance-attributes
def __init__(self, hass, purge_days):
"""Initialize the recorder."""
threading.Thread.__init__(self)
self.hass = hass
self.purge_days = purge_days
self.conn = None
self.queue = queue.Queue()
self.quit_object = object()
self.lock = threading.Lock()
self.recording_start = dt_util.utcnow()
self.utc_offset = dt_util.now().utcoffset().total_seconds()
self.db_path = self.hass.config.path(DB_FILE)
def start_recording(event):
"""Start recording."""
self.start()
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_recording)
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, self.shutdown)
hass.bus.listen(MATCH_ALL, self.event_listener)
def run(self):
"""Start processing events to save."""
self._setup_connection()
self._setup_run()
if self.purge_days is not None:
track_point_in_utc_time(self.hass,
lambda now: self._purge_old_data(),
dt_util.utcnow() + timedelta(minutes=5))
while True:
event = self.queue.get()
if event == self.quit_object:
self._close_run()
self._close_connection()
self.queue.task_done()
return
elif event.event_type == EVENT_TIME_CHANGED:
self.queue.task_done()
continue
event_id = self.record_event(event)
if event.event_type == EVENT_STATE_CHANGED:
self.record_state(
event.data['entity_id'], event.data.get('new_state'),
event_id)
self.queue.task_done()
def event_listener(self, event):
"""Listen for new events and put them in the process queue."""
self.queue.put(event)
def shutdown(self, event):
"""Tell the recorder to shut down."""
self.queue.put(self.quit_object)
self.block_till_done()
def record_state(self, entity_id, state, event_id):
"""Save a state to the database."""
now = dt_util.utcnow()
# State got deleted
if state is None:
state_state = ''
state_domain = ''
state_attr = '{}'
last_changed = last_updated = now
else:
state_domain = state.domain
state_state = state.state
state_attr = json.dumps(dict(state.attributes))
last_changed = state.last_changed
last_updated = state.last_updated
info = (
entity_id, state_domain, state_state, state_attr,
last_changed, last_updated,
now, self.utc_offset, event_id)
self.query(
"""
INSERT INTO states (
entity_id, domain, state, attributes, last_changed, last_updated,
created, utc_offset, event_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
info)
def record_event(self, event):
"""Save an event to the database."""
info = (
event.event_type, json.dumps(event.data, cls=JSONEncoder),
str(event.origin), dt_util.utcnow(), event.time_fired,
self.utc_offset
)
return self.query(
"INSERT INTO events ("
"event_type, event_data, origin, created, time_fired, utc_offset"
") VALUES (?, ?, ?, ?, ?, ?)", info, RETURN_LASTROWID)
def query(self, sql_query, data=None, return_value=None):
"""Query the database."""
try:
with self.conn, self.lock:
_LOGGER.debug("Running query %s", sql_query)
cur = self.conn.cursor()
if data is not None:
cur.execute(sql_query, data)
else:
cur.execute(sql_query)
if return_value == RETURN_ROWCOUNT:
return cur.rowcount
elif return_value == RETURN_LASTROWID:
return cur.lastrowid
elif return_value == RETURN_ONE_ROW:
return cur.fetchone()
else:
return cur.fetchall()
except (sqlite3.IntegrityError, sqlite3.OperationalError,
sqlite3.ProgrammingError):
_LOGGER.exception(
"Error querying the database using: %s", sql_query)
return []
def block_till_done(self):
"""Block till all events processed."""
self.queue.join()
def _setup_connection(self):
"""Ensure database is ready to fly."""
self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
self.conn.row_factory = sqlite3.Row
# Make sure the database is closed whenever Python exits
# without the STOP event being fired.
atexit.register(self._close_connection)
# Have datetime objects be saved as integers.
sqlite3.register_adapter(date, _adapt_datetime)
sqlite3.register_adapter(datetime, _adapt_datetime)
# Validate we are on the correct schema or that we have to migrate.
cur = self.conn.cursor()
def save_migration(migration_id):
"""Save and commit a migration to the database."""
cur.execute('INSERT INTO schema_version VALUES (?, ?)',
(migration_id, dt_util.utcnow()))
self.conn.commit()
_LOGGER.info("Database migrated to version %d", migration_id)
try:
cur.execute('SELECT max(migration_id) FROM schema_version;')
migration_id = cur.fetchone()[0] or 0
except sqlite3.OperationalError:
# The table does not exist.
cur.execute('CREATE TABLE schema_version ('
'migration_id integer primary key, performed integer)')
migration_id = 0
if migration_id < 1:
cur.execute("""
CREATE TABLE recorder_runs (
run_id integer primary key,
start integer,
end integer,
closed_incorrect integer default 0,
created integer)
""")
cur.execute("""
CREATE TABLE events (
event_id integer primary key,
event_type text,
event_data text,
origin text,
created integer)
""")
cur.execute(
'CREATE INDEX events__event_type ON events(event_type)')
cur.execute("""
CREATE TABLE states (
state_id integer primary key,
entity_id text,
state text,
attributes text,
last_changed integer,
last_updated integer,
created integer)
""")
cur.execute('CREATE INDEX states__entity_id ON states(entity_id)')
save_migration(1)
if migration_id < 2:
cur.execute("""
ALTER TABLE events
ADD COLUMN time_fired integer
""")
cur.execute('UPDATE events SET time_fired=created')
save_migration(2)
if migration_id < 3:
utc_offset = self.utc_offset
cur.execute("""
ALTER TABLE recorder_runs
ADD COLUMN utc_offset integer
""")
cur.execute("""
ALTER TABLE events
ADD COLUMN utc_offset integer
""")
cur.execute("""
ALTER TABLE states
ADD COLUMN utc_offset integer
""")
cur.execute("UPDATE recorder_runs SET utc_offset=?", [utc_offset])
cur.execute("UPDATE events SET utc_offset=?", [utc_offset])
cur.execute("UPDATE states SET utc_offset=?", [utc_offset])
save_migration(3)
if migration_id < 4:
# We had a bug where we did not save utc offset for recorder runs.
cur.execute(
"""UPDATE recorder_runs SET utc_offset=?
WHERE utc_offset IS NULL""", [self.utc_offset])
cur.execute("""
ALTER TABLE states
ADD COLUMN event_id integer
""")
save_migration(4)
if migration_id < 5:
# Add domain so that thermostat graphs look right.
try:
cur.execute("""
ALTER TABLE states
ADD COLUMN domain text
""")
except sqlite3.OperationalError:
# We had a bug in this migration for a while on dev.
# Without this, dev-users will have to throw away their db.
pass
# TravisCI has Python compiled against an old version of SQLite3
# which misses the instr method.
self.conn.create_function(
"instr", 2,
lambda string, substring: string.find(substring) + 1)
# Populate domain with defaults.
cur.execute("""
UPDATE states
set domain=substr(entity_id, 0, instr(entity_id, '.'))
""")
# Add indexes we are going to use a lot on selects.
cur.execute("""
CREATE INDEX states__state_changes ON
states (last_changed, last_updated, entity_id)""")
cur.execute("""
CREATE INDEX states__significant_changes ON
states (domain, last_updated, entity_id)""")
save_migration(5)
def _close_connection(self):
"""Close connection to the database."""
_LOGGER.info("Closing database")
atexit.unregister(self._close_connection)
self.conn.close()
def _setup_run(self):
"""Log the start of the current run."""
if self.query("""UPDATE recorder_runs SET end=?, closed_incorrect=1
WHERE end IS NULL""", (self.recording_start, ),
return_value=RETURN_ROWCOUNT):
_LOGGER.warning("Found unfinished sessions")
self.query(
"""INSERT INTO recorder_runs (start, created, utc_offset)
VALUES (?, ?, ?)""",
(self.recording_start, dt_util.utcnow(), self.utc_offset))
def _close_run(self):
"""Save end time for current run."""
self.query(
"UPDATE recorder_runs SET end=? WHERE start=?",
(dt_util.utcnow(), self.recording_start))
def _purge_old_data(self):
"""Purge events and states older than purge_days ago."""
if not self.purge_days or self.purge_days < 1:
_LOGGER.debug("purge_days set to %s, will not purge any old data.",
self.purge_days)
return
purge_before = dt_util.utcnow() - timedelta(days=self.purge_days)
_LOGGER.info("Purging events created before %s", purge_before)
deleted_rows = self.query(
sql_query="DELETE FROM events WHERE created < ?;",
data=(int(purge_before.timestamp()),),
return_value=RETURN_ROWCOUNT)
_LOGGER.debug("Deleted %s events", deleted_rows)
_LOGGER.info("Purging states created before %s", purge_before)
deleted_rows = self.query(
sql_query="DELETE FROM states WHERE created < ?;",
data=(int(purge_before.timestamp()),),
return_value=RETURN_ROWCOUNT)
_LOGGER.debug("Deleted %s states", deleted_rows)
# Execute sqlite vacuum command to free up space on disk
self.query("VACUUM;")
def _adapt_datetime(datetimestamp):
"""Turn a datetime into an integer for in the DB."""
return dt_util.as_utc(datetimestamp).timestamp()
def _verify_instance():
"""Throw error if recorder not initialized."""
if _INSTANCE is None:
raise RuntimeError("Recorder not initialized.")

View file

@@ -0,0 +1,315 @@
"""
Support for recording details.
Component that records all events and state changes. Allows other components
to query this database.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/recorder/
"""
import logging
import queue
import threading
import time
from datetime import timedelta
import voluptuous as vol
import homeassistant.util.dt as dt_util
from homeassistant.const import (EVENT_HOMEASSISTANT_START,
EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED,
EVENT_TIME_CHANGED, MATCH_ALL)
from homeassistant.helpers.event import track_point_in_utc_time
DOMAIN = "recorder"
REQUIREMENTS = ['sqlalchemy==1.0.13']
DEFAULT_URL = "sqlite:///{hass_config_path}"
DEFAULT_DB_FILE = "home-assistant_v2.db"
CONF_DB_URL = "db_url"
CONF_PURGE_DAYS = "purge_days"
RETRIES = 3
CONNECT_RETRY_WAIT = 10
QUERY_RETRY_WAIT = 0.1
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
vol.Optional(CONF_PURGE_DAYS): vol.All(vol.Coerce(int),
vol.Range(min=1)),
vol.Optional(CONF_DB_URL): vol.Url(''),
})
}, extra=vol.ALLOW_EXTRA)
_INSTANCE = None
_LOGGER = logging.getLogger(__name__)
# Session will be populated with a scoped_session factory during setup()
# pylint: disable=invalid-name
Session = None
def execute(q):
"""Query the database and convert the objects to HA native form.
This method also retries a few times in the case of stale connections.
"""
import sqlalchemy.exc
for _ in range(0, RETRIES):
try:
return [
row for row in
(row.to_native() for row in q)
if row is not None]
except sqlalchemy.exc.SQLAlchemyError as e:
log_error(e, retry_wait=QUERY_RETRY_WAIT, rollback=True)
return []
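A caller hands execute() any SQLAlchemy query and gets back native HA objects; rows whose JSON fails to parse come back as None from to_native() and are dropped. A hedged usage sketch ('light.kitchen' is illustrative):

    rows = execute(query('States').filter_by(entity_id='light.kitchen'))
    # rows is a list of homeassistant.core.State objects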
def run_information(point_in_time=None):
"""Return information about current run.
There is also the run that covers point_in_time.
"""
_verify_instance()
recorder_runs = get_model('RecorderRuns')
if point_in_time is None or point_in_time > _INSTANCE.recording_start:
return recorder_runs(
end=None,
start=_INSTANCE.recording_start,
closed_incorrect=False)
return query('RecorderRuns').filter(
(recorder_runs.start < point_in_time) &
(recorder_runs.end > point_in_time)).first()
def setup(hass, config):
"""Setup the recorder."""
# pylint: disable=global-statement
# pylint: disable=too-many-locals
global _INSTANCE
purge_days = config.get(DOMAIN, {}).get(CONF_PURGE_DAYS)
db_url = config.get(DOMAIN, {}).get(CONF_DB_URL, None)
if not db_url:
db_url = DEFAULT_URL.format(
hass_config_path=hass.config.path(DEFAULT_DB_FILE))
_INSTANCE = Recorder(hass, purge_days=purge_days, uri=db_url)
return True
def query(model_name, *args):
"""Helper to return a query handle."""
if isinstance(model_name, str):
return Session().query(get_model(model_name), *args)
return Session().query(model_name, *args)
def get_model(model_name):
"""Get a model class."""
from homeassistant.components.recorder import models
return getattr(models, model_name)
def log_error(e, retry_wait=0, rollback=True,
message="Error during query: %s"):
"""Log about SQLAlchemy errors in a sane manner."""
import sqlalchemy.exc
if not isinstance(e, sqlalchemy.exc.OperationalError):
_LOGGER.exception(e)
else:
_LOGGER.error(message, str(e))
if rollback:
Session().rollback()
if retry_wait:
_LOGGER.info("Retrying failed query in %s seconds", QUERY_RETRY_WAIT)
time.sleep(retry_wait)
class Recorder(threading.Thread):
"""A threaded recorder class."""
# pylint: disable=too-many-instance-attributes
def __init__(self, hass, purge_days, uri):
"""Initialize the recorder."""
threading.Thread.__init__(self)
self.hass = hass
self.purge_days = purge_days
self.queue = queue.Queue()
self.quit_object = object()
self.recording_start = dt_util.utcnow()
self.db_url = uri
self.db_ready = threading.Event()
self.engine = None
self._run = None
def start_recording(event):
"""Start recording."""
self.start()
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_recording)
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, self.shutdown)
hass.bus.listen(MATCH_ALL, self.event_listener)
def run(self):
"""Start processing events to save."""
from homeassistant.components.recorder.models import Events, States
import sqlalchemy.exc
while True:
try:
self._setup_connection()
self._setup_run()
break
except sqlalchemy.exc.SQLAlchemyError as e:
log_error(e, retry_wait=CONNECT_RETRY_WAIT, rollback=False,
message="Error during connection setup: %s")
if self.purge_days is not None:
track_point_in_utc_time(self.hass,
lambda now: self._purge_old_data(),
dt_util.utcnow() + timedelta(minutes=5))
while True:
event = self.queue.get()
if event == self.quit_object:
self._close_run()
self.queue.task_done()
return
elif event.event_type == EVENT_TIME_CHANGED:
self.queue.task_done()
continue
for _ in range(0, RETRIES):
try:
event_id = Events.record_event(Session, event)
break
except sqlalchemy.exc.OperationalError as e:
log_error(e, retry_wait=QUERY_RETRY_WAIT, rollback=True)
if event.event_type == EVENT_STATE_CHANGED:
for _ in range(0, RETRIES):
try:
States.record_state(
Session,
event.data['entity_id'],
event.data.get('new_state'),
event_id)
break
except sqlalchemy.exc.OperationalError as e:
log_error(e, retry_wait=QUERY_RETRY_WAIT,
rollback=True)
self.queue.task_done()
def event_listener(self, event):
"""Listen for new events and put them in the process queue."""
self.queue.put(event)
def shutdown(self, event):
"""Tell the recorder to shut down."""
self.queue.put(self.quit_object)
self.block_till_done()
def block_till_done(self):
"""Block till all events processed."""
self.queue.join()
def block_till_db_ready(self):
"""Block until the database session is ready."""
self.db_ready.wait()
def _setup_connection(self):
"""Ensure database is ready to fly."""
# pylint: disable=global-statement
global Session
import homeassistant.components.recorder.models as models
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
if self.db_url == 'sqlite://' or ':memory:' in self.db_url:
from sqlalchemy.pool import StaticPool
self.engine = create_engine(
'sqlite://',
connect_args={'check_same_thread': False},
poolclass=StaticPool)
else:
self.engine = create_engine(self.db_url, echo=False)
models.Base.metadata.create_all(self.engine)
session_factory = sessionmaker(bind=self.engine)
Session = scoped_session(session_factory)
self.db_ready.set()
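Two details are worth noting here: StaticPool plus check_same_thread=False keeps one shared connection so an in-memory SQLite database stays visible to both the recorder thread and its callers, and scoped_session makes Session() return a thread-local session. A standalone sketch of the pattern, assuming SQLAlchemy 1.0:

    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session, sessionmaker
    from sqlalchemy.pool import StaticPool

    engine = create_engine('sqlite://',
                           connect_args={'check_same_thread': False},
                           poolclass=StaticPool)
    Session = scoped_session(sessionmaker(bind=engine))
    assert Session() is Session()  # same thread -> same underlying session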
def _setup_run(self):
"""Log the start of the current run."""
recorder_runs = get_model('RecorderRuns')
for run in query('RecorderRuns').filter_by(end=None):
run.closed_incorrect = True
run.end = self.recording_start
_LOGGER.warning("Ended unfinished session (id=%s from %s)",
run.run_id, run.start)
Session().add(run)
_LOGGER.warning("Found unfinished sessions")
self._run = recorder_runs(
start=self.recording_start,
created=dt_util.utcnow()
)
Session().add(self._run)
Session().commit()
def _close_run(self):
"""Save end time for current run."""
self._run.end = dt_util.utcnow()
Session().add(self._run)
Session().commit()
self._run = None
def _purge_old_data(self):
"""Purge events and states older than purge_days ago."""
from homeassistant.components.recorder.models import Events, States
if not self.purge_days or self.purge_days < 1:
_LOGGER.debug("purge_days set to %s, will not purge any old data.",
self.purge_days)
return
purge_before = dt_util.utcnow() - timedelta(days=self.purge_days)
_LOGGER.info("Purging events created before %s", purge_before)
deleted_rows = Session().query(Events).filter(
(Events.created < purge_before)).delete(synchronize_session=False)
_LOGGER.debug("Deleted %s events", deleted_rows)
_LOGGER.info("Purging states created before %s", purge_before)
deleted_rows = Session().query(States).filter(
(States.created < purge_before)).delete(synchronize_session=False)
_LOGGER.debug("Deleted %s states", deleted_rows)
Session().commit()
Session().expire_all()
# Execute sqlite vacuum command to free up space on disk
if self.engine.driver == 'sqlite':
_LOGGER.info("Vacuuming SQLite to free space")
self.engine.execute("VACUUM")
def _verify_instance():
"""Throw error if recorder not initialized."""
if _INSTANCE is None:
raise RuntimeError("Recorder not initialized.")

View file

@@ -0,0 +1,151 @@
"""Models for SQLAlchemy."""
import json
from datetime import datetime
import logging
from sqlalchemy import (Boolean, Column, DateTime, ForeignKey, Index, Integer,
String, Text, distinct)
from sqlalchemy.ext.declarative import declarative_base
import homeassistant.util.dt as dt_util
from homeassistant.core import Event, EventOrigin, State
from homeassistant.remote import JSONEncoder
# SQLAlchemy Schema
# pylint: disable=invalid-name
Base = declarative_base()
_LOGGER = logging.getLogger(__name__)
class Events(Base):
# pylint: disable=too-few-public-methods
"""Event history data."""
__tablename__ = 'events'
event_id = Column(Integer, primary_key=True)
event_type = Column(String(32), index=True)
event_data = Column(Text)
origin = Column(String(32))
time_fired = Column(DateTime(timezone=True))
created = Column(DateTime(timezone=True), default=datetime.utcnow)
@staticmethod
def record_event(session, event):
"""Save an event to the database."""
dbevent = Events(event_type=event.event_type,
event_data=json.dumps(event.data, cls=JSONEncoder),
origin=str(event.origin),
time_fired=event.time_fired)
session.add(dbevent)
session.commit()
return dbevent.event_id
def to_native(self):
"""Convert to a natve HA Event."""
try:
return Event(
self.event_type,
json.loads(self.event_data),
EventOrigin(self.origin),
dt_util.UTC.localize(self.time_fired)
)
except ValueError:
# When json.loads fails
_LOGGER.exception("Error converting to event: %s", self)
return None
class States(Base):
# pylint: disable=too-few-public-methods
"""State change history."""
__tablename__ = 'states'
state_id = Column(Integer, primary_key=True)
domain = Column(String(64))
entity_id = Column(String(64))
state = Column(String(255))
attributes = Column(Text)
origin = Column(String(32))
event_id = Column(Integer, ForeignKey('events.event_id'))
last_changed = Column(DateTime(timezone=True), default=datetime.utcnow)
last_updated = Column(DateTime(timezone=True), default=datetime.utcnow)
created = Column(DateTime(timezone=True), default=datetime.utcnow)
__table_args__ = (Index('states__state_changes',
'last_changed', 'last_updated', 'entity_id'),
Index('states__significant_changes',
'domain', 'last_updated', 'entity_id'), )
@staticmethod
def record_state(session, entity_id, state, event_id):
"""Save a state to the database."""
now = dt_util.utcnow()
dbstate = States(event_id=event_id, entity_id=entity_id)
# State got deleted
if state is None:
dbstate.state = ''
dbstate.domain = ''
dbstate.attributes = '{}'
dbstate.last_changed = now
dbstate.last_updated = now
else:
dbstate.domain = state.domain
dbstate.state = state.state
dbstate.attributes = json.dumps(dict(state.attributes))
dbstate.last_changed = state.last_changed
dbstate.last_updated = state.last_updated
session.add(dbstate)
session.commit()
def to_native(self):
"""Convert to an HA state object."""
try:
return State(
self.entity_id, self.state,
json.loads(self.attributes),
dt_util.UTC.localize(self.last_changed),
dt_util.UTC.localize(self.last_updated)
)
except ValueError:
# When json.loads fails
_LOGGER.exception("Error converting row to state: %s", self)
return None
class RecorderRuns(Base):
# pylint: disable=too-few-public-methods
"""Representation of recorder run."""
__tablename__ = 'recorder_runs'
run_id = Column(Integer, primary_key=True)
start = Column(DateTime(timezone=True), default=datetime.utcnow)
end = Column(DateTime(timezone=True))
closed_incorrect = Column(Boolean, default=False)
created = Column(DateTime(timezone=True), default=datetime.utcnow)
def entity_ids(self, point_in_time=None):
"""Return the entity ids that existed in this run.
Specify point_in_time if you want to know which existed at that point
in time inside the run.
"""
from homeassistant.components.recorder import Session, _verify_instance
_verify_instance()
query = Session().query(distinct(States.entity_id)).filter(
States.created >= self.start)
if point_in_time is not None or self.end is not None:
query = query.filter(States.created < (point_in_time or self.end))
return [row.entity_id for row in query]
def to_native(self):
"""Return self, native format is this model."""
return self
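Taken together, the models round-trip HA objects through the database: record_event() persists an Event and to_native() rebuilds it. A sketch, assuming a session set up as in the recorder component above:

    from homeassistant.core import Event, EventOrigin

    event = Event('test_event', {'answer': 42}, EventOrigin.local)
    event_id = Events.record_event(session, event)  # commits, returns the new key
    restored = session.query(Events).get(event_id).to_native()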

View file

@@ -0,0 +1,187 @@
#!/usr/bin/env python
"""Script to convert an old-format home-assistant.db to a new format one."""
import argparse
import os.path
import sqlite3
import sys
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from homeassistant.components.recorder import models
import homeassistant.config as config_util
import homeassistant.util.dt as dt_util
def ts_to_dt(timestamp):
"""Turn a datetime into an integer for in the DB."""
if timestamp is None:
return None
return dt_util.utc_from_timestamp(timestamp)
# Based on code at
# http://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console
# pylint: disable=too-many-arguments
def print_progress(iteration, total, prefix='', suffix='', decimals=2,
bar_length=68):
"""Print progress bar.
Call in a loop to create a terminal progress bar.
@params:
iteration - Required : current iteration (Int)
total - Required : total iterations (Int)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str)
decimals - Optional : number of decimals in percent complete (Int)
bar_length - Optional : character length of bar (Int)
"""
filled_length = int(round(bar_length * iteration / float(total)))
percents = round(100.00 * (iteration / float(total)), decimals)
line = '#' * filled_length + '-' * (bar_length - filled_length)
sys.stdout.write('%s [%s] %s%s %s\r' % (prefix, line,
percents, '%', suffix))
sys.stdout.flush()
if iteration == total:
print("\n")
def main():
"""The actual script body."""
# pylint: disable=too-many-locals,invalid-name,too-many-statements
parser = argparse.ArgumentParser(
description="Home Assistant: Observe, Control, Automate.")
parser.add_argument(
'-c', '--config',
metavar='path_to_config_dir',
default=config_util.get_default_config_dir(),
help="Directory that contains the Home Assistant configuration")
parser.add_argument(
'-a', '--append',
action='store_true',
default=False,
help="Append to existing new format SQLite database")
parser.add_argument(
'--uri',
type=str,
help="Connect to URI and import (implies --append)"
"eg: mysql://localhost/homeassistant")
args = parser.parse_args()
config_dir = os.path.join(os.getcwd(), args.config)
# Test if configuration directory exists
if not os.path.isdir(config_dir):
if config_dir != config_util.get_default_config_dir():
print(('Fatal Error: Specified configuration directory does '
'not exist {} ').format(config_dir))
sys.exit(1)
else:
config_dir = config_util.get_default_config_dir()
src_db = '{}/home-assistant.db'.format(config_dir)
dst_db = '{}/home-assistant_v2.db'.format(config_dir)
if not os.path.exists(src_db):
print("Fatal Error: Old format database '{}' does not exist".format(
src_db))
sys.exit(1)
if not args.uri and (os.path.exists(dst_db) and not args.append):
print("Fatal Error: New format database '{}' exists already - "
"Remove it or use --append".format(dst_db))
print("Note: --append must maintain an ID mapping and is much slower")
print("and requires sufficient memory to track all event IDs")
sys.exit(1)
conn = sqlite3.connect(src_db)
uri = args.uri or "sqlite:///{}".format(dst_db)
engine = create_engine(uri, echo=False)
models.Base.metadata.create_all(engine)
session_factory = sessionmaker(bind=engine)
session = session_factory()
append = args.append or args.uri
c = conn.cursor()
c.execute("SELECT count(*) FROM recorder_runs")
num_rows = c.fetchone()[0]
print("Converting {} recorder_runs".format(num_rows))
c.close()
c = conn.cursor()
n = 0
for row in c.execute("SELECT * FROM recorder_runs"):
n += 1
session.add(models.RecorderRuns(
start=ts_to_dt(row[1]),
end=ts_to_dt(row[2]),
closed_incorrect=row[3],
created=ts_to_dt(row[4])
))
if n % 1000 == 0:
session.commit()
print_progress(n, num_rows)
print_progress(n, num_rows)
session.commit()
c.close()
c = conn.cursor()
c.execute("SELECT count(*) FROM events")
num_rows = c.fetchone()[0]
print("Converting {} events".format(num_rows))
c.close()
id_mapping = {}
c = conn.cursor()
n = 0
for row in c.execute("SELECT * FROM events"):
n += 1
o = models.Events(
event_type=row[1],
event_data=row[2],
origin=row[3],
created=ts_to_dt(row[4]),
time_fired=ts_to_dt(row[5]),
)
session.add(o)
if append:
session.flush()
id_mapping[row[0]] = o.event_id
if n % 1000 == 0:
session.commit()
print_progress(n, num_rows)
print_progress(n, num_rows)
session.commit()
c.close()
c = conn.cursor()
c.execute("SELECT count(*) FROM states")
num_rows = c.fetchone()[0]
print("Converting {} states".format(num_rows))
c.close()
c = conn.cursor()
n = 0
for row in c.execute("SELECT * FROM states"):
n += 1
session.add(models.States(
entity_id=row[1],
state=row[2],
attributes=row[3],
last_changed=ts_to_dt(row[4]),
last_updated=ts_to_dt(row[5]),
event_id=id_mapping.get(row[6], row[6]),
domain=row[7]
))
if n % 1000 == 0:
session.commit()
print_progress(n, num_rows)
print_progress(n, num_rows)
session.commit()
c.close()
if __name__ == "__main__":
main()
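This diff view omits file names, so the module path of the script is an assumption; per the commit message it lives under homeassistant.scripts. An invocation sketch using only the argparse flags defined above (script name, paths, and URL are illustrative):

    import sys
    # Equivalent to running the script from a shell with these arguments.
    sys.argv = ['db_migrator', '--config', '/home/user/.homeassistant',
                '--uri', 'mysql://localhost/homeassistant']
    main()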

View file

@@ -383,6 +383,9 @@ somecomfort==0.2.1
# homeassistant.components.sensor.speedtest
speedtest-cli==0.3.4
# homeassistant.components.recorder
sqlalchemy==1.0.13
# homeassistant.components.http
static3==0.7.0

View file

@@ -6,7 +6,7 @@ import pkgutil
import re
import sys
COMMENT_REQUIREMENTS = [
COMMENT_REQUIREMENTS = (
'RPi.GPIO',
'rpi-rf',
'Adafruit_Python_DHT',
@@ -14,7 +14,11 @@ COMMENT_REQUIREMENTS = [
'pybluez',
'bluepy',
'python-lirc',
]
)
IGNORE_PACKAGES = (
'homeassistant.components.recorder.models',
)
def explore_module(package, explore_children):
@@ -59,7 +63,8 @@ def gather_modules():
try:
module = importlib.import_module(package)
except ImportError:
errors.append(package)
if package not in IGNORE_PACKAGES:
errors.append(package)
continue
if not getattr(module, 'REQUIREMENTS', None):

View file

@@ -25,9 +25,13 @@ class TestComponentHistory(unittest.TestCase):
def init_recorder(self):
"""Initialize the recorder."""
with patch('homeassistant.core.Config.path', return_value=':memory:'):
recorder.setup(self.hass, {})
db_uri = 'sqlite://'
with patch('homeassistant.core.Config.path', return_value=db_uri):
recorder.setup(self.hass, config={
"recorder": {
"db_url": db_uri}})
self.hass.start()
recorder._INSTANCE.block_till_db_ready()
self.wait_recording_done()
def wait_recording_done(self):

View file

@@ -1,8 +1,8 @@
"""The tests for the Recorder component."""
# pylint: disable=too-many-public-methods,protected-access
import unittest
import time
import json
from datetime import datetime, timedelta
from unittest.mock import patch
from homeassistant.const import MATCH_ALL
@@ -17,9 +17,14 @@ class TestRecorder(unittest.TestCase):
def setUp(self): # pylint: disable=invalid-name
"""Setup things to be run when tests are started."""
self.hass = get_test_home_assistant()
with patch('homeassistant.core.Config.path', return_value=':memory:'):
recorder.setup(self.hass, {})
db_uri = 'sqlite://'
with patch('homeassistant.core.Config.path', return_value=db_uri):
recorder.setup(self.hass, config={
"recorder": {
"db_url": db_uri}})
self.hass.start()
recorder._INSTANCE.block_till_db_ready()
self.session = recorder.Session()
recorder._INSTANCE.block_till_done()
def tearDown(self): # pylint: disable=invalid-name
@@ -29,12 +34,13 @@
def _add_test_states(self):
"""Add multiple states to the db for testing."""
now = int(time.time())
five_days_ago = now - (60*60*24*5)
now = datetime.now()
five_days_ago = now - timedelta(days=5)
attributes = {'test_attr': 5, 'test_attr_10': 'nice'}
self.hass.pool.block_till_done()
recorder._INSTANCE.block_till_done()
for event_id in range(5):
if event_id < 3:
timestamp = five_days_ago
@@ -42,19 +48,24 @@
else:
timestamp = now
state = 'dontpurgeme'
recorder.query("INSERT INTO states ("
"entity_id, domain, state, attributes,"
"last_changed, last_updated, created,"
"utc_offset, event_id)"
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
('test.recorder2', 'sensor', state,
json.dumps(attributes), timestamp, timestamp,
timestamp, -18000, event_id + 1000))
self.session.add(recorder.get_model('States')(
entity_id='test.recorder2',
domain='sensor',
state=state,
attributes=json.dumps(attributes),
last_changed=timestamp,
last_updated=timestamp,
created=timestamp,
event_id=event_id + 1000
))
self.session.commit()
def _add_test_events(self):
"""Add a few events for testing."""
now = int(time.time())
five_days_ago = now - (60*60*24*5)
now = datetime.now()
five_days_ago = now - timedelta(days=5)
event_data = {'test_attr': 5, 'test_attr_10': 'nice'}
self.hass.pool.block_till_done()
@@ -66,12 +77,14 @@ class TestRecorder(unittest.TestCase):
else:
timestamp = now
event_type = 'EVENT_TEST'
recorder.query("INSERT INTO events"
"(event_type, event_data, origin, created,"
"time_fired, utc_offset)"
"VALUES (?, ?, ?, ?, ?, ?)",
(event_type, json.dumps(event_data), 'LOCAL',
timestamp, timestamp, -18000))
self.session.add(recorder.get_model('Events')(
event_type=event_type,
event_data=json.dumps(event_data),
origin='LOCAL',
created=timestamp,
time_fired=timestamp,
))
def test_saving_state(self):
"""Test saving and restoring a state."""
@@ -84,7 +97,8 @@ class TestRecorder(unittest.TestCase):
self.hass.pool.block_till_done()
recorder._INSTANCE.block_till_done()
states = recorder.query_states('SELECT * FROM states')
states = recorder.execute(
recorder.query('States'))
self.assertEqual(1, len(states))
self.assertEqual(self.hass.states.get(entity_id), states[0])
@@ -108,8 +122,9 @@ class TestRecorder(unittest.TestCase):
self.hass.pool.block_till_done()
recorder._INSTANCE.block_till_done()
db_events = recorder.query_events(
'SELECT * FROM events WHERE event_type = ?', (event_type, ))
db_events = recorder.execute(
recorder.query('Events').filter_by(
event_type=event_type))
assert len(events) == 1
assert len(db_events) == 1
@@ -129,51 +144,45 @@
"""Test deleting old states."""
self._add_test_states()
# make sure we start with 5 states
states = recorder.query_states('SELECT * FROM states')
self.assertEqual(len(states), 5)
states = recorder.query('States')
self.assertEqual(states.count(), 5)
# run purge_old_data()
recorder._INSTANCE.purge_days = 4
recorder._INSTANCE._purge_old_data()
# we should only have 2 states left after purging
states = recorder.query_states('SELECT * FROM states')
self.assertEqual(len(states), 2)
self.assertEqual(states.count(), 2)
def test_purge_old_events(self):
"""Test deleting old events."""
self._add_test_events()
events = recorder.query_events('SELECT * FROM events WHERE '
'event_type LIKE "EVENT_TEST%"')
self.assertEqual(len(events), 5)
events = recorder.query('Events').filter(
recorder.get_model('Events').event_type.like("EVENT_TEST%"))
self.assertEqual(events.count(), 5)
# run purge_old_data()
recorder._INSTANCE.purge_days = 4
recorder._INSTANCE._purge_old_data()
# now we should only have 3 events left
events = recorder.query_events('SELECT * FROM events WHERE '
'event_type LIKE "EVENT_TEST%"')
self.assertEqual(len(events), 3)
self.assertEqual(events.count(), 3)
def test_purge_disabled(self):
"""Test leaving purge_days disabled."""
self._add_test_states()
self._add_test_events()
# make sure we start with 5 states and events
states = recorder.query_states('SELECT * FROM states')
events = recorder.query_events('SELECT * FROM events WHERE '
'event_type LIKE "EVENT_TEST%"')
self.assertEqual(len(states), 5)
self.assertEqual(len(events), 5)
states = recorder.query('States')
events = recorder.query('Events').filter(
recorder.get_model('Events').event_type.like("EVENT_TEST%"))
self.assertEqual(states.count(), 5)
self.assertEqual(events.count(), 5)
# run purge_old_data()
recorder._INSTANCE.purge_days = None
recorder._INSTANCE._purge_old_data()
# we should have all of our states still
states = recorder.query_states('SELECT * FROM states')
events = recorder.query_events('SELECT * FROM events WHERE '
'event_type LIKE "EVENT_TEST%"')
self.assertEqual(len(states), 5)
self.assertEqual(len(events), 5)
self.assertEqual(states.count(), 5)
self.assertEqual(events.count(), 5)