From 3b9fb6ccf55a764aff76969e110532950a6f3072 Mon Sep 17 00:00:00 2001 From: Thibault Cohen Date: Sat, 14 Jan 2017 12:52:47 -0500 Subject: [PATCH] Improve InfluxDB (#5238) * Revert #4791 and fixes #4696 * Update influxDB based on PR comments * Add migration script * Update influxdb_migrator based on PR comments * Add override_measurement option to influxdb_migrator * Rename value field to state when data is string type * Fix influxdb cloning query --- homeassistant/components/influxdb.py | 77 +++--- homeassistant/scripts/influxdb_migrator.py | 193 +++++++++++++++ tests/components/test_influxdb.py | 260 ++++++++++++++------- 3 files changed, 418 insertions(+), 112 deletions(-) create mode 100644 homeassistant/scripts/influxdb_migrator.py diff --git a/homeassistant/components/influxdb.py b/homeassistant/components/influxdb.py index 0250efae818..5221679b6b5 100644 --- a/homeassistant/components/influxdb.py +++ b/homeassistant/components/influxdb.py @@ -9,8 +9,9 @@ import logging import voluptuous as vol from homeassistant.const import ( - EVENT_STATE_CHANGED, CONF_HOST, CONF_PORT, CONF_SSL, CONF_VERIFY_SSL, - CONF_USERNAME, CONF_BLACKLIST, CONF_PASSWORD, CONF_WHITELIST) + EVENT_STATE_CHANGED, STATE_UNAVAILABLE, STATE_UNKNOWN, CONF_HOST, + CONF_PORT, CONF_SSL, CONF_VERIFY_SSL, CONF_USERNAME, CONF_BLACKLIST, + CONF_PASSWORD, CONF_WHITELIST) from homeassistant.helpers import state as state_helper import homeassistant.helpers.config_validation as cv @@ -21,6 +22,7 @@ _LOGGER = logging.getLogger(__name__) CONF_DB_NAME = 'database' CONF_TAGS = 'tags' CONF_DEFAULT_MEASUREMENT = 'default_measurement' +CONF_OVERRIDE_MEASUREMENT = 'override_measurement' DEFAULT_DATABASE = 'home_assistant' DEFAULT_VERIFY_SSL = True @@ -37,6 +39,8 @@ CONFIG_SCHEMA = vol.Schema({ vol.Optional(CONF_DB_NAME, default=DEFAULT_DATABASE): cv.string, vol.Optional(CONF_PORT): cv.port, vol.Optional(CONF_SSL): cv.boolean, + vol.Optional(CONF_DEFAULT_MEASUREMENT): cv.string, + 
vol.Optional(CONF_OVERRIDE_MEASUREMENT): cv.string, vol.Optional(CONF_TAGS, default={}): vol.Schema({cv.string: cv.string}), vol.Optional(CONF_WHITELIST, default=[]): @@ -76,10 +80,12 @@ def setup(hass, config): blacklist = conf.get(CONF_BLACKLIST) whitelist = conf.get(CONF_WHITELIST) tags = conf.get(CONF_TAGS) + default_measurement = conf.get(CONF_DEFAULT_MEASUREMENT) + override_measurement = conf.get(CONF_OVERRIDE_MEASUREMENT) try: influx = InfluxDBClient(**kwargs) - influx.query("select * from /.*/ LIMIT 1;") + influx.query("SELECT * FROM /.*/ LIMIT 1;") except exceptions.InfluxDBClientError as exc: _LOGGER.error("Database host is not accessible due to '%s', please " "check your entries in the configuration file and that " @@ -89,56 +95,61 @@ def setup(hass, config): def influx_event_listener(event): """Listen for new messages on the bus and sends them to Influx.""" state = event.data.get('new_state') - if state is None or state.entity_id in blacklist: - return - - if whitelist and state.entity_id not in whitelist: + if state is None or state.state in ( + STATE_UNKNOWN, '', STATE_UNAVAILABLE) or \ + state.entity_id in blacklist: return try: - _state = state_helper.state_as_number(state) + if len(whitelist) > 0 and state.entity_id not in whitelist: + return + + _state = float(state_helper.state_as_number(state)) + _state_key = "value" except ValueError: _state = state.state + _state_key = "state" + + if override_measurement: + measurement = override_measurement + else: + measurement = state.attributes.get('unit_of_measurement') + if measurement in (None, ''): + if default_measurement: + measurement = default_measurement + else: + measurement = state.entity_id - # Create a counter for this state change json_body = [ { - 'measurement': "hass.state.count", + 'measurement': measurement, 'tags': { 'domain': state.domain, 'entity_id': state.object_id, }, 'time': event.time_fired, 'fields': { - 'value': 1 + _state_key: _state, } } ] - json_body[0]['tags'].update(tags) - 
- state_fields = {} - if isinstance(_state, (int, float)): - state_fields['value'] = float(_state) - for key, value in state.attributes.items(): - if isinstance(value, (int, float)): - state_fields[key] = float(value) + if key != 'unit_of_measurement': + # If the key is already in fields + if key in json_body[0]['fields']: + key = key + "_" + # Prevent column data errors in influxDB. + # For each value we try to cast it as float + # But if we can not do it we store the value + # as string add "_str" postfix to the field key + try: + json_body[0]['fields'][key] = float(value) + except (ValueError, TypeError): + new_key = "{}_str".format(key) + json_body[0]['fields'][new_key] = str(value) - if state_fields: - json_body.append( - { - 'measurement': "hass.state", - 'tags': { - 'domain': state.domain, - 'entity_id': state.object_id - }, - 'time': event.time_fired, - 'fields': state_fields - } - ) - - json_body[1]['tags'].update(tags) + json_body[0]['tags'].update(tags) try: influx.write_points(json_body) diff --git a/homeassistant/scripts/influxdb_migrator.py b/homeassistant/scripts/influxdb_migrator.py new file mode 100644 index 00000000000..6f643c592de --- /dev/null +++ b/homeassistant/scripts/influxdb_migrator.py @@ -0,0 +1,193 @@ +"""Script to convert an old-structure influxdb to a new one.""" + +import argparse +import sys + +from typing import List + + +# Based on code at +# http://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console +def print_progress(iteration: int, total: int, prefix: str='', suffix: str='', + decimals: int=2, bar_length: int=68) -> None: + """Print progress bar. 
+ + Call in a loop to create terminal progress bar + @params: + iteration - Required : current iteration (Int) + total - Required : total iterations (Int) + prefix - Optional : prefix string (Str) + suffix - Optional : suffix string (Str) + decimals - Optional : number of decimals in percent complete (Int) + bar_length - Optional : character length of bar (Int) + """ + filled_length = int(round(bar_length * iteration / float(total))) + percents = round(100.00 * (iteration / float(total)), decimals) + line = '#' * filled_length + '-' * (bar_length - filled_length) + sys.stdout.write('%s [%s] %s%s %s\r' % (prefix, line, + percents, '%', suffix)) + sys.stdout.flush() + if iteration == total: + print("\n") + + +def run(script_args: List) -> int: + """The actual script body.""" + from influxdb import InfluxDBClient + + parser = argparse.ArgumentParser( + description="Migrate legacy influxDB.") + parser.add_argument( + '-d', '--dbname', + metavar='dbname', + required=True, + help="InfluxDB database name") + parser.add_argument( + '-H', '--host', + metavar='host', + default='127.0.0.1', + help="InfluxDB host address") + parser.add_argument( + '-P', '--port', + metavar='port', + default=8086, + help="InfluxDB host port") + parser.add_argument( + '-u', '--username', + metavar='username', + default='root', + help="InfluxDB username") + parser.add_argument( + '-p', '--password', + metavar='password', + default='root', + help="InfluxDB password") + parser.add_argument( + '-s', '--step', + metavar='step', + default=1000, + help="How many points to migrate at the same time") + parser.add_argument( + '-o', '--override-measurement', + metavar='override_measurement', + default="", + help="Store all your points in the same measurement") + parser.add_argument( + '-D', '--delete', + action='store_true', + default=False, + help="Delete old database") + parser.add_argument( + '--script', + choices=['influxdb_migrator']) + + args = parser.parse_args() + + # Get client for old DB + client
= InfluxDBClient(args.host, args.port, + args.username, args.password) + client.switch_database(args.dbname) + # Get DB list + db_list = [db['name'] for db in client.get_list_database()] + # Get measurements of the old DB + res = client.query('SHOW MEASUREMENTS') + measurements = [measurement['name'] for measurement in res.get_points()] + nb_measurements = len(measurements) + # Move data + # Get old DB name + old_dbname = "{}__old".format(args.dbname) + # Create old DB if needed + if old_dbname not in db_list: + client.create_database(old_dbname) + # Copy data to the old DB + print("Cloning from {} to {}".format(args.dbname, old_dbname)) + for index, measurement in enumerate(measurements): + client.query('''SELECT * INTO {}..:MEASUREMENT FROM ''' + '"{}" GROUP BY *'.format(old_dbname, measurement)) + # Print progress + print_progress(index + 1, nb_measurements) + + # Delete the database + client.drop_database(args.dbname) + # Create new DB if needed + client.create_database(args.dbname) + client.switch_database(old_dbname) + # Get client for new DB + new_client = InfluxDBClient(args.host, args.port, args.username, + args.password, args.dbname) + # Counter of points without time + point_wt_time = 0 + + print("Migrating from {} to {}".format(old_dbname, args.dbname)) + # Walk through the measurements + for index, measurement in enumerate(measurements): + + # Get tag list + res = client.query('''SHOW TAG KEYS FROM "{}"'''.format(measurement)) + tags = [v['tagKey'] for v in res.get_points()] + # Get field list + res = client.query('''SHOW FIELD KEYS FROM "{}"'''.format(measurement)) + fields = [v['fieldKey'] for v in res.get_points()] + # Get points, convert and send points to the new DB + offset = 0 + while True: + nb_points = 0 + # Prepare new points + new_points = [] + # Get points + res = client.query('SELECT * FROM "{}" LIMIT {} OFFSET ' + '{}'.format(measurement, args.step, offset)) + for point in res.get_points(): + new_point = {"tags": {}, + "fields": {}, + "time":
None} + if args.override_measurement: + new_point["measurement"] = args.override_measurement + else: + new_point["measurement"] = measurement + # Check time + if point["time"] is None: + # Point without time + point_wt_time += 1 + print("Can not convert point without time") + continue + # Convert all fields + for field in fields: + try: + new_point["fields"][field] = float(point[field]) + except (ValueError, TypeError): + if field == "value": + new_key = "state" + else: + new_key = "{}_str".format(field) + new_point["fields"][new_key] = str(point[field]) + # Add tags + for tag in tags: + new_point["tags"][tag] = point[tag] + # Set time + new_point["time"] = point["time"] + # Add new point to the new list + new_points.append(new_point) + # Count nb points + nb_points += 1 + + # Send to the new db + try: + new_client.write_points(new_points) + except Exception as exp: + raise exp + + # If there are no points + if nb_points == 0: + # print("Measurement {} migrated".format(measurement)) + break + else: + # Increment offset + offset += args.step + # Print progress + print_progress(index + 1, nb_measurements) + + # Delete database if needed + if args.delete: + print("Dropping {}".format(old_dbname)) + client.drop_database(old_dbname) diff --git a/tests/components/test_influxdb.py b/tests/components/test_influxdb.py index 1e64351e406..96a6460a2a4 100644 --- a/tests/components/test_influxdb.py +++ b/tests/components/test_influxdb.py @@ -106,43 +106,52 @@ class TestInfluxDB(unittest.TestCase): """Test the event listener.""" self._setup() - valid = {'1': 1, '1.0': 1.0, STATE_ON: 1, STATE_OFF: 0, 'str': 'str'} + valid = { + '1': 1, + '1.0': 1.0, + STATE_ON: 1, + STATE_OFF: 0, + 'foo': 'foo' + } for in_, out in valid.items(): - state = mock.MagicMock(state=in_, domain='fake', - object_id='entity') + attrs = { + 'unit_of_measurement': 'foobars', + 'longitude': '1.1', + 'latitude': '2.2' + } + state = mock.MagicMock( + state=in_, domain='fake', object_id='entity',
attributes=attrs) event = mock.MagicMock(data={'new_state': state}, time_fired=12345) - - body = [ - { - 'measurement': 'hass.state.count', + if isinstance(out, str): + body = [{ + 'measurement': 'foobars', 'tags': { 'domain': 'fake', 'entity_id': 'entity', }, 'time': 12345, 'fields': { - 'value': 1, - } - } - ] - - if isinstance(out, (int, float)): - body.append( - { - 'measurement': 'hass.state', - 'tags': { - 'domain': 'fake', - 'entity_id': 'entity', - }, - 'time': 12345, - 'fields': { - 'value': float(out) - } - } - ) + 'state': out, + 'longitude': 1.1, + 'latitude': 2.2 + }, + }] + else: + body = [{ + 'measurement': 'foobars', + 'tags': { + 'domain': 'fake', + 'entity_id': 'entity', + }, + 'time': 12345, + 'fields': { + 'value': out, + 'longitude': 1.1, + 'latitude': 2.2 + }, + }] self.handler_method(event) - self.assertEqual( mock_client.return_value.write_points.call_count, 1 ) @@ -150,7 +159,40 @@ class TestInfluxDB(unittest.TestCase): mock_client.return_value.write_points.call_args, mock.call(body) ) + mock_client.return_value.write_points.reset_mock() + def test_event_listener_no_units(self, mock_client): + """Test the event listener for missing units.""" + self._setup() + + for unit in (None, ''): + if unit: + attrs = {'unit_of_measurement': unit} + else: + attrs = {} + state = mock.MagicMock( + state=1, domain='fake', entity_id='entity-id', + object_id='entity', attributes=attrs) + event = mock.MagicMock(data={'new_state': state}, time_fired=12345) + body = [{ + 'measurement': 'entity-id', + 'tags': { + 'domain': 'fake', + 'entity_id': 'entity', + }, + 'time': 12345, + 'fields': { + 'value': 1, + }, + }] + self.handler_method(event) + self.assertEqual( + mock_client.return_value.write_points.call_count, 1 + ) + self.assertEqual( + mock_client.return_value.write_points.call_args, + mock.call(body) + ) mock_client.return_value.write_points.reset_mock() def test_event_listener_fail_write(self, mock_client): @@ -165,6 +207,39 @@ class 
TestInfluxDB(unittest.TestCase): influx_client.exceptions.InfluxDBClientError('foo') self.handler_method(event) + def test_event_listener_states(self, mock_client): + """Test the event listener against ignored states.""" + self._setup() + + for state_state in (1, 'unknown', '', 'unavailable'): + state = mock.MagicMock( + state=state_state, domain='fake', entity_id='entity-id', + object_id='entity', attributes={}) + event = mock.MagicMock(data={'new_state': state}, time_fired=12345) + body = [{ + 'measurement': 'entity-id', + 'tags': { + 'domain': 'fake', + 'entity_id': 'entity', + }, + 'time': 12345, + 'fields': { + 'value': 1, + }, + }] + self.handler_method(event) + if state_state == 1: + self.assertEqual( + mock_client.return_value.write_points.call_count, 1 + ) + self.assertEqual( + mock_client.return_value.write_points.call_args, + mock.call(body) + ) + else: + self.assertFalse(mock_client.return_value.write_points.called) + mock_client.return_value.write_points.reset_mock() + def test_event_listener_blacklist(self, mock_client): """Test the event listener against a blacklist.""" self._setup() @@ -174,34 +249,18 @@ class TestInfluxDB(unittest.TestCase): state=1, domain='fake', entity_id='fake.{}'.format(entity_id), object_id=entity_id, attributes={}) event = mock.MagicMock(data={'new_state': state}, time_fired=12345) - - body = [ - { - 'measurement': 'hass.state.count', - 'tags': { - 'domain': 'fake', - 'entity_id': entity_id, - }, - 'time': 12345, - 'fields': { - 'value': 1, - } + body = [{ + 'measurement': 'fake.{}'.format(entity_id), + 'tags': { + 'domain': 'fake', + 'entity_id': entity_id, }, - { - 'measurement': 'hass.state', - 'tags': { - 'domain': 'fake', - 'entity_id': entity_id, - }, - 'time': 12345, - 'fields': { - 'value': 1 - } - } - ] - + 'time': 12345, + 'fields': { + 'value': 1, + }, + }] self.handler_method(event) - if entity_id == 'ok': self.assertEqual( mock_client.return_value.write_points.call_count, 1 @@ -212,7 +271,6 @@ class 
TestInfluxDB(unittest.TestCase): ) else: self.assertFalse(mock_client.return_value.write_points.called) - mock_client.return_value.write_points.reset_mock() def test_event_listener_invalid_type(self, mock_client): @@ -229,43 +287,45 @@ class TestInfluxDB(unittest.TestCase): for in_, out in valid.items(): attrs = { 'unit_of_measurement': 'foobars', + 'longitude': '1.1', + 'latitude': '2.2', 'invalid_attribute': ['value1', 'value2'] } state = mock.MagicMock( state=in_, domain='fake', object_id='entity', attributes=attrs) event = mock.MagicMock(data={'new_state': state}, time_fired=12345) - - body = [ - { - 'measurement': 'hass.state.count', + if isinstance(out, str): + body = [{ + 'measurement': 'foobars', 'tags': { 'domain': 'fake', 'entity_id': 'entity', }, 'time': 12345, 'fields': { - 'value': 1, - } - } - ] - - if isinstance(out, (int, float)): - body.append( - { - 'measurement': 'hass.state', - 'tags': { - 'domain': 'fake', - 'entity_id': 'entity', - }, - 'time': 12345, - 'fields': { - 'value': float(out) - } - } - ) + 'state': out, + 'longitude': 1.1, + 'latitude': 2.2, + 'invalid_attribute_str': "['value1', 'value2']" + }, + }] + else: + body = [{ + 'measurement': 'foobars', + 'tags': { + 'domain': 'fake', + 'entity_id': 'entity', + }, + 'time': 12345, + 'fields': { + 'value': float(out), + 'longitude': 1.1, + 'latitude': 2.2, + 'invalid_attribute_str': "['value1', 'value2']" + }, + }] self.handler_method(event) - self.assertEqual( mock_client.return_value.write_points.call_count, 1 ) @@ -273,5 +333,47 @@ class TestInfluxDB(unittest.TestCase): mock_client.return_value.write_points.call_args, mock.call(body) ) - + mock_client.return_value.write_points.reset_mock() + + def test_event_listener_default_measurement(self, mock_client): + """Test the event listener with a default measurement.""" + config = { + 'influxdb': { + 'host': 'host', + 'username': 'user', + 'password': 'pass', + 'default_measurement': 'state', + 'blacklist': ['fake.blacklisted'] + } + } + 
assert setup_component(self.hass, influxdb.DOMAIN, config) + self.handler_method = self.hass.bus.listen.call_args_list[0][0][1] + + for entity_id in ('ok', 'blacklisted'): + state = mock.MagicMock( + state=1, domain='fake', entity_id='fake.{}'.format(entity_id), + object_id=entity_id, attributes={}) + event = mock.MagicMock(data={'new_state': state}, time_fired=12345) + body = [{ + 'measurement': 'state', + 'tags': { + 'domain': 'fake', + 'entity_id': entity_id, + }, + 'time': 12345, + 'fields': { + 'value': 1, + }, + }] + self.handler_method(event) + if entity_id == 'ok': + self.assertEqual( + mock_client.return_value.write_points.call_count, 1 + ) + self.assertEqual( + mock_client.return_value.write_points.call_args, + mock.call(body) + ) + else: + self.assertFalse(mock_client.return_value.write_points.called) mock_client.return_value.write_points.reset_mock()