New events and service calls now use a shared thread pool

This commit is contained in:
Paulus Schoutsen 2014-01-26 18:44:36 -08:00
parent 70d2506e01
commit 2758d81525
2 changed files with 151 additions and 57 deletions

View file

@ -9,7 +9,6 @@ of entities and react to changes.
import time import time
import logging import logging
import threading import threading
from collections import namedtuple
import datetime as dt import datetime as dt
import homeassistant.util as util import homeassistant.util as util
@ -33,6 +32,8 @@ TIMER_INTERVAL = 10 # seconds
# every minute. # every minute.
assert 60 % TIMER_INTERVAL == 0, "60 % TIMER_INTERVAL should be 0!" assert 60 % TIMER_INTERVAL == 0, "60 % TIMER_INTERVAL should be 0!"
BUS_NUM_THREAD = 4
def start_home_assistant(bus): def start_home_assistant(bus):
""" Start home assistant. """ """ Start home assistant. """
@ -117,14 +118,64 @@ def track_time_change(bus, action,
# point_in_time are exact points in time # point_in_time are exact points in time
# so we always remove it after fire # so we always remove it after fire
if listen_once or point_in_time: if listen_once or point_in_time:
event.bus.remove_event_listener(EVENT_TIME_CHANGED, listener) bus.remove_event_listener(EVENT_TIME_CHANGED, listener)
action(now) action(now)
bus.listen_event(EVENT_TIME_CHANGED, listener) bus.listen_event(EVENT_TIME_CHANGED, listener)
ServiceCall = namedtuple("ServiceCall", ["bus", "domain", "service", "data"])
Event = namedtuple("Event", ["bus", "event_type", "data"]) def create_bus_job_handler(logger):
""" Creates a job handler that logs errors to supplied `logger`. """
def job_handler(job):
""" Called whenever a job is available to do. """
try:
func, arg = job
func(arg)
except Exception: # pylint: disable=broad-except
# Catch any exception our service/event_listener might throw
# We do not want to crash our ThreadPool
logger.exception("BusHandler:Exception doing job")
return job_handler
# pylint: disable=too-few-public-methods
class ServiceCall(object):
""" Represents a call to a service. """
__slots__ = ['domain', 'service', 'data']
def __init__(self, domain, service, data=None):
self.domain = domain
self.service = service
self.data = data or {}
def __repr__(self):
if self.data:
return "<ServiceCall {}.{}: {}>".format(
self.domain, self.service, util.repr_helper(self.data))
else:
return "<ServiceCall {}.{}>".format(self.domain, self.service)
# pylint: disable=too-few-public-methods
class Event(object):
""" Represents an event within the Bus. """
__slots__ = ['event_type', 'data']
def __init__(self, event_type, data=None):
self.event_type = event_type
self.data = data or {}
def __repr__(self):
if self.data:
return "<Event {}: {}>".format(
self.event_type, util.repr_helper(self.data))
else:
return "<Event {}>".format(self.event_type)
class Bus(object): class Bus(object):
@ -132,12 +183,15 @@ class Bus(object):
and events. and events.
""" """
def __init__(self): def __init__(self, thread_count=None):
thread_count = thread_count or BUS_NUM_THREAD
self._event_listeners = {} self._event_listeners = {}
self._services = {} self._services = {}
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.event_lock = threading.Lock() self.event_lock = threading.Lock()
self.service_lock = threading.Lock() self.service_lock = threading.Lock()
self.pool = util.ThreadPool(thread_count,
create_bus_job_handler(self.logger))
@property @property
def services(self): def services(self):
@ -157,71 +211,51 @@ class Bus(object):
def has_service(self, domain, service): def has_service(self, domain, service):
""" Returns True if specified service exists. """ """ Returns True if specified service exists. """
return (domain in self._services and try:
service in self._services[domain]) return service in self._services[domain]
except KeyError: # if key 'domain' does not exist
return False
def call_service(self, domain, service, service_data=None): def call_service(self, domain, service, service_data=None):
""" Calls a service. """ """ Calls a service. """
if not self.has_service(domain, service): service_call = ServiceCall(domain, service, service_data)
raise ServiceDoesNotExistError(
"Service does not exist: {}/{}".format(domain, service))
with self.service_lock: with self.service_lock:
service_data = service_data or {} try:
self.pool.add_job(self._services[domain][service],
service_call)
def run(): except KeyError: # if key domain or service does not exist
""" Executes a service. """ raise ServiceDoesNotExistError(
service_call = ServiceCall(self, domain, service, service_data) "Service does not exist: {}/{}".format(domain, service))
try: def register_service(self, domain, service, service_func):
self._services[domain][service](service_call)
except Exception: # pylint: disable=broad-except
self.logger.exception(
"Bus:Exception in service {}/{}".format(
domain, service))
# We dont want the bus to be blocking - run in a thread.
threading.Thread(target=run).start()
def register_service(self, domain, service, service_callback):
""" Register a service. """ """ Register a service. """
with self.service_lock: with self.service_lock:
try: try:
self._services[domain][service] = service_callback self._services[domain][service] = service_func
except KeyError:
# Domain does not exist yet except KeyError: # Domain does not exist yet in self._services
self._services[domain] = {service: service_callback} self._services[domain] = {service: service_func}
def fire_event(self, event_type, event_data=None): def fire_event(self, event_type, event_data=None):
""" Fire an event. """ """ Fire an event. """
with self.event_lock: with self.event_lock:
# Copy the list of the current listeners because some listeners # Copy the list of the current listeners because some listeners
# choose to remove themselves as a listener while being executed # remove themselves as a listener while being executed which
# which causes the iterator to be confused. # causes the iterator to be confused.
get = self._event_listeners.get get = self._event_listeners.get
listeners = get(MATCH_ALL, []) + get(event_type, []) listeners = get(MATCH_ALL, []) + get(event_type, [])
self.logger.info("Bus:Event {}: {}".format( event = Event(event_type, event_data)
event_type, event_data))
self.logger.info("Bus:Handling {}".format(event))
if not listeners: if not listeners:
return return
event_data = event_data or {} for func in listeners:
self.pool.add_job(func, event)
def run():
""" Fire listeners for event. """
event = Event(self, event_type, event_data)
for listener in listeners:
try:
listener(event)
except Exception: # pylint: disable=broad-except
self.logger.exception(
"Bus:Exception in event listener")
# We dont want the bus to be blocking - run in a thread.
threading.Thread(target=run).start()
def listen_event(self, event_type, listener): def listen_event(self, event_type, listener):
""" Listen for all events or events of a specific type. """ Listen for all events or events of a specific type.
@ -232,6 +266,7 @@ class Bus(object):
with self.event_lock: with self.event_lock:
try: try:
self._event_listeners[event_type].append(listener) self._event_listeners[event_type].append(listener)
except KeyError: # event_type did not exist except KeyError: # event_type did not exist
self._event_listeners[event_type] = [listener] self._event_listeners[event_type] = [listener]
@ -242,6 +277,7 @@ class Bus(object):
as event_type. as event_type.
Note: at the moment it is impossible to remove a one time listener. Note: at the moment it is impossible to remove a one time listener.
Note2: it is also not guaranteed that it will only run once
""" """
def onetime_listener(event): def onetime_listener(event):
""" Removes listener from eventbus and then fires listener. """ """ Removes listener from eventbus and then fires listener. """
@ -259,11 +295,11 @@ class Bus(object):
# delete event_type list if empty # delete event_type list if empty
if not self._event_listeners[event_type]: if not self._event_listeners[event_type]:
del self._event_listeners[event_type] self._event_listeners.pop(event_type)
except (KeyError, ValueError): except (KeyError, AttributeError):
# KeyError is key event_type did not exist # KeyError is key event_type listener did not exist
# ValueError if the list [event_type] did not contain listener # AttributeError if listener did not exist within event_type
pass pass
@ -317,13 +353,17 @@ class State(object):
json_dict['state'], json_dict['state'],
json_dict.get('attributes'), json_dict.get('attributes'),
last_changed) last_changed)
except KeyError: # if key 'state' did not exist except KeyError: # if key 'entity_id' or 'state' did not exist
return None return None
def __repr__(self): def __repr__(self):
return "<state {}:{}, {}>".format( if self.attributes:
self.state, self.attributes, return "<state {}:{} @ {}>".format(
util.datetime_to_str(self.last_changed)) self.state, util.repr_helper(self.attributes),
util.datetime_to_str(self.last_changed))
else:
return "<state {} @ {}>".format(
self.state, util.datetime_to_str(self.last_changed))
class StateMachine(object): class StateMachine(object):

View file

@ -1,5 +1,6 @@
""" Helper methods for various modules. """ """ Helper methods for various modules. """
import threading
import Queue
import datetime import datetime
import re import re
@ -53,3 +54,56 @@ def filter_entity_ids(entity_ids, domain_filter=None, strip_domain=False):
for entity_id in entity_ids if for entity_id in entity_ids if
not domain_filter or entity_id.startswith(domain_filter) not domain_filter or entity_id.startswith(domain_filter)
] ]
def repr_helper(inp):
""" Helps creating a more readable string representation of objects. """
if isinstance(inp, dict):
return ", ".join(
repr_helper(key)+"="+repr_helper(item) for key, item in inp.items()
)
elif isinstance(inp, list):
return '[' + ', '.join(inp) + ']'
elif isinstance(inp, datetime.datetime):
return datetime_to_str(inp)
else:
return str(inp)
# Reason why I decided to roll my own ThreadPool instead of using
# multiprocessing.dummy.pool or even better, use multiprocessing.pool and
# not be hurt by the GIL in the cpython interpreter:
# 1. The built in threadpool does not allow me to create custom workers and so
# I would have to wrap every listener that I passed into it with code to log
# the exceptions. Saving a reference to the logger in the worker seemed
# like a more sane thing to do.
# 2. Most event listeners are simple checks if attributes match. If the method
# that they will call takes a long time to complete it might be better to
# put that request in a seperate thread. This is for every component to
# decide on its own instead of enforcing it for everyone.
class ThreadPool(object):
""" A simple queue-based thread pool.
Will initiate it's workers using worker(queue).start() """
# pylint: disable=too-few-public-methods
def __init__(self, worker_count, job_handler):
queue = self.queue = Queue.Queue()
for _ in xrange(worker_count):
worker = threading.Thread(target=_threadpool_worker,
args=(queue, job_handler))
worker.daemon = True
worker.start()
def add_job(self, *args):
""" Add a job to be sent to the workers. """
self.queue.put(args)
def _threadpool_worker(queue, job_handler):
""" Provides the base functionality of a worker for the thread pool. """
while True:
job = queue.get()
job_handler(job)
queue.task_done()