Consolidate executor jobs when loading integration manifests (#75176)

This commit is contained in:
J. Nick Koston 2022-07-14 22:06:08 +02:00 committed by GitHub
parent fef1b842ce
commit 61cc9f5288
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 143 additions and 118 deletions

View file

@ -7,7 +7,7 @@ documentation as possible to keep it understandable.
from __future__ import annotations
import asyncio
from collections.abc import Callable
from collections.abc import Callable, Iterable
from contextlib import suppress
import functools as ft
import importlib
@ -31,7 +31,6 @@ from .generated.ssdp import SSDP
from .generated.usb import USB
from .generated.zeroconf import HOMEKIT, ZEROCONF
from .helpers.json import JSON_DECODE_EXCEPTIONS, json_loads
from .util.async_ import gather_with_concurrency
# Typing imports that create a circular dependency
if TYPE_CHECKING:
@ -128,6 +127,7 @@ class Manifest(TypedDict, total=False):
version: str
codeowners: list[str]
loggers: list[str]
supported_brands: dict[str, str]
def manifest_from_legacy_module(domain: str, module: ModuleType) -> Manifest:
@ -166,19 +166,15 @@ async def _async_get_custom_components(
get_sub_directories, custom_components.__path__
)
integrations = await gather_with_concurrency(
MAX_LOAD_CONCURRENTLY,
*(
hass.async_add_executor_job(
Integration.resolve_from_root, hass, custom_components, comp.name
)
for comp in dirs
),
integrations = await hass.async_add_executor_job(
_resolve_integrations_from_root,
hass,
custom_components,
[comp.name for comp in dirs],
)
return {
integration.domain: integration
for integration in integrations
for integration in integrations.values()
if integration is not None
}
@ -681,59 +677,101 @@ class Integration:
return f"<Integration {self.domain}: {self.pkg_path}>"
def _resolve_integrations_from_root(
hass: HomeAssistant, root_module: ModuleType, domains: list[str]
) -> dict[str, Integration]:
"""Resolve multiple integrations from root."""
integrations: dict[str, Integration] = {}
for domain in domains:
try:
integration = Integration.resolve_from_root(hass, root_module, domain)
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Error loading integration: %s", domain)
else:
if integration:
integrations[domain] = integration
return integrations
async def async_get_integration(hass: HomeAssistant, domain: str) -> Integration:
"""Get an integration."""
"""Get integration."""
integrations_or_excs = await async_get_integrations(hass, [domain])
int_or_exc = integrations_or_excs[domain]
if isinstance(int_or_exc, Integration):
return int_or_exc
raise int_or_exc
async def async_get_integrations(
hass: HomeAssistant, domains: Iterable[str]
) -> dict[str, Integration | Exception]:
"""Get integrations."""
if (cache := hass.data.get(DATA_INTEGRATIONS)) is None:
if not _async_mount_config_dir(hass):
raise IntegrationNotFound(domain)
return {domain: IntegrationNotFound(domain) for domain in domains}
cache = hass.data[DATA_INTEGRATIONS] = {}
int_or_evt: Integration | asyncio.Event | None = cache.get(domain, _UNDEF)
results: dict[str, Integration | Exception] = {}
needed: dict[str, asyncio.Event] = {}
in_progress: dict[str, asyncio.Event] = {}
for domain in domains:
int_or_evt: Integration | asyncio.Event | None = cache.get(domain, _UNDEF)
if isinstance(int_or_evt, asyncio.Event):
in_progress[domain] = int_or_evt
elif int_or_evt is not _UNDEF:
results[domain] = cast(Integration, int_or_evt)
elif "." in domain:
results[domain] = ValueError(f"Invalid domain {domain}")
else:
needed[domain] = cache[domain] = asyncio.Event()
if isinstance(int_or_evt, asyncio.Event):
await int_or_evt.wait()
if in_progress:
await asyncio.gather(*[event.wait() for event in in_progress.values()])
for domain in in_progress:
# When we have waited and it's _UNDEF, it doesn't exist
# We don't cache that it doesn't exist, or else people can't fix it
# and then restart, because their config will never be valid.
if (int_or_evt := cache.get(domain, _UNDEF)) is _UNDEF:
results[domain] = IntegrationNotFound(domain)
else:
results[domain] = cast(Integration, int_or_evt)
# When we have waited and it's _UNDEF, it doesn't exist
# We don't cache that it doesn't exist, or else people can't fix it
# and then restart, because their config will never be valid.
if (int_or_evt := cache.get(domain, _UNDEF)) is _UNDEF:
raise IntegrationNotFound(domain)
# First we look for custom components
if needed:
# Instead of using resolve_from_root we use the cache of custom
# components to find the integration.
custom = await async_get_custom_components(hass)
for domain, event in needed.items():
if integration := custom.get(domain):
results[domain] = cache[domain] = integration
event.set()
if int_or_evt is not _UNDEF:
return cast(Integration, int_or_evt)
for domain in results:
if domain in needed:
del needed[domain]
event = cache[domain] = asyncio.Event()
# Now the rest use resolve_from_root
if needed:
from . import components # pylint: disable=import-outside-toplevel
try:
integration = await _async_get_integration(hass, domain)
except Exception:
# Remove event from cache.
cache.pop(domain)
event.set()
raise
integrations = await hass.async_add_executor_job(
_resolve_integrations_from_root, hass, components, list(needed)
)
for domain, event in needed.items():
int_or_exc = integrations.get(domain)
if not int_or_exc:
cache.pop(domain)
results[domain] = IntegrationNotFound(domain)
elif isinstance(int_or_exc, Exception):
cache.pop(domain)
exc = IntegrationNotFound(domain)
exc.__cause__ = int_or_exc
results[domain] = exc
else:
results[domain] = cache[domain] = int_or_exc
event.set()
cache[domain] = integration
event.set()
return integration
async def _async_get_integration(hass: HomeAssistant, domain: str) -> Integration:
if "." in domain:
raise ValueError(f"Invalid domain {domain}")
# Instead of using resolve_from_root we use the cache of custom
# components to find the integration.
if integration := (await async_get_custom_components(hass)).get(domain):
return integration
from . import components # pylint: disable=import-outside-toplevel
if integration := await hass.async_add_executor_job(
Integration.resolve_from_root, hass, components, domain
):
return integration
raise IntegrationNotFound(domain)
return results
class LoaderError(Exception):