Limit concurrency of async_get_integration to avoid creating extra threads (#43085)

* Limit concurrency of async_get_integration to avoid creating extra threads

Since async_get_integration is waiting on the disk most of the time
it would end up creating many new threads because the disk could
not deliver the data in time.

* pylint
This commit is contained in:
J. Nick Koston 2020-11-10 21:34:54 -10:00 committed by GitHub
parent 518e462e9a
commit 94bf55e29b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 68 additions and 9 deletions

View file

@ -28,6 +28,7 @@ from homeassistant.setup import (
async_set_domains_to_be_loaded, async_set_domains_to_be_loaded,
async_setup_component, async_setup_component,
) )
from homeassistant.util.async_ import gather_with_concurrency
from homeassistant.util.logging import async_activate_log_queue_handler from homeassistant.util.logging import async_activate_log_queue_handler
from homeassistant.util.package import async_get_user_site, is_virtual_env from homeassistant.util.package import async_get_user_site, is_virtual_env
from homeassistant.util.yaml import clear_secret_cache from homeassistant.util.yaml import clear_secret_cache
@ -49,6 +50,8 @@ STAGE_2_TIMEOUT = 300
WRAP_UP_TIMEOUT = 300 WRAP_UP_TIMEOUT = 300
COOLDOWN_TIME = 60 COOLDOWN_TIME = 60
MAX_LOAD_CONCURRENTLY = 6
DEBUGGER_INTEGRATIONS = {"debugpy", "ptvsd"} DEBUGGER_INTEGRATIONS = {"debugpy", "ptvsd"}
CORE_INTEGRATIONS = ("homeassistant", "persistent_notification") CORE_INTEGRATIONS = ("homeassistant", "persistent_notification")
LOGGING_INTEGRATIONS = { LOGGING_INTEGRATIONS = {
@ -442,7 +445,8 @@ async def _async_set_up_integrations(
integrations_to_process = [ integrations_to_process = [
int_or_exc int_or_exc
for int_or_exc in await asyncio.gather( for int_or_exc in await gather_with_concurrency(
loader.MAX_LOAD_CONCURRENTLY,
*( *(
loader.async_get_integration(hass, domain) loader.async_get_integration(hass, domain)
for domain in old_to_resolve for domain in old_to_resolve

View file

@ -38,7 +38,13 @@ from homeassistant.helpers import template
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.template import Template from homeassistant.helpers.template import Template
from homeassistant.helpers.typing import ConfigType, HomeAssistantType, TemplateVarsType from homeassistant.helpers.typing import ConfigType, HomeAssistantType, TemplateVarsType
from homeassistant.loader import Integration, async_get_integration, bind_hass from homeassistant.loader import (
MAX_LOAD_CONCURRENTLY,
Integration,
async_get_integration,
bind_hass,
)
from homeassistant.util.async_ import gather_with_concurrency
from homeassistant.util.yaml import load_yaml from homeassistant.util.yaml import load_yaml
from homeassistant.util.yaml.loader import JSON_TYPE from homeassistant.util.yaml.loader import JSON_TYPE
@ -307,8 +313,9 @@ async def async_get_all_descriptions(
loaded = {} loaded = {}
if missing: if missing:
integrations = await asyncio.gather( integrations = await gather_with_concurrency(
*(async_get_integration(hass, domain) for domain in missing) MAX_LOAD_CONCURRENTLY,
*(async_get_integration(hass, domain) for domain in missing),
) )
contents = await hass.async_add_executor_job( contents = await hass.async_add_executor_job(

View file

@ -6,11 +6,13 @@ from typing import Any, Dict, List, Optional, Set
from homeassistant.core import callback from homeassistant.core import callback
from homeassistant.loader import ( from homeassistant.loader import (
MAX_LOAD_CONCURRENTLY,
Integration, Integration,
async_get_config_flows, async_get_config_flows,
async_get_integration, async_get_integration,
bind_hass, bind_hass,
) )
from homeassistant.util.async_ import gather_with_concurrency
from homeassistant.util.json import load_json from homeassistant.util.json import load_json
from .typing import HomeAssistantType from .typing import HomeAssistantType
@ -151,8 +153,9 @@ async def async_get_component_strings(
integrations = dict( integrations = dict(
zip( zip(
domains, domains,
await asyncio.gather( await gather_with_concurrency(
*[async_get_integration(hass, domain) for domain in domains] MAX_LOAD_CONCURRENTLY,
*[async_get_integration(hass, domain) for domain in domains],
), ),
) )
) )

View file

@ -50,6 +50,8 @@ CUSTOM_WARNING = (
) )
_UNDEF = object() _UNDEF = object()
MAX_LOAD_CONCURRENTLY = 4
def manifest_from_legacy_module(domain: str, module: ModuleType) -> Dict: def manifest_from_legacy_module(domain: str, module: ModuleType) -> Dict:
"""Generate a manifest from a legacy module.""" """Generate a manifest from a legacy module."""

View file

@ -1,12 +1,12 @@
"""Asyncio backports for Python 3.6 compatibility.""" """Asyncio utilities."""
from asyncio import coroutines, ensure_future, get_running_loop from asyncio import Semaphore, coroutines, ensure_future, gather, get_running_loop
from asyncio.events import AbstractEventLoop from asyncio.events import AbstractEventLoop
import concurrent.futures import concurrent.futures
import functools import functools
import logging import logging
import threading import threading
from traceback import extract_stack from traceback import extract_stack
from typing import Any, Callable, Coroutine, TypeVar from typing import Any, Awaitable, Callable, Coroutine, TypeVar
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -121,3 +121,21 @@ def protect_loop(func: Callable) -> Callable:
return func(*args, **kwargs) return func(*args, **kwargs)
return protected_loop_func return protected_loop_func
async def gather_with_concurrency(
limit: int, *tasks: Any, return_exceptions: bool = False
) -> Any:
"""Wrap asyncio.gather to limit the number of concurrent tasks.
From: https://stackoverflow.com/a/61478547/9127614
"""
semaphore = Semaphore(limit)
async def sem_task(task: Awaitable[Any]) -> Any:
async with semaphore:
return await task
return await gather(
*(sem_task(task) for task in tasks), return_exceptions=return_exceptions
)

View file

@ -1,4 +1,7 @@
"""Tests for async util methods from Python source.""" """Tests for async util methods from Python source."""
import asyncio
import time
import pytest import pytest
from homeassistant.util import async_ as hasync from homeassistant.util import async_ as hasync
@ -144,3 +147,25 @@ def test_protect_loop_sync():
hasync.protect_loop(calls.append)(1) hasync.protect_loop(calls.append)(1)
assert len(mock_loop.mock_calls) == 1 assert len(mock_loop.mock_calls) == 1
assert calls == [1] assert calls == [1]
async def test_gather_with_concurrency():
"""Test gather_with_concurrency limits the number of running tasks."""
runs = 0
now_time = time.time()
async def _increment_runs_if_in_time():
if time.time() - now_time > 0.1:
return -1
nonlocal runs
runs += 1
await asyncio.sleep(0.1)
return runs
results = await hasync.gather_with_concurrency(
2, *[_increment_runs_if_in_time() for i in range(4)]
)
assert results == [2, 2, -1, -1]