* Start moving parts of yaml utils to their own module. Move parts of the yaml loader out of the single large file and start to create a structure like that of the yaml loaders in Ansible [0]. [0]: https://github.com/ansible/ansible/tree/devel/lib/ansible/parsing/yaml * Finish yaml migration, update tests and mocks * Move code around to finish the migration * Update the mocks so that `open` is patched in `homeassistant.util.yaml.loader` instead of `homeassistant.util.yaml`. * Updated mypy ignores * Updated external API of `homeassistant.util.yaml`, see below: Checked what part of the API of `homeassistant.util.yaml` was actually called from outside the tests and added an `__all__` that contains only these elements. Updated the tests so that references to internal parts of the API (e.g. the yaml module imported into `homeassistant.util.yaml.loader`) are referenced directly from `homeassistant.util.yaml.loader`. In `tests/test_yaml.py` the import `yaml` refers to `homeassistant.util.yaml` and `yaml_loader` refers to `~.loader`. Future work that remains for the next iteration is to create a custom SafeConstructor and refer to that instead of monkey patching `yaml` with custom loaders. * Update mocks in yaml dumper, check_config
423 lines
15 KiB
Python
423 lines
15 KiB
Python
"""Script to check the configuration file."""
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
from collections import OrderedDict, namedtuple
|
|
from glob import glob
|
|
from typing import Dict, List, Sequence
|
|
from unittest.mock import patch
|
|
|
|
import attr
|
|
import voluptuous as vol
|
|
|
|
from homeassistant import bootstrap, core, loader, requirements
|
|
from homeassistant.config import (
|
|
get_default_config_dir, CONF_CORE, CORE_CONFIG_SCHEMA,
|
|
CONF_PACKAGES, merge_packages_config, _format_config_error,
|
|
find_config_file, load_yaml_config_file,
|
|
extract_domain_configs, config_per_platform)
|
|
|
|
import homeassistant.util.yaml.loader as yaml_loader
|
|
from homeassistant.exceptions import HomeAssistantError
|
|
|
|
REQUIREMENTS = ('colorlog==4.0.2',)

_LOGGER = logging.getLogger(__name__)

# pylint: disable=protected-access
# Functions that check() patches with a recording wrapper.
# Each entry maps a key to (dotted patch target, original callable).
# A trailing '*' on a key lets several targets share the wrapper that is
# looked up under the key with the '*' removed.
MOCKS = {
    'load': (
        "homeassistant.util.yaml.loader.load_yaml",
        yaml_loader.load_yaml,
    ),
    'load*': (
        "homeassistant.config.load_yaml",
        yaml_loader.load_yaml,
    ),
    'secrets': (
        "homeassistant.util.yaml.loader.secret_yaml",
        yaml_loader.secret_yaml,
    ),
}

# Dotted targets that check() replaces with a plain mock (no side effect).
SILENCE = (
    'homeassistant.scripts.check_config.yaml_loader.clear_secret_cache',
)

# Patcher objects created by check(), keyed by MOCKS key or SILENCE target.
PATCHES = {}

# Color used for section headings in the printed report.
C_HEAD = 'bold'

# Key under which errors without a domain are grouped.
ERROR_STR = 'General Errors'
|
|
|
|
|
|
def color(the_color, *args, reset=None):
    """Return text wrapped in terminal color codes.

    the_color is handed to colorlog's parse_colors. When positional args
    are given they are joined with single spaces, prefixed with the parsed
    color and followed by the escape code named by reset (the 'reset'
    code when reset is not given). Without args only the parsed color is
    returned, and passing reset is rejected by an assertion.

    Raises ValueError when a KeyError occurs while resolving the color or
    reset name.
    """
    from colorlog.escape_codes import escape_codes, parse_colors

    try:
        if args:
            # Keep this evaluation order: color first, then text, then reset.
            opening = parse_colors(the_color)
            text = ' '.join(args)
            closing = escape_codes[reset or 'reset']
            return opening + text + closing

        assert reset is None, "You cannot reset if nothing being printed"
        return parse_colors(the_color)
    except KeyError as k:
        raise ValueError("Invalid color {} in {}".format(str(k), the_color))
|
|
|
|
|
|
def run(script_args: List) -> int:
    """Handle ensure config commandline script.

    Parses the command line, runs check() on the selected configuration
    directory and prints a report: optionally the YAML files found, the
    failed domains, the requested successful domains and the secrets.

    Returns the number of entries in the check result's 'except' mapping,
    i.e. 0 when no errors were recorded.
    """
    parser = argparse.ArgumentParser(
        description="Check Home Assistant configuration.")
    parser.add_argument(
        '--script', choices=['check_config'])
    parser.add_argument(
        '-c', '--config',
        default=get_default_config_dir(),
        help="Directory that contains the Home Assistant configuration")
    parser.add_argument(
        '-i', '--info', nargs='?',
        default=None, const='all',
        help="Show a portion of the config")
    parser.add_argument(
        '-f', '--files',
        action='store_true',
        help="Show used configuration files")
    parser.add_argument(
        '-s', '--secrets',
        action='store_true',
        help="Show secret information")

    # NOTE(review): script_args is not forwarded; the parser reads sys.argv,
    # which is presumably why '--script' is declared above - confirm before
    # changing this to parse_known_args(script_args).
    args, unknown = parser.parse_known_args()
    if unknown:
        print(color('red', "Unknown arguments:", ', '.join(unknown)))

    config_dir = os.path.join(os.getcwd(), args.config)

    print(color('bold', "Testing configuration at", config_dir))

    res = check(config_dir, args.secrets)

    # check() leaves these as None when loading failed with a fatal error;
    # fall back to empty mappings so the report below still prints the
    # error instead of crashing with AttributeError.
    components = res['components'] or {}
    secret_cache = res['secret_cache'] or {}

    domain_info = []
    if args.info:
        domain_info = args.info.split(',')

    if args.files:
        print(color(C_HEAD, 'yaml files'), '(used /',
              color('red', 'not used') + ')')
        deps = os.path.join(config_dir, 'deps')
        yaml_files = [f for f in glob(os.path.join(config_dir, '**/*.yaml'),
                                      recursive=True)
                      if not f.startswith(deps)]

        for yfn in sorted(yaml_files):
            the_color = '' if yfn in res['yaml_files'] else 'red'
            print(color(the_color, '-', yfn))

    if res['except']:
        print(color('bold_white', 'Failed config'))
        for domain, config in res['except'].items():
            # Failed domains are also listed in the config dump below.
            domain_info.append(domain)
            print(' ', color('bold_red', domain + ':'),
                  color('red', '', reset='red'))
            dump_dict(config, reset='red')
            print(color('reset'))

    if domain_info:
        if 'all' in domain_info:
            print(color('bold_white', 'Successful config (all)'))
            for domain, config in components.items():
                print(' ', color(C_HEAD, domain + ':'))
                dump_dict(config)
        else:
            print(color('bold_white', 'Successful config (partial)'))
            for domain in domain_info:
                if domain == ERROR_STR:
                    continue
                print(' ', color(C_HEAD, domain + ':'))
                dump_dict(components.get(domain, None))

    if args.secrets:
        # Maps each secret name to the file it was last seen in.
        flatsecret = {}

        for sfn, sdict in secret_cache.items():
            sss = []
            for skey in sdict:
                if skey in flatsecret:
                    _LOGGER.error('Duplicated secrets in files %s and %s',
                                  flatsecret[skey], sfn)
                flatsecret[skey] = sfn
                # Secrets that were actually used are highlighted in green.
                sss.append(color('green', skey) if skey in res['secrets']
                           else skey)
            print(color(C_HEAD, 'Secrets from', sfn + ':'), ', '.join(sss))

        print(color(C_HEAD, 'Used Secrets:'))
        for skey, sval in res['secrets'].items():
            if sval is None:
                print(' -', skey + ':', color('red', "not found"))
                continue
            print(' -', skey + ':', sval,
                  color('cyan', '[from:',
                        flatsecret.get(skey, 'keyring') + ']'))

    return len(res['except'])
|
|
|
|
|
|
def check(config_dir, secrets=False):
    """Perform a check by mocking hass load functions.

    Loads the configuration found in config_dir while the functions listed
    in MOCKS are patched with wrappers that record which YAML files are
    loaded and, when secrets is true, which secrets are resolved.

    Returns a dict with these keys:
      'yaml_files'   - OrderedDict of loaded YAML file names
      'secrets'      - OrderedDict of secret name to value (None when the
                       lookup raised HomeAssistantError)
      'except'       - OrderedDict of domain to a list of error messages
                       and offending configs
      'components'   - result of check_ha_config_file, or None when a
                       fatal error prevented it from completing
      'secret_cache' - copy of the loader's secret cache, or None
    """
    logging.getLogger('homeassistant.loader').setLevel(logging.CRITICAL)
    res = {
        'yaml_files': OrderedDict(),  # yaml_files loaded
        'secrets': OrderedDict(),  # secret cache and secrets loaded
        'except': OrderedDict(),  # exceptions raised (with config)
        'components': None,  # successful components
        'secret_cache': None,
    }

    def mock_load(filename):
        """Mock hass.util.load_yaml to save config file names."""
        res['yaml_files'][filename] = True
        return MOCKS['load'][1](filename)

    def mock_secrets(ldr, node):
        """Mock _get_secrets."""
        try:
            val = MOCKS['secrets'][1](ldr, node)
        except HomeAssistantError:
            val = None
        res['secrets'][node.value] = val
        return val

    # Side effects by MOCKS key with any '*' removed, so that one side
    # effect can be used to patch multiple locations.
    side_effects = {
        'load': mock_load,
        'secrets': mock_secrets,
    }

    # Patches to skip functions
    for sil in SILENCE:
        PATCHES[sil] = patch(sil)

    # Patches with local mock functions
    for key, val in MOCKS.items():
        if key == 'secrets' and not secrets:
            # PATCHES is module-level state: drop a 'secrets' patch left
            # behind by an earlier secrets=True call so it is not started
            # again with a wrapper bound to that call's result dict.
            PATCHES.pop(key, None)
            continue
        mock_function = side_effects[key.replace('*', '')]
        PATCHES[key] = patch(val[0], side_effect=mock_function)

    # Start all patches
    for pat in PATCHES.values():
        pat.start()

    if secrets:
        # Ensure !secrets point to the patched function
        yaml_loader.yaml.SafeLoader.add_constructor('!secret',
                                                    yaml_loader.secret_yaml)

    try:
        hass = core.HomeAssistant()
        hass.config.config_dir = config_dir

        res['components'] = hass.loop.run_until_complete(
            check_ha_config_file(hass))
        res['secret_cache'] = OrderedDict(yaml_loader.__SECRET_CACHE)

        for err in res['components'].errors:
            domain = err.domain or ERROR_STR
            res['except'].setdefault(domain, []).append(err.message)
            if err.config:
                res['except'].setdefault(domain, []).append(err.config)

    except Exception as err:  # pylint: disable=broad-except
        _LOGGER.exception("BURB")
        print(color('red', 'Fatal error while loading config:'), str(err))
        res['except'].setdefault(ERROR_STR, []).append(str(err))
    finally:
        # Stop all patches
        for pat in PATCHES.values():
            pat.stop()
        if secrets:
            # Ensure !secrets point to the original function
            yaml_loader.yaml.SafeLoader.add_constructor(
                '!secret', yaml_loader.secret_yaml)
        # Runs after the patches are stopped, so this is the real function
        # and not the silenced mock.
        bootstrap.clear_secret_cache()

    return res
|
|
|
|
|
|
def line_info(obj, **kwargs):
    """Return a colored "[source file:line]" tag for a config object.

    Objects without a __config_file__ attribute yield '?'. Extra keyword
    arguments are forwarded to color().
    """
    if not hasattr(obj, '__config_file__'):
        return '?'

    location = "[source {}:{}]".format(
        obj.__config_file__, obj.__line__ or '?')
    return color('cyan', location, **kwargs)
|
|
|
|
|
|
def dump_dict(layer, indent_count=3, listi=False, **kwargs):
    """Print a config structure as an indented, YAML-like listing.

    Dict entries are printed sorted by lower-cased key, with 'platform'
    first. Nested dicts and lists are printed recursively, two columns
    deeper. When listi is true or layer is a list, the first printed line
    gets a '-' marker in place of the last indent column.

    Extra keyword arguments are forwarded to line_info() for the entry
    headers on this level only; they are not passed to nested levels.
    """
    def _platform_first(item):
        """Return the sort key for a (key, value) pair."""
        name = str(item[0]).lower()
        if name == 'platform':
            return '0'
        return name

    plain_indent = ' ' * indent_count
    as_list_item = listi or isinstance(layer, list)
    prefix = plain_indent[:-1] + '-' if as_list_item else plain_indent

    if isinstance(layer, Dict):
        for name, entry in sorted(layer.items(), key=_platform_first):
            label = str(name) + ':'
            if not isinstance(entry, (dict, list)):
                print(prefix, label, entry)
            else:
                print(prefix, label, line_info(entry, **kwargs))
                dump_dict(entry, indent_count + 2)
            # Only the first entry carries the list marker.
            prefix = plain_indent

    if isinstance(layer, Sequence):
        for element in layer:
            if isinstance(element, dict):
                dump_dict(element, indent_count + 2, True)
                continue
            print(' ', prefix, element)
|
|
|
|
|
|
# A single configuration problem: the message text, the domain it belongs
# to and the config that triggered it.
CheckConfigError = namedtuple(
    'CheckConfigError', ['message', 'domain', 'config'])
|
|
|
|
|
|
@attr.s
class HomeAssistantConfig(OrderedDict):
    """Ordered mapping of validated config that also collects errors."""

    # CheckConfigError entries recorded while checking; a fresh list is
    # created for every instance.
    errors = attr.ib(default=attr.Factory(list))

    def add_error(self, message, domain=None, config=None):
        """Record one error and return self so calls can be returned directly.

        The message is converted with str() before it is stored.
        """
        entry = CheckConfigError(str(message), domain, config)
        self.errors.append(entry)
        return self
|
|
|
|
|
|
async def check_ha_config_file(hass):
    """Validate the configuration file of a Home Assistant instance.

    Loads the configuration file from hass.config.config_dir, validates
    the core section, merges packages and then validates every configured
    domain and its platforms against the schemas they expose.

    Returns a HomeAssistantConfig holding the validated config per domain.
    Load, lookup and schema failures handled here are recorded in its
    errors list.
    """
    config_dir = hass.config.config_dir
    result = HomeAssistantConfig()

    def _pack_error(package, component, config, message):
        """Handle errors from packages: _log_pkg_error."""
        text = "Package {} setup failed. Component {} {}".format(
            package, component, message)
        domain = 'homeassistant.packages.{}.{}'.format(package, component)
        pack_config = core_config[CONF_PACKAGES].get(package, config)
        result.add_error(text, domain, pack_config)

    def _comp_error(ex, domain, config):
        """Handle errors from components: async_log_exception."""
        formatted = _format_config_error(ex, domain, config)
        result.add_error(formatted, domain, config)

    # Load configuration.yaml
    try:
        config_path = await hass.async_add_executor_job(
            find_config_file, config_dir)
        if not config_path:
            return result.add_error("File configuration.yaml not found.")
        config = await hass.async_add_executor_job(
            load_yaml_config_file, config_path)
    except HomeAssistantError as err:
        return result.add_error(
            "Error loading {}: {}".format(config_path, err))
    finally:
        yaml_loader.clear_secret_cache()

    # Extract and validate core [homeassistant] config
    core_config = config.pop(CONF_CORE, {})
    try:
        core_config = CORE_CONFIG_SCHEMA(core_config)
    except vol.Invalid as err:
        result.add_error(err, CONF_CORE, core_config)
        core_config = {}
    else:
        result[CONF_CORE] = core_config

    # Merge packages
    packages = core_config.get(CONF_PACKAGES, {})
    await merge_packages_config(hass, config, packages, _pack_error)
    core_config.pop(CONF_PACKAGES, None)

    # Filter out repeating config sections
    domains = {key.split(' ')[0] for key in config}

    # Process and validate config
    for domain in domains:
        try:
            integration = await loader.async_get_integration(hass, domain)
        except loader.IntegrationNotFound:
            result.add_error("Integration not found: {}".format(domain))
            continue

        try:
            component = integration.get_component()
        except ImportError:
            result.add_error("Component not found: {}".format(domain))
            continue

        if not hass.config.skip_pip and integration.requirements:
            installed = await requirements.async_process_requirements(
                hass, integration.domain, integration.requirements)
            if not installed:
                result.add_error(
                    "Unable to install all requirements: {}".format(
                        ', '.join(integration.requirements)))
                continue

        if hasattr(component, 'CONFIG_SCHEMA'):
            try:
                # config is rebound to the validated result and stays
                # rebound for the remaining domains.
                config = component.CONFIG_SCHEMA(config)
                result[domain] = config[domain]
            except vol.Invalid as ex:
                _comp_error(ex, domain, config)
                continue

        # PLATFORM_SCHEMA_BASE takes precedence over PLATFORM_SCHEMA.
        fallback_schema = getattr(component, 'PLATFORM_SCHEMA', None)
        component_platform_schema = getattr(
            component, 'PLATFORM_SCHEMA_BASE', fallback_schema)

        if component_platform_schema is None:
            continue

        platforms = []
        for p_name, p_config in config_per_platform(config, domain):
            # Validate component specific platform schema
            try:
                p_validated = component_platform_schema(  # type: ignore
                    p_config)
            except vol.Invalid as ex:
                _comp_error(ex, domain, config)
                continue

            # Not all platform components follow same pattern for platforms
            # So if p_name is None we are not going to validate platform
            # (the automation component is one of them)
            if p_name is None:
                platforms.append(p_validated)
                continue

            try:
                p_integration = await loader.async_get_integration(
                    hass, p_name)
            except loader.IntegrationNotFound:
                result.add_error(
                    "Integration {} not found when trying to verify its {} "
                    "platform.".format(p_name, domain))
                continue

            try:
                platform = p_integration.get_platform(domain)
            except ImportError:
                result.add_error(
                    "Platform not found: {}.{}".format(domain, p_name))
                continue

            # Validate platform specific schema
            if hasattr(platform, 'PLATFORM_SCHEMA'):
                try:
                    p_validated = platform.PLATFORM_SCHEMA(p_validated)
                except vol.Invalid as ex:
                    platform_domain = '{}.{}'.format(domain, p_name)
                    _comp_error(ex, platform_domain, p_validated)
                    continue

            platforms.append(p_validated)

        # Remove config for current component and add validated config back in.
        for filter_comp in extract_domain_configs(config, domain):
            del config[filter_comp]
        result[domain] = platforms

    return result
|