Improve influxdb throughput (#12882)

* Batch influxdb events for writing

* Name constants
Anders Melchiorsen 2018-03-04 06:22:31 +01:00 committed by Paulus Schoutsen
parent 2321603eb7
commit d06807c634
2 changed files with 101 additions and 63 deletions

homeassistant/components/influxdb.py

@@ -43,7 +43,10 @@ DOMAIN = 'influxdb'
 TIMEOUT = 5
 RETRY_DELAY = 20
-QUEUE_BACKLOG_SECONDS = 10
+QUEUE_BACKLOG_SECONDS = 30
+
+BATCH_TIMEOUT = 1
+BATCH_BUFFER_SIZE = 100
 
 COMPONENT_CONFIG_SCHEMA_ENTRY = vol.Schema({
     vol.Optional(CONF_OVERRIDE_MEASUREMENT): cv.string,
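An aside on how these knobs interact, not part of the diff: BATCH_TIMEOUT is how long the writer waits for further events once a batch has started, BATCH_BUFFER_SIZE caps the events per write, and an event is dropped as stale once it has sat in the queue longer than QUEUE_BACKLOG_SECONDS plus the worst-case retry time, so raising the backlog from 10 to 30 seconds gives slow writes more headroom. A sketch of the staleness arithmetic, assuming a hypothetical retry setting of max_tries = 3:

    # Illustrative only; max_tries actually comes from the component's
    # retry configuration, not from a literal like this.
    QUEUE_BACKLOG_SECONDS = 30
    RETRY_DELAY = 20
    max_tries = 3  # hypothetical value for this example

    # Events older than this when dequeued are dropped, not written.
    queue_seconds = QUEUE_BACKLOG_SECONDS + max_tries * RETRY_DELAY
    assert queue_seconds == 90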
@@ -143,18 +146,18 @@ def setup(hass, config):
                       "READ/WRITE", exc)
         return False
 
-    def influx_handle_event(event):
-        """Send an event to Influx."""
+    def event_to_json(event):
+        """Add an event to the outgoing Influx list."""
         state = event.data.get('new_state')
         if state is None or state.state in (
                 STATE_UNKNOWN, '', STATE_UNAVAILABLE) or \
                 state.entity_id in blacklist_e or state.domain in blacklist_d:
-            return True
+            return
 
         try:
             if (whitelist_e and state.entity_id not in whitelist_e) or \
                     (whitelist_d and state.domain not in whitelist_d):
-                return True
+                return
 
             _include_state = _include_value = False
@@ -183,55 +186,48 @@ def setup(hass, config):
                 else:
                     include_uom = False
 
-        json_body = [
-            {
-                'measurement': measurement,
-                'tags': {
-                    'domain': state.domain,
-                    'entity_id': state.object_id,
-                },
-                'time': event.time_fired,
-                'fields': {
-                }
-            }
-        ]
+        json = {
+            'measurement': measurement,
+            'tags': {
+                'domain': state.domain,
+                'entity_id': state.object_id,
+            },
+            'time': event.time_fired,
+            'fields': {}
+        }
 
         if _include_state:
-            json_body[0]['fields']['state'] = state.state
+            json['fields']['state'] = state.state
         if _include_value:
-            json_body[0]['fields']['value'] = _state_as_value
+            json['fields']['value'] = _state_as_value
 
         for key, value in state.attributes.items():
             if key in tags_attributes:
-                json_body[0]['tags'][key] = value
+                json['tags'][key] = value
             elif key != 'unit_of_measurement' or include_uom:
                 # If the key is already in fields
-                if key in json_body[0]['fields']:
+                if key in json['fields']:
                     key = key + "_"
                 # Prevent column data errors in influxDB.
                 # For each value we try to cast it as float
                 # But if we can not do it we store the value
                 # as string add "_str" postfix to the field key
                 try:
-                    json_body[0]['fields'][key] = float(value)
+                    json['fields'][key] = float(value)
                 except (ValueError, TypeError):
                     new_key = "{}_str".format(key)
                     new_value = str(value)
-                    json_body[0]['fields'][new_key] = new_value
+                    json['fields'][new_key] = new_value
                     if RE_DIGIT_TAIL.match(new_value):
-                        json_body[0]['fields'][key] = float(
+                        json['fields'][key] = float(
                             RE_DECIMAL.sub('', new_value))
 
-        json_body[0]['tags'].update(tags)
+        json['tags'].update(tags)
 
-        try:
-            influx.write_points(json_body)
-            return True
-        except (exceptions.InfluxDBClientError, IOError):
-            return False
+        return json
 
     instance = hass.data[DOMAIN] = InfluxThread(
-        hass, influx_handle_event, max_tries)
+        hass, influx, event_to_json, max_tries)
     instance.start()
 
     def shutdown(event):
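With the write moved out of it, event_to_json now returns a single InfluxDB point, or None for filtered events, instead of writing a one-element list itself. A sketch of the returned shape, outside the diff; the entity and values are made up, only the keys come from the code above:

    # Hypothetical output of event_to_json for a temperature sensor.
    point = {
        'measurement': '°C',  # unit of measurement, override, or entity id
        'tags': {'domain': 'sensor', 'entity_id': 'outdoor_temperature'},
        'time': '2018-03-04T05:22:31+00:00',  # event.time_fired
        'fields': {'value': 21.5, 'battery_level': 95.0},
    }

    # The writer thread collects many such points and sends them in one
    # call: influx.write_points([point, ...])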
@@ -247,12 +243,15 @@ def setup(hass, config):
 class InfluxThread(threading.Thread):
     """A threaded event handler class."""
 
-    def __init__(self, hass, event_handler, max_tries):
+    def __init__(self, hass, influx, event_to_json, max_tries):
         """Initialize the listener."""
         threading.Thread.__init__(self, name='InfluxDB')
         self.queue = queue.Queue()
-        self.event_handler = event_handler
+        self.influx = influx
+        self.event_to_json = event_to_json
         self.max_tries = max_tries
+        self.write_errors = 0
+        self.shutdown = False
         hass.bus.listen(EVENT_STATE_CHANGED, self._event_listener)
 
     def _event_listener(self, event):
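The thread now owns both the InfluxDBClient and the formatting callback, so formatting and writing both happen off the event bus. Note also that _event_listener (context below) enqueues (time.monotonic(), event) pairs; the monotonic clock cannot jump with wall-clock adjustments, so the age computed at dequeue time stays trustworthy. A minimal sketch of that handoff, not part of the diff, with a stand-in event:

    import queue
    import time

    q = queue.Queue()

    # Producer: stamp each item with a monotonic timestamp.
    q.put((time.monotonic(), {'entity_id': 'sensor.demo'}))  # stand-in event

    # Consumer: measure how long the item waited in the queue.
    timestamp, event = q.get()
    age = time.monotonic() - timestamp  # seconds spent queued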
@@ -260,40 +259,76 @@ class InfluxThread(threading.Thread):
         item = (time.monotonic(), event)
         self.queue.put(item)
 
-    def run(self):
-        """Process incoming events."""
+    @staticmethod
+    def batch_timeout():
+        """Return number of seconds to wait for more events."""
+        return BATCH_TIMEOUT
+
+    def get_events_json(self):
+        """Return a batch of events formatted for writing."""
         queue_seconds = QUEUE_BACKLOG_SECONDS + self.max_tries*RETRY_DELAY
 
-        write_error = False
-        dropped = False
+        count = 0
+        json = []
 
-        while True:
-            item = self.queue.get()
+        dropped = 0
 
-            if item is None:
-                self.queue.task_done()
-                return
+        try:
+            while len(json) < BATCH_BUFFER_SIZE and not self.shutdown:
+                timeout = None if count == 0 else self.batch_timeout()
+                item = self.queue.get(timeout=timeout)
+                count += 1
 
-            timestamp, event = item
-            age = time.monotonic() - timestamp
+                if item is None:
+                    self.shutdown = True
+                else:
+                    timestamp, event = item
+                    age = time.monotonic() - timestamp
 
-            if age < queue_seconds:
-                for retry in range(self.max_tries+1):
-                    if self.event_handler(event):
-                        if write_error:
-                            _LOGGER.error("Resumed writing to InfluxDB")
-                            write_error = False
-                        dropped = False
-                        break
-                    elif retry < self.max_tries:
-                        time.sleep(RETRY_DELAY)
-                    elif not write_error:
-                        _LOGGER.error("Error writing to InfluxDB")
-                        write_error = True
-            elif not dropped:
-                _LOGGER.warning("Dropping old events to catch up")
-                dropped = True
+                    if age < queue_seconds:
+                        event_json = self.event_to_json(event)
+                        if event_json:
+                            json.append(event_json)
+                    else:
+                        dropped += 1
+        except queue.Empty:
+            pass
 
-            self.queue.task_done()
+        if dropped:
+            _LOGGER.warning("Catching up, dropped %d old events", dropped)
+
+        return count, json
+
+    def write_to_influxdb(self, json):
+        """Write preprocessed events to influxdb, with retry."""
+        from influxdb import exceptions
+
+        for retry in range(self.max_tries+1):
+            try:
+                self.influx.write_points(json)
+
+                if self.write_errors:
+                    _LOGGER.error("Resumed, lost %d events", self.write_errors)
+                    self.write_errors = 0
+
+                _LOGGER.debug("Wrote %d events", len(json))
+                break
+            except (exceptions.InfluxDBClientError, IOError):
+                if retry < self.max_tries:
+                    time.sleep(RETRY_DELAY)
+                else:
+                    if not self.write_errors:
+                        _LOGGER.exception("Write error")
+                    self.write_errors += len(json)
+
+    def run(self):
+        """Process incoming events."""
+        while not self.shutdown:
+            count, json = self.get_events_json()
+            if json:
+                self.write_to_influxdb(json)
+            for _ in range(count):
+                self.queue.task_done()
 
     def block_till_done(self):
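The batching loop above is the heart of the throughput change: block indefinitely for the first event, wait only batch_timeout() for each additional one, and ship the batch when the buffer fills, the timeout expires, or shutdown is requested. The same pattern in isolation, as a self-contained sketch; the names are ours, not the component's, and the sentinel and task_done bookkeeping are left out:

    import queue

    def drain_batch(q, batch_size=100, batch_timeout=1):
        """Block for one item, then collect more until the batch is
        full or no new item arrives within batch_timeout seconds."""
        batch = [q.get()]  # first get blocks until something arrives
        while len(batch) < batch_size:
            try:
                batch.append(q.get(timeout=batch_timeout))
            except queue.Empty:
                break  # timed out waiting for more; ship what we have
        return batch

One write_points call per batch replaces one HTTP request per state change, which is where the gain comes from; a failed write now costs at most one retry cycle per batch instead of blocking the queue one event at a time.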

tests/components/test_influxdb.py

@@ -14,6 +14,9 @@ from tests.common import get_test_home_assistant
 
 @mock.patch('influxdb.InfluxDBClient')
+@mock.patch(
+    'homeassistant.components.influxdb.InfluxThread.batch_timeout',
+    mock.Mock(return_value=0))
 class TestInfluxDB(unittest.TestCase):
     """Test the InfluxDB component."""