Improve after_dependencies handling (#36898)

This commit is contained in:
Paulus Schoutsen 2020-06-19 17:24:33 -07:00 committed by GitHub
parent 93272e3083
commit 5642027ffb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 377 additions and 166 deletions

View file

@ -1,6 +1,7 @@
"""Provide methods to bootstrap a Home Assistant instance."""
import asyncio
import contextlib
from datetime import datetime
import logging
import logging.handlers
import os
@ -20,7 +21,12 @@ from homeassistant.const import (
)
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.typing import ConfigType
from homeassistant.setup import DATA_SETUP, DATA_SETUP_STARTED, async_setup_component
from homeassistant.setup import (
DATA_SETUP,
DATA_SETUP_STARTED,
async_set_domains_to_be_loaded,
async_setup_component,
)
from homeassistant.util.logging import async_activate_log_queue_handler
from homeassistant.util.package import async_get_user_site, is_virtual_env
from homeassistant.util.yaml import clear_secret_cache
@ -36,10 +42,16 @@ LOG_SLOW_STARTUP_INTERVAL = 60
DEBUGGER_INTEGRATIONS = {"ptvsd"}
CORE_INTEGRATIONS = ("homeassistant", "persistent_notification")
LOGGING_INTEGRATIONS = {"logger", "system_log", "sentry"}
STAGE_1_INTEGRATIONS = {
LOGGING_INTEGRATIONS = {
# Set log levels
"logger",
# Error logging
"system_log",
"sentry",
# To record data
"recorder",
}
STAGE_1_INTEGRATIONS = {
# To make sure we forward data to other instances
"mqtt_eventstream",
# To provide account link implementations
@ -330,77 +342,130 @@ def _get_domains(hass: core.HomeAssistant, config: Dict[str, Any]) -> Set[str]:
return domains
async def _async_log_pending_setups(
domains: Set[str], setup_started: Dict[str, datetime]
) -> None:
"""Periodic log of setups that are pending for longer than LOG_SLOW_STARTUP_INTERVAL."""
while True:
await asyncio.sleep(LOG_SLOW_STARTUP_INTERVAL)
remaining = [domain for domain in domains if domain in setup_started]
if remaining:
_LOGGER.info(
"Waiting on integrations to complete setup: %s", ", ".join(remaining),
)
async def async_setup_multi_components(
hass: core.HomeAssistant,
domains: Set[str],
config: Dict[str, Any],
setup_started: Dict[str, datetime],
) -> None:
"""Set up multiple domains. Log on failure."""
futures = {
domain: hass.async_create_task(async_setup_component(hass, domain, config))
for domain in domains
}
log_task = asyncio.create_task(_async_log_pending_setups(domains, setup_started))
await asyncio.wait(futures.values())
log_task.cancel()
errors = [domain for domain in domains if futures[domain].exception()]
for domain in errors:
exception = futures[domain].exception()
assert exception is not None
_LOGGER.error(
"Error setting up integration %s - received exception",
domain,
exc_info=(type(exception), exception, exception.__traceback__),
)
async def _async_set_up_integrations(
hass: core.HomeAssistant, config: Dict[str, Any]
) -> None:
"""Set up all the integrations."""
setup_started = hass.data[DATA_SETUP_STARTED] = {}
domains_to_setup = _get_domains(hass, config)
async def async_setup_multi_components(domains: Set[str]) -> None:
"""Set up multiple domains. Log on failure."""
# Resolve all dependencies so we know all integrations
# that will have to be loaded and start rightaway
integration_cache: Dict[str, loader.Integration] = {}
to_resolve = domains_to_setup
while to_resolve:
old_to_resolve = to_resolve
to_resolve = set()
async def _async_log_pending_setups() -> None:
"""Periodic log of setups that are pending for longer than LOG_SLOW_STARTUP_INTERVAL."""
while True:
await asyncio.sleep(LOG_SLOW_STARTUP_INTERVAL)
remaining = [domain for domain in domains if domain in setup_started]
if remaining:
_LOGGER.info(
"Waiting on integrations to complete setup: %s",
", ".join(remaining),
)
futures = {
domain: hass.async_create_task(async_setup_component(hass, domain, config))
for domain in domains
}
log_task = asyncio.create_task(_async_log_pending_setups())
await asyncio.wait(futures.values())
log_task.cancel()
errors = [domain for domain in domains if futures[domain].exception()]
for domain in errors:
exception = futures[domain].exception()
assert exception is not None
_LOGGER.error(
"Error setting up integration %s - received exception",
domain,
exc_info=(type(exception), exception, exception.__traceback__),
integrations_to_process = [
int_or_exc
for int_or_exc in await asyncio.gather(
*(
loader.async_get_integration(hass, domain)
for domain in old_to_resolve
),
return_exceptions=True,
)
if isinstance(int_or_exc, loader.Integration)
]
resolve_dependencies_tasks = [
itg.resolve_dependencies()
for itg in integrations_to_process
if not itg.all_dependencies_resolved
]
domains = _get_domains(hass, config)
if resolve_dependencies_tasks:
await asyncio.gather(*resolve_dependencies_tasks)
for itg in integrations_to_process:
integration_cache[itg.domain] = itg
for dep in itg.all_dependencies:
if dep in domains_to_setup:
continue
domains_to_setup.add(dep)
to_resolve.add(dep)
_LOGGER.info("Domains to be set up: %s", domains_to_setup)
logging_domains = domains_to_setup & LOGGING_INTEGRATIONS
# Load logging as soon as possible
if logging_domains:
_LOGGER.info("Setting up logging: %s", logging_domains)
await async_setup_multi_components(hass, logging_domains, config, setup_started)
# Start up debuggers. Start these first in case they want to wait.
debuggers = domains & DEBUGGER_INTEGRATIONS
debuggers = domains_to_setup & DEBUGGER_INTEGRATIONS
if debuggers:
_LOGGER.debug("Starting up debuggers %s", debuggers)
await async_setup_multi_components(debuggers)
domains -= DEBUGGER_INTEGRATIONS
_LOGGER.debug("Setting up debuggers: %s", debuggers)
await async_setup_multi_components(hass, debuggers, config, setup_started)
# Resolve all dependencies of all components so we can find the logging
# and integrations that need faster initialization.
resolved_domains_task = asyncio.gather(
*(loader.async_component_dependencies(hass, domain) for domain in domains),
return_exceptions=True,
)
# calculate what components to setup in what stage
stage_1_domains = set()
# Finish resolving domains
for dep_domains in await resolved_domains_task:
# Result is either a set or an exception. We ignore exceptions
# It will be properly handled during setup of the domain.
if isinstance(dep_domains, set):
domains.update(dep_domains)
# Find all dependencies of any dependency of any stage 1 integration that
# we plan on loading and promote them to stage 1
deps_promotion = STAGE_1_INTEGRATIONS
while deps_promotion:
old_deps_promotion = deps_promotion
deps_promotion = set()
# setup components
logging_domains = domains & LOGGING_INTEGRATIONS
stage_1_domains = domains & STAGE_1_INTEGRATIONS
stage_2_domains = domains - logging_domains - stage_1_domains
for domain in old_deps_promotion:
if domain not in domains_to_setup or domain in stage_1_domains:
continue
if logging_domains:
_LOGGER.info("Setting up %s", logging_domains)
stage_1_domains.add(domain)
await async_setup_multi_components(logging_domains)
dep_itg = integration_cache.get(domain)
if dep_itg is None:
continue
deps_promotion.update(dep_itg.all_dependencies)
stage_2_domains = domains_to_setup - logging_domains - debuggers - stage_1_domains
# Kick off loading the registries. They don't need to be awaited.
asyncio.gather(
@ -409,49 +474,17 @@ async def _async_set_up_integrations(
hass.helpers.area_registry.async_get_registry(),
)
# Start setup
if stage_1_domains:
_LOGGER.info("Setting up %s", stage_1_domains)
_LOGGER.info("Setting up stage 1: %s", stage_1_domains)
await async_setup_multi_components(hass, stage_1_domains, config, setup_started)
await async_setup_multi_components(stage_1_domains)
# Enables after dependencies
async_set_domains_to_be_loaded(hass, stage_1_domains | stage_2_domains)
# Load all integrations
after_dependencies: Dict[str, Set[str]] = {}
for int_or_exc in await asyncio.gather(
*(loader.async_get_integration(hass, domain) for domain in stage_2_domains),
return_exceptions=True,
):
# Exceptions are handled in async_setup_component.
if isinstance(int_or_exc, loader.Integration) and int_or_exc.after_dependencies:
after_dependencies[int_or_exc.domain] = set(int_or_exc.after_dependencies)
last_load = None
while stage_2_domains:
domains_to_load = set()
for domain in stage_2_domains:
after_deps = after_dependencies.get(domain)
# Load if integration has no after_dependencies or they are
# all loaded
if not after_deps or not after_deps - hass.config.components:
domains_to_load.add(domain)
if not domains_to_load or domains_to_load == last_load:
break
_LOGGER.debug("Setting up %s", domains_to_load)
await async_setup_multi_components(domains_to_load)
last_load = domains_to_load
stage_2_domains -= domains_to_load
# These are stage 2 domains that never have their after_dependencies
# satisfied.
if stage_2_domains:
_LOGGER.debug("Final set up: %s", stage_2_domains)
await async_setup_multi_components(stage_2_domains)
_LOGGER.info("Setting up stage 2: %s", stage_2_domains)
await async_setup_multi_components(hass, stage_2_domains, config, setup_started)
# Wrap up startup
_LOGGER.debug("Waiting for startup to wrap up")