diff --git a/homeassistant/__init__.py b/homeassistant/__init__.py index 8bb4a0b8b1f..88066bca33e 100644 --- a/homeassistant/__init__.py +++ b/homeassistant/__init__.py @@ -30,8 +30,22 @@ TIMER_INTERVAL = 10 # seconds # How long we wait for the result of a service call SERVICE_CALL_LIMIT = 10 # seconds -# Number of worker threads -POOL_NUM_THREAD = 4 +# Define number of worker threads +# +# There are two categories of Home Assistant jobs: +# - jobs that poll external components that are mostly waiting for IO +# - jobs that respond to events that happen inside HA (state_changed, etc) +# +# Based on different setups I see 3 times as many events responding to events +# then that there are ones that poll components. We therefore want to set the +# number of threads to 1.25 of the CPU count, we will round it up so the +# minimum number of threads is 2. +# +# We want to have atleast 2 threads because a call to the homeassistant.turn_on +# will wait till the service is executed which is in a different thread. +# +# If os.cpu_count() cannot determine the cpu_count, we will assume there is 1. +POOL_NUM_THREAD = int((os.cpu_count() or 1) * 1.25) + 1 # Pattern for validating entity IDs (format: .) ENTITY_ID_PATTERN = re.compile(r"^(?P\w+)\.(?P\w+)$") @@ -268,12 +282,12 @@ def create_worker_pool(thread_count=POOL_NUM_THREAD): def busy_callback(current_jobs, pending_jobs_count): """ Callback to be called when the pool queue gets too big. """ - _LOGGER.error( + _LOGGER.warning( "WorkerPool:All %d threads are busy and %d jobs pending", thread_count, pending_jobs_count) for start, job in current_jobs: - _LOGGER.error("WorkerPool:Current job from %s: %s", + _LOGGER.warning("WorkerPool:Current job from %s: %s", util.datetime_to_str(start), job) return util.ThreadPool(thread_count, job_handler, busy_callback) @@ -347,9 +361,10 @@ class EventBus(object): if not listeners: return + job_priority = JobPriority.from_event_type(event_type) + for func in listeners: - self._pool.add_job(JobPriority.from_event_type(event_type), - (func, event)) + self._pool.add_job(job_priority, (func, event)) def listen(self, event_type, listener): """ Listen for all events or events of a specific type.