Recorder component: proof of concept
This commit is contained in:
parent
c2cd181c1a
commit
7c45318c00
6 changed files with 302 additions and 3 deletions
|
@ -314,6 +314,14 @@ class Event(object):
|
|||
self.data = data or {}
|
||||
self.origin = origin
|
||||
|
||||
def as_dict(self):
|
||||
""" Returns a dict representation of this Event. """
|
||||
return {
|
||||
'event_type': self.event_type,
|
||||
'data': dict(self.data),
|
||||
'origin': str(self.origin)
|
||||
}
|
||||
|
||||
def __repr__(self):
|
||||
# pylint: disable=maybe-no-member
|
||||
if self.data:
|
||||
|
@ -355,7 +363,8 @@ class EventBus(object):
|
|||
|
||||
event = Event(event_type, event_data, origin)
|
||||
|
||||
_LOGGER.info("Bus:Handling %s", event)
|
||||
if event_type != EVENT_TIME_CHANGED:
|
||||
_LOGGER.info("Bus:Handling %s", event)
|
||||
|
||||
if not listeners:
|
||||
return
|
||||
|
|
|
@ -89,6 +89,7 @@ from homeassistant.const import (
|
|||
from homeassistant.helpers import TrackStates
|
||||
import homeassistant.remote as rem
|
||||
import homeassistant.util as util
|
||||
import homeassistant.components.recorder as recorder
|
||||
from . import frontend
|
||||
|
||||
DOMAIN = "http"
|
||||
|
@ -235,6 +236,14 @@ class RequestHandler(SimpleHTTPRequestHandler):
|
|||
('DELETE', URL_API_EVENT_FORWARD,
|
||||
'_handle_delete_api_event_forward'),
|
||||
|
||||
# Query the recorder
|
||||
('GET',
|
||||
re.compile('/api/component/recorder/(?P<entity_id>[a-zA-Z\._0-9]+)/last_5_states'),
|
||||
'_handle_component_recorder_5_states'),
|
||||
('GET',
|
||||
re.compile('/api/component/recorder/last_5_events'),
|
||||
'_handle_component_recorder_5_events'),
|
||||
|
||||
# Static files
|
||||
('GET', re.compile(r'/static/(?P<file>[a-zA-Z\._\-0-9/]+)'),
|
||||
'_handle_get_static'),
|
||||
|
@ -495,6 +504,22 @@ class RequestHandler(SimpleHTTPRequestHandler):
|
|||
|
||||
self._write_json(changed_states)
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def _handle_component_recorder_5_states(self, path_match, data):
|
||||
if recorder.DOMAIN not in self.server.hass.components:
|
||||
return self._write_json([])
|
||||
|
||||
entity_id = path_match.group('entity_id')
|
||||
|
||||
self._write_json(recorder.last_5_states(entity_id))
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def _handle_component_recorder_5_events(self, path_match, data):
|
||||
if recorder.DOMAIN not in self.server.hass.components:
|
||||
return self._write_json([])
|
||||
|
||||
self._write_json(recorder.last_5_events())
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def _handle_post_api_event_forward(self, path_match, data):
|
||||
""" Handles adding an event forwarding target. """
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
<link rel="import" href="../bower_components/polymer/polymer.html">
|
||||
|
||||
<polymer-element name="recent-states" attributes="api stateObj">
|
||||
<template>
|
||||
<style>
|
||||
|
||||
</style>
|
||||
|
||||
<h3>State history</h3>
|
||||
|
||||
<template repeat="{{recentStates as state}}">
|
||||
<p>{{state.state}} - {{state.last_changed | relativeHATime}}</p>
|
||||
</template>
|
||||
|
||||
</template>
|
||||
<script>
|
||||
Polymer({
|
||||
recentStates: [],
|
||||
|
||||
stateObjChanged: function() {
|
||||
this.recentStates = [];
|
||||
|
||||
this.api.call_api('GET', 'component/recorder/' + this.stateObj.entity_id + '/last_5_states', {}, this.newStates.bind(this));
|
||||
},
|
||||
|
||||
newStates: function(states) {
|
||||
this.recentStates = states.slice(1);
|
||||
}
|
||||
});
|
||||
</script>
|
||||
</polymer-element>
|
|
@ -1,5 +1,7 @@
|
|||
<link rel="import" href="../bower_components/polymer/polymer.html">
|
||||
|
||||
<link rel="import" href="../components/recent-states.html">
|
||||
|
||||
<polymer-element name="more-info-default" attributes="stateObj">
|
||||
<template>
|
||||
<style>
|
||||
|
@ -29,9 +31,11 @@
|
|||
<div class='data'>
|
||||
{{stateObj.attributes[key]}}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</template>
|
||||
|
||||
<recent-states api="{{api}}" stateObj="{{stateObj}}"></recent-states>
|
||||
|
||||
</div>
|
||||
</template>
|
||||
<script>
|
||||
|
|
230
homeassistant/components/recorder.py
Normal file
230
homeassistant/components/recorder.py
Normal file
|
@ -0,0 +1,230 @@
|
|||
import logging
|
||||
import threading
|
||||
import queue
|
||||
import sqlite3
|
||||
from datetime import datetime
|
||||
import time
|
||||
import json
|
||||
|
||||
from homeassistant import Event, EventOrigin, State
|
||||
from homeassistant.remote import JSONEncoder
|
||||
from homeassistant.const import (
|
||||
MATCH_ALL, EVENT_TIME_CHANGED, EVENT_STATE_CHANGED,
|
||||
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP)
|
||||
|
||||
DOMAIN = "recorder"
|
||||
DEPENDENCIES = []
|
||||
|
||||
DB_FILE = 'home-assistant.db'
|
||||
_INSTANCE = None
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def last_5_states(entity_id):
|
||||
""" Return the last 5 states for entity_id. """
|
||||
verify_instance()
|
||||
|
||||
return [
|
||||
_row_to_state(row) for row in _INSTANCE.query(
|
||||
"SELECT * FROM states WHERE entity_id=? AND "
|
||||
"last_changed=last_updated "
|
||||
"ORDER BY last_changed DESC LIMIT 0, 5", (entity_id, ))]
|
||||
|
||||
|
||||
def last_5_events():
|
||||
""" Return the last 5 events (dev method). """
|
||||
verify_instance()
|
||||
|
||||
return [
|
||||
_row_to_event(row) for row in _INSTANCE.query(
|
||||
"SELECT * FROM events ORDER BY created DESC LIMIT 0, 5")]
|
||||
|
||||
|
||||
def states_history(point_in_time):
|
||||
""" Return states at a specific point in time. """
|
||||
# Find homeassistant.start before point_in_time
|
||||
# Find last state for each entity after homeassistant.start
|
||||
# Ignore all states where state == ''
|
||||
pass
|
||||
|
||||
|
||||
def verify_instance():
|
||||
""" Raise error if recorder is not setup. """
|
||||
if _INSTANCE is None:
|
||||
raise RuntimeError("Recorder not initialized.")
|
||||
|
||||
|
||||
def setup(hass, config):
|
||||
""" Setup the recorder. """
|
||||
global _INSTANCE
|
||||
|
||||
_INSTANCE = Recorder(hass)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
class Recorder(threading.Thread):
|
||||
"""
|
||||
Threaded recorder
|
||||
"""
|
||||
def __init__(self, hass):
|
||||
threading.Thread.__init__(self)
|
||||
|
||||
self.hass = hass
|
||||
self.conn = None
|
||||
self.queue = queue.Queue()
|
||||
self.quit_object = object()
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def start_recording(event):
|
||||
""" Start recording. """
|
||||
self.start()
|
||||
hass.states.set('paulus.held', 'juist', {'nou en': 'bier'})
|
||||
|
||||
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_recording)
|
||||
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, self.shutdown)
|
||||
hass.bus.listen(MATCH_ALL, self.event_listener)
|
||||
|
||||
def run(self):
|
||||
""" Start processing events to save. """
|
||||
self._setup_connection()
|
||||
|
||||
while True:
|
||||
event = self.queue.get()
|
||||
|
||||
if event == self.quit_object:
|
||||
self._close_connection()
|
||||
return
|
||||
|
||||
elif event.event_type == EVENT_TIME_CHANGED:
|
||||
continue
|
||||
|
||||
elif event.event_type == EVENT_STATE_CHANGED:
|
||||
self.record_state(
|
||||
event.data['entity_id'], event.data.get('new_state'))
|
||||
|
||||
self.record_event(event)
|
||||
|
||||
def event_listener(self, event):
|
||||
""" Listens for new events on the EventBus and puts them
|
||||
in the process queue. """
|
||||
self.queue.put(event)
|
||||
|
||||
def shutdown(self, event):
|
||||
""" Tells the recorder to shut down. """
|
||||
self.queue.put(self.quit_object)
|
||||
|
||||
def record_state(self, entity_id, state):
|
||||
""" Save a state to the database. """
|
||||
now = datetime.now()
|
||||
|
||||
if state is None:
|
||||
info = (entity_id, '', "{}", now, now, now)
|
||||
else:
|
||||
info = (
|
||||
entity_id, state.state, json.dumps(state.attributes),
|
||||
state.last_changed, state.last_updated, now)
|
||||
|
||||
self.query(
|
||||
"insert into states ("
|
||||
"entity_id, state, attributes, last_changed, last_updated,"
|
||||
"created) values (?, ?, ?, ?, ?, ?)", info)
|
||||
|
||||
def record_event(self, event):
|
||||
""" Save an event to the database. """
|
||||
info = (
|
||||
event.event_type, json.dumps(event.data, cls=JSONEncoder),
|
||||
str(event.origin), datetime.now()
|
||||
)
|
||||
|
||||
self.query(
|
||||
"insert into events ("
|
||||
"event_type, event_data, origin, created"
|
||||
") values (?, ?, ?, ?)", info)
|
||||
|
||||
def query(self, query, data=None):
|
||||
""" Query the database. """
|
||||
try:
|
||||
with self.conn, self.lock:
|
||||
cur = self.conn.cursor()
|
||||
|
||||
if data is not None:
|
||||
cur.execute(query, data)
|
||||
else:
|
||||
cur.execute(query)
|
||||
|
||||
return cur.fetchall()
|
||||
|
||||
except sqlite3.IntegrityError:
|
||||
_LOGGER.exception("Error querying the database")
|
||||
return []
|
||||
|
||||
def _setup_connection(self):
|
||||
""" Ensure database is ready to fly. """
|
||||
db_path = self.hass.get_config_path(DB_FILE)
|
||||
self.conn = sqlite3.connect(db_path, check_same_thread=False)
|
||||
|
||||
# Have datetime objects be saved as integers
|
||||
sqlite3.register_adapter(datetime, adapt_datetime)
|
||||
|
||||
# Validate we are on the correct schema or that we have to migrate
|
||||
c = self.conn.cursor()
|
||||
|
||||
def save_migration(migration_id):
|
||||
c.execute('INSERT INTO schema_version VALUES (?, ?)',
|
||||
(migration_id, datetime.now()))
|
||||
self.conn.commit()
|
||||
_LOGGER.info("Database migrated to version %d", migration_id)
|
||||
|
||||
try:
|
||||
c.execute('SELECT max(migration_id) FROM schema_version;')
|
||||
migration_id = c.fetchone()[0] or 0
|
||||
|
||||
except sqlite3.OperationalError:
|
||||
# The table does not exist
|
||||
c.execute('CREATE TABLE schema_version '
|
||||
'(migration_id integer primary key, performed integer)')
|
||||
migration_id = 0
|
||||
|
||||
if migration_id < 1:
|
||||
c.execute(
|
||||
'CREATE TABLE events (event_id integer primary key, '
|
||||
'event_type text, event_data text, origin text, '
|
||||
'created integer)')
|
||||
c.execute('CREATE INDEX events__event_type ON events(event_type)')
|
||||
|
||||
c.execute(
|
||||
'CREATE TABLE states (state_id integer primary key, '
|
||||
'entity_id text, state text, attributes text, '
|
||||
'last_changed integer, last_updated integer, created integer)')
|
||||
c.execute('CREATE INDEX states__entity_id ON states(entity_id)')
|
||||
|
||||
save_migration(1)
|
||||
|
||||
def _close_connection(self):
|
||||
_LOGGER.info("Closing database")
|
||||
self.conn.close()
|
||||
|
||||
|
||||
def _row_to_state(row):
|
||||
""" Convert a databsae row to a state. """
|
||||
try:
|
||||
return State(
|
||||
row[1], row[2], json.loads(row[3]), datetime.fromtimestamp(row[4]))
|
||||
except ValueError:
|
||||
# When json.loads fails
|
||||
return None
|
||||
|
||||
|
||||
def _row_to_event(row):
|
||||
""" Convert a databse row to an event. """
|
||||
try:
|
||||
return Event(row[1], json.loads(row[2]), EventOrigin[row[3].lower()])
|
||||
except ValueError:
|
||||
# When json.oads fails
|
||||
return None
|
||||
|
||||
|
||||
# Setup datetime to save as a integer
|
||||
def adapt_datetime(ts):
|
||||
return time.mktime(ts.timetuple())
|
|
@ -257,7 +257,7 @@ class JSONEncoder(json.JSONEncoder):
|
|||
def default(self, obj):
|
||||
""" Converts Home Assistant objects and hands
|
||||
other objects to the original method. """
|
||||
if isinstance(obj, ha.State):
|
||||
if isinstance(obj, (ha.State, ha.Event)):
|
||||
return obj.as_dict()
|
||||
|
||||
return json.JSONEncoder.default(self, obj)
|
||||
|
|
Loading…
Add table
Reference in a new issue