Ensure configured logger severity is respected (#35749)

This commit is contained in:
J. Nick Koston 2020-05-23 11:12:55 -05:00 committed by GitHub
parent d21cfd869e
commit 2c7eee6722
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 113 additions and 95 deletions

View file

@ -1,6 +1,6 @@
"""Support for setting the level of logging for components."""
from collections import OrderedDict
import logging
import re
import voluptuous as vol
@ -50,33 +50,54 @@ CONFIG_SCHEMA = vol.Schema(
class HomeAssistantLogFilter(logging.Filter):
"""A log filter."""
def __init__(self, logfilter):
def __init__(self):
"""Initialize the filter."""
super().__init__()
self.logfilter = logfilter
self._default = None
self._logs = None
self._log_rx = None
def update_default_level(self, default_level):
"""Update the default logger level."""
self._default = default_level
def update_log_filter(self, logs):
"""Rebuild the internal filter from new config."""
#
# A precompiled regex is used to avoid
# the overhead of a list transversal
#
# Sort to make sure the longer
# names are always matched first
# so they take precedence of the shorter names
# to allow for more granular settings.
#
names_by_len = sorted(list(logs), key=len, reverse=True)
self._log_rx = re.compile("".join(["^(?:", "|".join(names_by_len), ")"]))
self._logs = logs
def filter(self, record):
"""Filter the log entries."""
# Log with filtered severity
if LOGGER_LOGS in self.logfilter:
for filtername in self.logfilter[LOGGER_LOGS]:
logseverity = self.logfilter[LOGGER_LOGS][filtername]
if record.name.startswith(filtername):
return record.levelno >= logseverity
if self._log_rx:
match = self._log_rx.match(record.name)
if match:
return record.levelno >= self._logs[match.group(0)]
# Log with default severity
default = self.logfilter[LOGGER_DEFAULT]
return record.levelno >= default
return record.levelno >= self._default
async def async_setup(hass, config):
"""Set up the logger component."""
logfilter = {}
hass_filter = HomeAssistantLogFilter()
def set_default_log_level(level):
"""Set the default log level for components."""
logfilter[LOGGER_DEFAULT] = LOGSEVERITY[level]
hass_filter.update_default_level(LOGSEVERITY[level])
def set_log_levels(logpoints):
"""Set the specified log levels."""
@ -90,9 +111,9 @@ async def async_setup(hass, config):
for key, value in logpoints.items():
logs[key] = LOGSEVERITY[value]
logfilter[LOGGER_LOGS] = OrderedDict(
sorted(logs.items(), key=lambda t: len(t[0]), reverse=True)
)
logfilter[LOGGER_LOGS] = logs
hass_filter.update_log_filter(logs)
# Set default log severity
if LOGGER_DEFAULT in config.get(DOMAIN):
@ -106,7 +127,7 @@ async def async_setup(hass, config):
# Set log filter for all log handler
for handler in logging.root.handlers:
handler.setLevel(logging.NOTSET)
handler.addFilter(HomeAssistantLogFilter(logfilter))
handler.addFilter(hass_filter)
if LOGGER_LOGS in config.get(DOMAIN):
set_log_levels(config.get(DOMAIN)[LOGGER_LOGS])

View file

@ -1,123 +1,120 @@
"""The tests for the Logger component."""
from collections import namedtuple
import logging
import unittest
from homeassistant.components import logger
from homeassistant.setup import setup_component
from tests.common import get_test_home_assistant
from homeassistant.setup import async_setup_component
RECORD = namedtuple("record", ("name", "levelno"))
NO_DEFAULT_CONFIG = {"logger": {}}
NO_LOGS_CONFIG = {"logger": {"default": "info"}}
TEST_CONFIG = {"logger": {"default": "warning", "logs": {"test": "info"}}}
TEST_CONFIG = {
"logger": {
"default": "warning",
"logs": {"test": "info", "test.child": "debug", "test.child.child": "warning"},
}
}
class TestUpdater(unittest.TestCase):
"""Test logger component."""
async def async_setup_logger(hass, config):
"""Set up logger and save log filter."""
await async_setup_component(hass, logger.DOMAIN, config)
return logging.root.handlers[-1].filters[0]
def setUp(self):
"""Set up things to be run when tests are started."""
self.hass = get_test_home_assistant()
self.log_filter = None
def tearDown(self):
"""Stop everything that was started."""
del logging.root.handlers[-1]
self.hass.stop()
async def test_logger_setup(hass):
"""Use logger to create a logging filter."""
await async_setup_logger(hass, TEST_CONFIG)
def setup_logger(self, config):
"""Set up logger and save log filter."""
setup_component(self.hass, logger.DOMAIN, config)
self.log_filter = logging.root.handlers[-1].filters[0]
assert len(logging.root.handlers) > 0
handler = logging.root.handlers[-1]
def assert_logged(self, name, level):
"""Assert that a certain record was logged."""
assert self.log_filter.filter(RECORD(name, level))
assert len(handler.filters) == 1
def assert_not_logged(self, name, level):
"""Assert that a certain record was not logged."""
assert not self.log_filter.filter(RECORD(name, level))
def test_logger_setup(self):
"""Use logger to create a logging filter."""
self.setup_logger(TEST_CONFIG)
async def test_logger_test_filters(hass):
"""Test resulting filter operation."""
log_filter = await async_setup_logger(hass, TEST_CONFIG)
assert len(logging.root.handlers) > 0
handler = logging.root.handlers[-1]
# Blocked default record
assert not log_filter.filter(RECORD("asdf", logging.DEBUG))
assert len(handler.filters) == 1
log_filter = handler.filters[0].logfilter
# Allowed default record
assert log_filter.filter(RECORD("asdf", logging.WARNING))
assert log_filter["default"] == logging.WARNING
assert log_filter["logs"]["test"] == logging.INFO
# Blocked named record
assert not log_filter.filter(RECORD("test", logging.DEBUG))
def test_logger_test_filters(self):
"""Test resulting filter operation."""
self.setup_logger(TEST_CONFIG)
# Allowed named record
assert log_filter.filter(RECORD("test", logging.INFO))
# Blocked default record
self.assert_not_logged("asdf", logging.DEBUG)
# Allowed named record child
assert log_filter.filter(RECORD("test.child", logging.INFO))
# Allowed default record
self.assert_logged("asdf", logging.WARNING)
# Allowed named record child
assert log_filter.filter(RECORD("test.child", logging.DEBUG))
# Blocked named record
self.assert_not_logged("test", logging.DEBUG)
# Blocked named record child of child
assert not log_filter.filter(RECORD("test.child.child", logging.DEBUG))
# Allowed named record
self.assert_logged("test", logging.INFO)
# Allowed named record child of child
assert log_filter.filter(RECORD("test.child.child", logging.WARNING))
def test_set_filter_empty_config(self):
"""Test change log level from empty configuration."""
self.setup_logger(NO_LOGS_CONFIG)
self.assert_not_logged("test", logging.DEBUG)
async def test_set_filter_empty_config(hass):
"""Test change log level from empty configuration."""
log_filter = await async_setup_logger(hass, NO_LOGS_CONFIG)
self.hass.services.call(logger.DOMAIN, "set_level", {"test": "debug"})
self.hass.block_till_done()
assert not log_filter.filter(RECORD("test", logging.DEBUG))
self.assert_logged("test", logging.DEBUG)
await hass.services.async_call(logger.DOMAIN, "set_level", {"test": "debug"})
await hass.async_block_till_done()
def test_set_filter(self):
"""Test change log level of existing filter."""
self.setup_logger(TEST_CONFIG)
assert log_filter.filter(RECORD("test", logging.DEBUG))
self.assert_not_logged("asdf", logging.DEBUG)
self.assert_logged("dummy", logging.WARNING)
self.hass.services.call(
logger.DOMAIN, "set_level", {"asdf": "debug", "dummy": "info"}
)
self.hass.block_till_done()
async def test_set_filter(hass):
"""Test change log level of existing filter."""
log_filter = await async_setup_logger(hass, TEST_CONFIG)
self.assert_logged("asdf", logging.DEBUG)
self.assert_logged("dummy", logging.WARNING)
assert not log_filter.filter(RECORD("asdf", logging.DEBUG))
assert log_filter.filter(RECORD("dummy", logging.WARNING))
def test_set_default_filter_empty_config(self):
"""Test change default log level from empty configuration."""
self.setup_logger(NO_DEFAULT_CONFIG)
await hass.services.async_call(
logger.DOMAIN, "set_level", {"asdf": "debug", "dummy": "info"}
)
await hass.async_block_till_done()
self.assert_logged("test", logging.DEBUG)
assert log_filter.filter(RECORD("asdf", logging.DEBUG))
assert log_filter.filter(RECORD("dummy", logging.WARNING))
self.hass.services.call(
logger.DOMAIN, "set_default_level", {"level": "warning"}
)
self.hass.block_till_done()
self.assert_not_logged("test", logging.DEBUG)
async def test_set_default_filter_empty_config(hass):
"""Test change default log level from empty configuration."""
log_filter = await async_setup_logger(hass, NO_DEFAULT_CONFIG)
def test_set_default_filter(self):
"""Test change default log level with existing default."""
self.setup_logger(TEST_CONFIG)
assert log_filter.filter(RECORD("test", logging.DEBUG))
self.assert_not_logged("asdf", logging.DEBUG)
self.assert_logged("dummy", logging.WARNING)
await hass.services.async_call(
logger.DOMAIN, "set_default_level", {"level": "warning"}
)
await hass.async_block_till_done()
self.hass.services.call(logger.DOMAIN, "set_default_level", {"level": "debug"})
self.hass.block_till_done()
assert not log_filter.filter(RECORD("test", logging.DEBUG))
self.assert_logged("asdf", logging.DEBUG)
self.assert_logged("dummy", logging.WARNING)
async def test_set_default_filter(hass):
"""Test change default log level with existing default."""
log_filter = await async_setup_logger(hass, TEST_CONFIG)
assert not log_filter.filter(RECORD("asdf", logging.DEBUG))
assert log_filter.filter(RECORD("dummy", logging.WARNING))
await hass.services.async_call(
logger.DOMAIN, "set_default_level", {"level": "debug"}
)
await hass.async_block_till_done()
assert log_filter.filter(RECORD("asdf", logging.DEBUG))
assert log_filter.filter(RECORD("dummy", logging.WARNING))