check_config script evolution (#12792)

* Initial async_check_ha_config_file

* check_ha_config_file

* Various fixes

* feedback - return the config

* move_to_check_config
This commit is contained in:
Johann Kellerman 2018-03-09 05:34:24 +02:00 committed by Paulus Schoutsen
parent 5e2296f2a4
commit 6734c966b3
5 changed files with 316 additions and 254 deletions

View file

@ -1,17 +1,23 @@
"""Script to ensure a configuration file exists."""
"""Script to check the configuration file."""
import argparse
import logging
import os
from collections import OrderedDict
from collections import OrderedDict, namedtuple
from glob import glob
from platform import system
from unittest.mock import patch
import attr
from typing import Dict, List, Sequence
import voluptuous as vol
from homeassistant.core import callback
from homeassistant import bootstrap, loader, setup, config as config_util
from homeassistant import bootstrap, core, loader
from homeassistant.config import (
get_default_config_dir, CONF_CORE, CORE_CONFIG_SCHEMA,
CONF_PACKAGES, merge_packages_config, _format_config_error,
find_config_file, load_yaml_config_file, get_component,
extract_domain_configs, config_per_platform, get_platform)
import homeassistant.util.yaml as yaml
from homeassistant.exceptions import HomeAssistantError
@ -24,35 +30,18 @@ _LOGGER = logging.getLogger(__name__)
MOCKS = {
'load': ("homeassistant.util.yaml.load_yaml", yaml.load_yaml),
'load*': ("homeassistant.config.load_yaml", yaml.load_yaml),
'get': ("homeassistant.loader.get_component", loader.get_component),
'secrets': ("homeassistant.util.yaml._secret_yaml", yaml._secret_yaml),
'except': ("homeassistant.config.async_log_exception",
config_util.async_log_exception),
'package_error': ("homeassistant.config._log_pkg_error",
config_util._log_pkg_error),
'logger_exception': ("homeassistant.setup._LOGGER.error",
setup._LOGGER.error),
'logger_exception_bootstrap': ("homeassistant.bootstrap._LOGGER.error",
bootstrap._LOGGER.error),
}
SILENCE = (
'homeassistant.bootstrap.async_enable_logging', # callback
'homeassistant.bootstrap.clear_secret_cache',
'homeassistant.bootstrap.async_register_signal_handling', # callback
'homeassistant.config.process_ha_config_upgrade',
'homeassistant.scripts.check_config.yaml.clear_secret_cache',
)
PATCHES = {}
C_HEAD = 'bold'
ERROR_STR = 'General Errors'
@callback
def mock_cb(*args):
"""Callback that returns None."""
return None
def color(the_color, *args, reset=None):
"""Color helper."""
from colorlog.escape_codes import escape_codes, parse_colors
@ -74,11 +63,11 @@ def run(script_args: List) -> int:
'--script', choices=['check_config'])
parser.add_argument(
'-c', '--config',
default=config_util.get_default_config_dir(),
default=get_default_config_dir(),
help="Directory that contains the Home Assistant configuration")
parser.add_argument(
'-i', '--info',
default=None,
'-i', '--info', nargs='?',
default=None, const='all',
help="Show a portion of the config")
parser.add_argument(
'-f', '--files',
@ -89,21 +78,20 @@ def run(script_args: List) -> int:
action='store_true',
help="Show secret information")
args = parser.parse_args()
args, unknown = parser.parse_known_args()
if unknown:
print(color('red', "Unknown arguments:", ', '.join(unknown)))
config_dir = os.path.join(os.getcwd(), args.config)
config_path = os.path.join(config_dir, 'configuration.yaml')
if not os.path.isfile(config_path):
print('Config does not exist:', config_path)
return 1
print(color('bold', "Testing configuration at", config_dir))
res = check(config_dir, args.secrets)
domain_info = []
if args.info:
domain_info = args.info.split(',')
res = check(config_path)
if args.files:
print(color(C_HEAD, 'yaml files'), '(used /',
color('red', 'not used') + ')')
@ -158,59 +146,23 @@ def run(script_args: List) -> int:
return len(res['except'])
def check(config_path):
def check(config_dir, secrets=False):
"""Perform a check by mocking hass load functions."""
logging.getLogger('homeassistant.core').setLevel(logging.WARNING)
logging.getLogger('homeassistant.loader').setLevel(logging.WARNING)
logging.getLogger('homeassistant.setup').setLevel(logging.WARNING)
logging.getLogger('homeassistant.bootstrap').setLevel(logging.ERROR)
logging.getLogger('homeassistant.util.yaml').setLevel(logging.INFO)
logging.getLogger('homeassistant.loader').setLevel(logging.CRITICAL)
res = {
'yaml_files': OrderedDict(), # yaml_files loaded
'secrets': OrderedDict(), # secret cache and secrets loaded
'except': OrderedDict(), # exceptions raised (with config)
'components': OrderedDict(), # successful components
'secret_cache': OrderedDict(),
'components': None, # successful components
'secret_cache': None,
}
# pylint: disable=unused-variable
def mock_load(filename):
"""Mock hass.util.load_yaml to save config files."""
"""Mock hass.util.load_yaml to save config file names."""
res['yaml_files'][filename] = True
return MOCKS['load'][1](filename)
# pylint: disable=unused-variable
def mock_get(comp_name):
"""Mock hass.loader.get_component to replace setup & setup_platform."""
async def mock_async_setup(*args):
"""Mock setup, only record the component name & config."""
assert comp_name not in res['components'], \
"Components should contain a list of platforms"
res['components'][comp_name] = args[1].get(comp_name)
return True
module = MOCKS['get'][1](comp_name)
if module is None:
# Ensure list
msg = '{} not found: {}'.format(
'Platform' if '.' in comp_name else 'Component', comp_name)
res['except'].setdefault(ERROR_STR, []).append(msg)
return None
# Test if platform/component and overwrite setup
if '.' in comp_name:
module.async_setup_platform = mock_async_setup
if hasattr(module, 'setup_platform'):
del module.setup_platform
else:
module.async_setup = mock_async_setup
if hasattr(module, 'setup'):
del module.setup
return module
# pylint: disable=unused-variable
def mock_secrets(ldr, node):
"""Mock _get_secrets."""
@ -221,37 +173,14 @@ def check(config_path):
res['secrets'][node.value] = val
return val
def mock_except(ex, domain, config, # pylint: disable=unused-variable
hass=None):
"""Mock config.log_exception."""
MOCKS['except'][1](ex, domain, config, hass)
res['except'][domain] = config.get(domain, config)
def mock_package_error( # pylint: disable=unused-variable
package, component, config, message):
"""Mock config_util._log_pkg_error."""
MOCKS['package_error'][1](package, component, config, message)
pkg_key = 'homeassistant.packages.{}'.format(package)
res['except'][pkg_key] = config.get('homeassistant', {}) \
.get('packages', {}).get(package)
def mock_logger_exception(msg, *params):
"""Log logger.exceptions."""
res['except'].setdefault(ERROR_STR, []).append(msg % params)
MOCKS['logger_exception'][1](msg, *params)
def mock_logger_exception_bootstrap(msg, *params):
"""Log logger.exceptions."""
res['except'].setdefault(ERROR_STR, []).append(msg % params)
MOCKS['logger_exception_bootstrap'][1](msg, *params)
# Patches to skip functions
for sil in SILENCE:
PATCHES[sil] = patch(sil, return_value=mock_cb())
PATCHES[sil] = patch(sil)
# Patches with local mock functions
for key, val in MOCKS.items():
if not secrets and key == 'secrets':
continue
# The * in the key is removed to find the mock_function (side_effect)
# This allows us to use one side_effect to patch multiple locations
mock_function = locals()['mock_' + key.replace('*', '')]
@ -260,22 +189,42 @@ def check(config_path):
# Start all patches
for pat in PATCHES.values():
pat.start()
# Ensure !secrets point to the patched function
yaml.yaml.SafeLoader.add_constructor('!secret', yaml._secret_yaml)
if secrets:
# Ensure !secrets point to the patched function
yaml.yaml.SafeLoader.add_constructor('!secret', yaml._secret_yaml)
try:
with patch('homeassistant.util.logging.AsyncHandler._process'):
bootstrap.from_config_file(config_path, skip_pip=True)
res['secret_cache'] = dict(yaml.__SECRET_CACHE)
class HassConfig():
"""Hass object with config."""
def __init__(self, conf_dir):
"""Init the config_dir."""
self.config = core.Config()
self.config.config_dir = conf_dir
loader.prepare(HassConfig(config_dir))
res['components'] = check_ha_config_file(config_dir)
res['secret_cache'] = OrderedDict(yaml.__SECRET_CACHE)
for err in res['components'].errors:
domain = err.domain or ERROR_STR
res['except'].setdefault(domain, []).append(err.message)
if err.config:
res['except'].setdefault(domain, []).append(err.config)
except Exception as err: # pylint: disable=broad-except
print(color('red', 'Fatal error while loading config:'), str(err))
res['except'].setdefault(ERROR_STR, []).append(err)
res['except'].setdefault(ERROR_STR, []).append(str(err))
finally:
# Stop all patches
for pat in PATCHES.values():
pat.stop()
# Ensure !secrets point to the original function
yaml.yaml.SafeLoader.add_constructor('!secret', yaml._secret_yaml)
if secrets:
# Ensure !secrets point to the original function
yaml.yaml.SafeLoader.add_constructor('!secret', yaml._secret_yaml)
bootstrap.clear_secret_cache()
return res
@ -317,3 +266,125 @@ def dump_dict(layer, indent_count=3, listi=False, **kwargs):
dump_dict(i, indent_count + 2, True)
else:
print(' ', indent_str, i)
CheckConfigError = namedtuple( # pylint: disable=invalid-name
'CheckConfigError', "message domain config")
@attr.s
class HomeAssistantConfig(OrderedDict):
"""Configuration result with errors attribute."""
errors = attr.ib(default=attr.Factory(list))
def add_error(self, message, domain=None, config=None):
"""Add a single error."""
self.errors.append(CheckConfigError(str(message), domain, config))
return self
def check_ha_config_file(config_dir):
"""Check if Home Assistant configuration file is valid."""
result = HomeAssistantConfig()
def _pack_error(package, component, config, message):
"""Handle errors from packages: _log_pkg_error."""
message = "Package {} setup failed. Component {} {}".format(
package, component, message)
domain = 'homeassistant.packages.{}.{}'.format(package, component)
pack_config = core_config[CONF_PACKAGES].get(package, config)
result.add_error(message, domain, pack_config)
def _comp_error(ex, domain, config):
"""Handle errors from components: async_log_exception."""
result.add_error(
_format_config_error(ex, domain, config), domain, config)
# Load configuration.yaml
try:
config_path = find_config_file(config_dir)
if not config_path:
return result.add_error("File configuration.yaml not found.")
config = load_yaml_config_file(config_path)
except HomeAssistantError as err:
return result.add_error(err)
finally:
yaml.clear_secret_cache()
# Extract and validate core [homeassistant] config
try:
core_config = config.pop(CONF_CORE, {})
core_config = CORE_CONFIG_SCHEMA(core_config)
result[CONF_CORE] = core_config
except vol.Invalid as err:
result.add_error(err, CONF_CORE, core_config)
core_config = {}
# Merge packages
merge_packages_config(
config, core_config.get(CONF_PACKAGES, {}), _pack_error)
del core_config[CONF_PACKAGES]
# Filter out repeating config sections
components = set(key.split(' ')[0] for key in config.keys())
# Process and validate config
for domain in components:
component = get_component(domain)
if not component:
result.add_error("Component not found: {}".format(domain))
continue
if hasattr(component, 'CONFIG_SCHEMA'):
try:
config = component.CONFIG_SCHEMA(config)
result[domain] = config[domain]
except vol.Invalid as ex:
_comp_error(ex, domain, config)
continue
if not hasattr(component, 'PLATFORM_SCHEMA'):
continue
platforms = []
for p_name, p_config in config_per_platform(config, domain):
# Validate component specific platform schema
try:
p_validated = component.PLATFORM_SCHEMA(p_config)
except vol.Invalid as ex:
_comp_error(ex, domain, config)
continue
# Not all platform components follow same pattern for platforms
# So if p_name is None we are not going to validate platform
# (the automation component is one of them)
if p_name is None:
platforms.append(p_validated)
continue
platform = get_platform(domain, p_name)
if platform is None:
result.add_error(
"Platform not found: {}.{}".format(domain, p_name))
continue
# Validate platform specific schema
if hasattr(platform, 'PLATFORM_SCHEMA'):
# pylint: disable=no-member
try:
p_validated = platform.PLATFORM_SCHEMA(p_validated)
except vol.Invalid as ex:
_comp_error(
ex, '{}.{}'.format(domain, p_name), p_validated)
continue
platforms.append(p_validated)
# Remove config for current component and add validated config back in.
for filter_comp in extract_domain_configs(config, domain):
del config[filter_comp]
result[domain] = platforms
return result