diff --git a/homeassistant/__init__.py b/homeassistant/__init__.py index c34de361cbe..eab75f7042f 100644 --- a/homeassistant/__init__.py +++ b/homeassistant/__init__.py @@ -31,6 +31,7 @@ TIMER_INTERVAL = 10 # seconds assert 60 % TIMER_INTERVAL == 0, "60 % TIMER_INTERVAL should be 0!" BUS_NUM_THREAD = 4 +BUS_REPORT_BUSY_TIMEOUT = dt.timedelta(minutes=1) def start_home_assistant(bus): @@ -75,7 +76,7 @@ def track_state_change(bus, entity_id, action, from_state=None, to_state=None): from_state = _process_match_param(from_state) to_state = _process_match_param(to_state) - def listener(event): + def state_listener(event): """ State change listener that listens for specific state changes. """ if entity_id == event.data['entity_id'] and \ _matcher(event.data['old_state'].state, from_state) and \ @@ -85,7 +86,10 @@ def track_state_change(bus, entity_id, action, from_state=None, to_state=None): event.data['old_state'], event.data['new_state']) - bus.listen_event(EVENT_STATE_CHANGED, listener) + # Let the string representation make sense + state_listener.__name__ = 'State listener for {}'.format(action.__name__) + + bus.listen_event(EVENT_STATE_CHANGED, state_listener) # pylint: disable=too-many-arguments @@ -98,7 +102,7 @@ def track_time_change(bus, action, year, month, day = pmp(year), pmp(month), pmp(day) hour, minute, second = pmp(hour), pmp(minute), pmp(second) - def listener(event): + def time_listener(event): """ Listens for matching time_changed events. """ now = event.data['now'] @@ -116,11 +120,14 @@ def track_time_change(bus, action, # point_in_time are exact points in time # so we always remove it after fire if listen_once or point_in_time: - bus.remove_event_listener(EVENT_TIME_CHANGED, listener) + bus.remove_event_listener(EVENT_TIME_CHANGED, time_listener) action(now) - bus.listen_event(EVENT_TIME_CHANGED, listener) + # Let the string representation make sense + time_listener.__name__ = 'Time listener for {}'.format(action.__name__) + + bus.listen_event(EVENT_TIME_CHANGED, time_listener) def create_bus_job_handler(logger): @@ -182,13 +189,15 @@ class Bus(object): """ def __init__(self, thread_count=None): - thread_count = thread_count or BUS_NUM_THREAD + self.thread_count = thread_count or BUS_NUM_THREAD self._event_listeners = {} self._services = {} self.logger = logging.getLogger(__name__) self.event_lock = threading.Lock() self.service_lock = threading.Lock() - self.pool = util.ThreadPool(thread_count, + self.last_busy_notice = dt.datetime.now() + + self.pool = util.ThreadPool(self.thread_count, create_bus_job_handler(self.logger)) @property @@ -223,6 +232,8 @@ class Bus(object): self.pool.add_job(self._services[domain][service], service_call) + self._check_busy() + except KeyError: # if key domain or service does not exist raise ServiceDoesNotExistError( "Service does not exist: {}/{}".format(domain, service)) @@ -255,6 +266,8 @@ class Bus(object): for func in listeners: self.pool.add_job(func, event) + self._check_busy() + def listen_event(self, event_type, listener): """ Listen for all events or events of a specific type. @@ -283,6 +296,9 @@ class Bus(object): listener(event) + onetime_listener.__name__ = "One time listener for {}".format( + listener.__name__) + self.listen_event(event_type, onetime_listener) def remove_event_listener(self, event_type, listener): @@ -300,6 +316,26 @@ class Bus(object): # AttributeError if listener did not exist within event_type pass + def _check_busy(self): + """ Complain if we have more than twice as many jobs queued as threads + and if we didn't complain about it recently. """ + if self.pool.queue.qsize() / self.thread_count >= 2 and \ + dt.datetime.now()-self.last_busy_notice > BUS_REPORT_BUSY_TIMEOUT: + + self.last_busy_notice = dt.datetime.now() + + logger = self.logger + + logger.error( + "Bus:All {} threads are busy and {} jobs pending".format( + self.thread_count, self.pool.queue.qsize())) + + jobs = self.pool.current_jobs + + for start, job in jobs: + logger.error("Bus:Current job from {}: {}".format( + util.datetime_to_str(start), job)) + class State(object): """ Object to represent a state within the state machine. """ diff --git a/homeassistant/util.py b/homeassistant/util.py index da6ff836569..86b72724e80 100644 --- a/homeassistant/util.py +++ b/homeassistant/util.py @@ -89,10 +89,11 @@ class ThreadPool(object): # pylint: disable=too-few-public-methods def __init__(self, worker_count, job_handler): queue = self.queue = Queue.Queue() + current_jobs = self.current_jobs = [] for _ in xrange(worker_count): worker = threading.Thread(target=_threadpool_worker, - args=(queue, job_handler)) + args=(queue, current_jobs, job_handler)) worker.daemon = True worker.start() @@ -101,9 +102,21 @@ class ThreadPool(object): self.queue.put(args) -def _threadpool_worker(queue, job_handler): +def _threadpool_worker(queue, current_jobs, job_handler): """ Provides the base functionality of a worker for the thread pool. """ while True: + # Get new item from queue job = queue.get() + + # Add to current running jobs + job_log = (datetime.datetime.now(), job) + current_jobs.append(job_log) + + # Do the job job_handler(job) + + # Remove from current running job + current_jobs.remove(job_log) + + # Tell queue a task is done queue.task_done()