2016-08-23 06:42:05 +02:00
"""Script to ensure a configuration file exists."""
import argparse
2016-09-21 06:26:40 +02:00
import logging
2016-08-23 06:42:05 +02:00
import os
from glob import glob
from platform import system
2016-09-21 06:26:40 +02:00
from unittest.mock import patch
from typing import Dict, List, Sequence
2016-09-23 09:10:19 +02:00
from collections import OrderedDict
2016-08-23 06:42:05 +02:00
import homeassistant.bootstrap as bootstrap
import homeassistant.config as config_util
import homeassistant.loader as loader
import homeassistant.util.yaml as yaml
2016-09-21 06:26:40 +02:00
from homeassistant.exceptions import HomeAssistantError
2016-08-23 06:42:05 +02:00
2016-08-27 01:33:57 +02:00
REQUIREMENTS = ('colorlog>2.1,<3',)
2016-08-23 06:42:05 +02:00
if system() == 'Windows': # Ensure colorama installed for colorlog on Windows
REQUIREMENTS += ('colorama<=1',)
_LOGGER = logging.getLogger(__name__)
# pylint: disable=protected-access
'load': ("homeassistant.util.yaml.load_yaml", yaml.load_yaml),
2016-08-25 07:18:32 +02:00
'load*': ("homeassistant.config.load_yaml", yaml.load_yaml),
2016-08-23 06:42:05 +02:00
'get': ("homeassistant.loader.get_component", loader.get_component),
'secrets': ("homeassistant.util.yaml._secret_yaml", yaml._secret_yaml),
'except': ("homeassistant.bootstrap.log_exception",
2016-08-25 07:18:32 +02:00
2016-08-23 06:42:05 +02:00
C_HEAD = 'bold'
ERROR_STR = 'General Errors'
def color(the_color, *args, reset=None):
"""Color helper."""
from colorlog.escape_codes import escape_codes, parse_colors
if len(args) == 0:
assert reset is None, "You cannot reset if nothing being printed"
return parse_colors(the_color)
return parse_colors(the_color) + ' '.join(args) + \
escape_codes[reset or 'reset']
except KeyError as k:
raise ValueError("Invalid color {} in {}".format(str(k), the_color))
# pylint: disable=too-many-locals, too-many-branches
def run(script_args: List) -> int:
"""Handle ensure config commandline script."""
parser = argparse.ArgumentParser(
description=("Check Home Assistant configuration."))
'--script', choices=['check_config'])
'-c', '--config',
help="Directory that contains the Home Assistant configuration")
'-i', '--info',
help="Show a portion of the config")
'-f', '--files',
help="Show used configuration files")
'-s', '--secrets',
help="Show secret information")
args = parser.parse_args()
config_dir = os.path.join(os.getcwd(), args.config)
config_path = os.path.join(config_dir, 'configuration.yaml')
if not os.path.isfile(config_path):
print('Config does not exist:', config_path)
return 1
print(color('bold', "Testing configuration at", config_dir))
domain_info = []
if args.info:
domain_info = args.info.split(',')
res = check(config_path)
if args.files:
print(color(C_HEAD, 'yaml files'), '(used /',
2016-09-21 06:26:40 +02:00
color('red', 'not used') + ')')
2016-08-23 06:42:05 +02:00
# Python 3.5 gets a recursive, but not in 3.4
for yfn in sorted(glob(os.path.join(config_dir, '*.yaml')) +
glob(os.path.join(config_dir, '*/*.yaml'))):
the_color = '' if yfn in res['yaml_files'] else 'red'
print(color(the_color, '-', yfn))
if len(res['except']) > 0:
print(color('bold_white', 'Failed config'))
for domain, config in res['except'].items():
print(' ', color('bold_red', domain + ':'),
color('red', '', reset='red'))
2016-09-23 09:10:19 +02:00
dump_dict(config, reset='red')
2016-08-23 06:42:05 +02:00
if domain_info:
if 'all' in domain_info:
print(color('bold_white', 'Successful config (all)'))
2016-08-25 07:18:32 +02:00
for domain, config in res['components'].items():
2016-08-27 01:33:57 +02:00
print(' ', color(C_HEAD, domain + ':'))
2016-09-23 09:10:19 +02:00
2016-08-23 06:42:05 +02:00
print(color('bold_white', 'Successful config (partial)'))
for domain in domain_info:
if domain == ERROR_STR:
print(' ', color(C_HEAD, domain + ':'))
2016-09-23 09:10:19 +02:00
dump_dict(res['components'].get(domain, None))
2016-08-23 06:42:05 +02:00
if args.secrets:
flatsecret = {}
for sfn, sdict in res['secret_cache'].items():
sss = []
for skey, sval in sdict.items():
if skey in flatsecret:
_LOGGER.error('Duplicated secrets in files %s and %s',
flatsecret[skey], sfn)
flatsecret[skey] = sfn
sss.append(color('green', skey) if skey in res['secrets']
else skey)
print(color(C_HEAD, 'Secrets from', sfn + ':'), ', '.join(sss))
print(color(C_HEAD, 'Used Secrets:'))
for skey, sval in res['secrets'].items():
print(' -', skey + ':', sval, color('cyan', '[from:', flatsecret
.get(skey, 'keyring') + ']'))
return 0
def check(config_path):
"""Perform a check by mocking hass load functions."""
res = {
2016-09-23 09:10:19 +02:00
'yaml_files': OrderedDict(), # yaml_files loaded
'secrets': OrderedDict(), # secret cache and secrets loaded
'except': OrderedDict(), # exceptions raised (with config)
'components': OrderedDict(), # successful components
'secret_cache': OrderedDict(),
2016-08-23 06:42:05 +02:00
def mock_load(filename): # pylint: disable=unused-variable
"""Mock hass.util.load_yaml to save config files."""
res['yaml_files'][filename] = True
return MOCKS['load'][1](filename)
def mock_get(comp_name): # pylint: disable=unused-variable
"""Mock hass.loader.get_component to replace setup & setup_platform."""
def mock_setup(*kwargs):
"""Mock setup, only record the component name & config."""
assert comp_name not in res['components'], \
"Components should contain a list of platforms"
res['components'][comp_name] = kwargs[1].get(comp_name)
return True
module = MOCKS['get'][1](comp_name)
if module is None:
# Ensure list
res['except'][ERROR_STR] = res['except'].get(ERROR_STR, [])
res['except'][ERROR_STR].append('{} not found: {}'.format(
'Platform' if '.' in comp_name else 'Component', comp_name))
return None
# Test if platform/component and overwrite setup
if '.' in comp_name:
module.setup_platform = mock_setup
module.setup = mock_setup
return module
def mock_secrets(ldr, node): # pylint: disable=unused-variable
"""Mock _get_secrets."""
val = MOCKS['secrets'][1](ldr, node)
except HomeAssistantError:
val = None
res['secrets'][node.value] = val
return val
def mock_except(ex, domain, config): # pylint: disable=unused-variable
"""Mock bootstrap.log_exception."""
MOCKS['except'][1](ex, domain, config)
res['except'][domain] = config.get(domain, config)
# Patches to skip functions
for sil in SILENCE:
PATCHES[sil] = patch(sil)
# Patches with local mock functions
for key, val in MOCKS.items():
2016-08-25 07:18:32 +02:00
# The * in the key is removed to find the mock_function (side_effect)
# This allows us to use one side_effect to patch multiple locations
mock_function = locals()['mock_' + key.replace('*', '')]
2016-08-23 06:42:05 +02:00
PATCHES[key] = patch(val[0], side_effect=mock_function)
# Start all patches
for pat in PATCHES.values():
# Ensure !secrets point to the patched function
yaml.yaml.SafeLoader.add_constructor('!secret', yaml._secret_yaml)
bootstrap.from_config_file(config_path, skip_pip=True)
2016-09-08 22:20:38 +02:00
res['secret_cache'] = dict(yaml.__SECRET_CACHE)
except Exception as err: # pylint: disable=broad-except
print(color('red', 'Fatal error while loading config:'), str(err))
2016-08-23 06:42:05 +02:00
# Stop all patches
for pat in PATCHES.values():
# Ensure !secrets point to the original function
yaml.yaml.SafeLoader.add_constructor('!secret', yaml._secret_yaml)
2016-09-08 22:20:38 +02:00
return res
2016-08-23 06:42:05 +02:00
2016-09-23 09:10:19 +02:00
def dump_dict(layer, indent_count=3, listi=False, **kwargs):
2016-08-23 06:42:05 +02:00
"""Display a dict.
A friendly version of print yaml.yaml.dump(config).
def line_src(this):
"""Display line config source."""
if hasattr(this, '__config_file__'):
return color('cyan', "[source {}:{}]"
.format(this.__config_file__, this.__line__ or '?'),
return ''
2016-09-23 09:10:19 +02:00
def sort_dict_key(val):
"""Return the dict key for sorting."""
skey = str.lower(val[0])
if str(skey) == 'platform':
skey = '0'
return skey
2016-08-23 06:42:05 +02:00
indent_str = indent_count * ' '
if listi or isinstance(layer, list):
2016-09-21 06:26:40 +02:00
indent_str = indent_str[:-1] + '-'
2016-08-23 06:42:05 +02:00
if isinstance(layer, Dict):
2016-09-23 09:10:19 +02:00
for key, value in sorted(layer.items(), key=sort_dict_key):
2016-08-23 06:42:05 +02:00
if isinstance(value, dict) or isinstance(value, list):
print(indent_str, key + ':', line_src(value))
2016-09-21 06:26:40 +02:00
dump_dict(value, indent_count + 2)
2016-08-23 06:42:05 +02:00
print(indent_str, key + ':', value)
indent_str = indent_count * ' '
if isinstance(layer, Sequence):
for i in layer:
if isinstance(i, dict):
2016-09-23 09:10:19 +02:00
dump_dict(i, indent_count+2, True)
2016-08-23 06:42:05 +02:00
2016-09-23 09:10:19 +02:00
print(' ', indent_str, i)