diff --git a/tests/components/logbook/test_init.py b/tests/components/logbook/test_init.py index 7c16faf2361..36d47115acc 100644 --- a/tests/components/logbook/test_init.py +++ b/tests/components/logbook/test_init.py @@ -3,7 +3,6 @@ import collections from datetime import datetime, timedelta import json -import unittest import pytest import voluptuous as vol @@ -41,264 +40,269 @@ from tests.async_mock import Mock, patch from tests.common import get_test_home_assistant, init_recorder_component, mock_platform from tests.components.recorder.common import trigger_db_commit +EMPTY_CONFIG = logbook.CONFIG_SCHEMA({logbook.DOMAIN: {}}) -class TestComponentLogbook(unittest.TestCase): - """Test the History component.""" - EMPTY_CONFIG = logbook.CONFIG_SCHEMA({logbook.DOMAIN: {}}) +@pytest.fixture +def hass_(): + """Set up things to be run when tests are started.""" + hass = get_test_home_assistant() + init_recorder_component(hass) # Force an in memory DB + with patch("homeassistant.components.http.start_http_server_and_save_config"): + assert setup_component(hass, logbook.DOMAIN, EMPTY_CONFIG) + yield hass + hass.stop() - def setUp(self): - """Set up things to be run when tests are started.""" - self.hass = get_test_home_assistant() - init_recorder_component(self.hass) # Force an in memory DB - with patch("homeassistant.components.http.start_http_server_and_save_config"): - assert setup_component(self.hass, logbook.DOMAIN, self.EMPTY_CONFIG) - self.addCleanup(self.hass.stop) - def test_service_call_create_logbook_entry(self): - """Test if service call create log book entry.""" - calls = [] +def test_service_call_create_logbook_entry(hass_): + """Test if service call create log book entry.""" + calls = [] - @ha.callback - def event_listener(event): - """Append on event.""" - calls.append(event) + @ha.callback + def event_listener(event): + """Append on event.""" + calls.append(event) - self.hass.bus.listen(logbook.EVENT_LOGBOOK_ENTRY, event_listener) - self.hass.services.call( - logbook.DOMAIN, - "log", - { - logbook.ATTR_NAME: "Alarm", - logbook.ATTR_MESSAGE: "is triggered", - logbook.ATTR_DOMAIN: "switch", - logbook.ATTR_ENTITY_ID: "switch.test_switch", - }, - True, + hass_.bus.listen(logbook.EVENT_LOGBOOK_ENTRY, event_listener) + hass_.services.call( + logbook.DOMAIN, + "log", + { + logbook.ATTR_NAME: "Alarm", + logbook.ATTR_MESSAGE: "is triggered", + logbook.ATTR_DOMAIN: "switch", + logbook.ATTR_ENTITY_ID: "switch.test_switch", + }, + True, + ) + hass_.services.call( + logbook.DOMAIN, + "log", + { + logbook.ATTR_NAME: "This entry", + logbook.ATTR_MESSAGE: "has no domain or entity_id", + }, + True, + ) + # Logbook entry service call results in firing an event. + # Our service call will unblock when the event listeners have been + # scheduled. This means that they may not have been processed yet. + trigger_db_commit(hass_) + hass_.block_till_done() + hass_.data[recorder.DATA_INSTANCE].block_till_done() + + events = list( + logbook._get_events( + hass_, + dt_util.utcnow() - timedelta(hours=1), + dt_util.utcnow() + timedelta(hours=1), ) - self.hass.services.call( - logbook.DOMAIN, - "log", - { - logbook.ATTR_NAME: "This entry", - logbook.ATTR_MESSAGE: "has no domain or entity_id", - }, - True, - ) - # Logbook entry service call results in firing an event. - # Our service call will unblock when the event listeners have been - # scheduled. This means that they may not have been processed yet. - trigger_db_commit(self.hass) - self.hass.block_till_done() - self.hass.data[recorder.DATA_INSTANCE].block_till_done() + ) + assert len(events) == 2 - events = list( - logbook._get_events( - self.hass, - dt_util.utcnow() - timedelta(hours=1), - dt_util.utcnow() + timedelta(hours=1), - ) - ) - assert len(events) == 2 + assert len(calls) == 2 + first_call = calls[-2] - assert len(calls) == 2 - first_call = calls[-2] + assert first_call.data.get(logbook.ATTR_NAME) == "Alarm" + assert first_call.data.get(logbook.ATTR_MESSAGE) == "is triggered" + assert first_call.data.get(logbook.ATTR_DOMAIN) == "switch" + assert first_call.data.get(logbook.ATTR_ENTITY_ID) == "switch.test_switch" - assert first_call.data.get(logbook.ATTR_NAME) == "Alarm" - assert first_call.data.get(logbook.ATTR_MESSAGE) == "is triggered" - assert first_call.data.get(logbook.ATTR_DOMAIN) == "switch" - assert first_call.data.get(logbook.ATTR_ENTITY_ID) == "switch.test_switch" + last_call = calls[-1] - last_call = calls[-1] + assert last_call.data.get(logbook.ATTR_NAME) == "This entry" + assert last_call.data.get(logbook.ATTR_MESSAGE) == "has no domain or entity_id" + assert last_call.data.get(logbook.ATTR_DOMAIN) == "logbook" - assert last_call.data.get(logbook.ATTR_NAME) == "This entry" - assert last_call.data.get(logbook.ATTR_MESSAGE) == "has no domain or entity_id" - assert last_call.data.get(logbook.ATTR_DOMAIN) == "logbook" - def test_service_call_create_log_book_entry_no_message(self): - """Test if service call create log book entry without message.""" - calls = [] +def test_service_call_create_log_book_entry_no_message(hass_): + """Test if service call create log book entry without message.""" + calls = [] - @ha.callback - def event_listener(event): - """Append on event.""" - calls.append(event) + @ha.callback + def event_listener(event): + """Append on event.""" + calls.append(event) - self.hass.bus.listen(logbook.EVENT_LOGBOOK_ENTRY, event_listener) + hass_.bus.listen(logbook.EVENT_LOGBOOK_ENTRY, event_listener) - with pytest.raises(vol.Invalid): - self.hass.services.call(logbook.DOMAIN, "log", {}, True) + with pytest.raises(vol.Invalid): + hass_.services.call(logbook.DOMAIN, "log", {}, True) - # Logbook entry service call results in firing an event. - # Our service call will unblock when the event listeners have been - # scheduled. This means that they may not have been processed yet. - self.hass.block_till_done() + # Logbook entry service call results in firing an event. + # Our service call will unblock when the event listeners have been + # scheduled. This means that they may not have been processed yet. + hass_.block_till_done() - assert len(calls) == 0 + assert len(calls) == 0 - def test_humanify_filter_sensor(self): - """Test humanify filter too frequent sensor values.""" - entity_id = "sensor.bla" - pointA = dt_util.utcnow().replace(minute=2) - pointB = pointA.replace(minute=5) - pointC = pointA + timedelta(minutes=logbook.GROUP_BY_MINUTES) - entity_attr_cache = logbook.EntityAttributeCache(self.hass) +def test_humanify_filter_sensor(hass_): + """Test humanify filter too frequent sensor values.""" + entity_id = "sensor.bla" - eventA = self.create_state_changed_event(pointA, entity_id, 10) - eventB = self.create_state_changed_event(pointB, entity_id, 20) - eventC = self.create_state_changed_event(pointC, entity_id, 30) + pointA = dt_util.utcnow().replace(minute=2) + pointB = pointA.replace(minute=5) + pointC = pointA + timedelta(minutes=logbook.GROUP_BY_MINUTES) + entity_attr_cache = logbook.EntityAttributeCache(hass_) - entries = list( - logbook.humanify(self.hass, (eventA, eventB, eventC), entity_attr_cache, {}) - ) + eventA = create_state_changed_event(pointA, entity_id, 10) + eventB = create_state_changed_event(pointB, entity_id, 20) + eventC = create_state_changed_event(pointC, entity_id, 30) - assert len(entries) == 2 - self.assert_entry(entries[0], pointB, "bla", entity_id=entity_id) + entries = list( + logbook.humanify(hass_, (eventA, eventB, eventC), entity_attr_cache, {}) + ) - self.assert_entry(entries[1], pointC, "bla", entity_id=entity_id) + assert len(entries) == 2 + assert_entry(entries[0], pointB, "bla", entity_id=entity_id) - def test_home_assistant_start_stop_grouped(self): - """Test if HA start and stop events are grouped. + assert_entry(entries[1], pointC, "bla", entity_id=entity_id) - Events that are occurring in the same minute. - """ - entity_attr_cache = logbook.EntityAttributeCache(self.hass) - entries = list( - logbook.humanify( - self.hass, - ( - MockLazyEventPartialState(EVENT_HOMEASSISTANT_STOP), - MockLazyEventPartialState(EVENT_HOMEASSISTANT_START), - ), - entity_attr_cache, - {}, + +def test_home_assistant_start_stop_grouped(hass_): + """Test if HA start and stop events are grouped. + + Events that are occurring in the same minute. + """ + entity_attr_cache = logbook.EntityAttributeCache(hass_) + entries = list( + logbook.humanify( + hass_, + ( + MockLazyEventPartialState(EVENT_HOMEASSISTANT_STOP), + MockLazyEventPartialState(EVENT_HOMEASSISTANT_START), ), + entity_attr_cache, + {}, + ), + ) + + assert len(entries) == 1 + assert_entry( + entries[0], name="Home Assistant", message="restarted", domain=ha.DOMAIN + ) + + +def test_home_assistant_start(hass_): + """Test if HA start is not filtered or converted into a restart.""" + entity_id = "switch.bla" + pointA = dt_util.utcnow() + entity_attr_cache = logbook.EntityAttributeCache(hass_) + + entries = list( + logbook.humanify( + hass_, + ( + MockLazyEventPartialState(EVENT_HOMEASSISTANT_START), + create_state_changed_event(pointA, entity_id, 10), + ), + entity_attr_cache, + {}, ) + ) - assert len(entries) == 1 - self.assert_entry( - entries[0], name="Home Assistant", message="restarted", domain=ha.DOMAIN - ) + assert len(entries) == 2 + assert_entry(entries[0], name="Home Assistant", message="started", domain=ha.DOMAIN) + assert_entry(entries[1], pointA, "bla", entity_id=entity_id) - def test_home_assistant_start(self): - """Test if HA start is not filtered or converted into a restart.""" - entity_id = "switch.bla" - pointA = dt_util.utcnow() - entity_attr_cache = logbook.EntityAttributeCache(self.hass) - entries = list( - logbook.humanify( - self.hass, - ( - MockLazyEventPartialState(EVENT_HOMEASSISTANT_START), - self.create_state_changed_event(pointA, entity_id, 10), +def test_process_custom_logbook_entries(hass_): + """Test if custom log book entries get added as an entry.""" + name = "Nice name" + message = "has a custom entry" + entity_id = "sun.sun" + entity_attr_cache = logbook.EntityAttributeCache(hass_) + + entries = list( + logbook.humanify( + hass_, + ( + MockLazyEventPartialState( + logbook.EVENT_LOGBOOK_ENTRY, + { + logbook.ATTR_NAME: name, + logbook.ATTR_MESSAGE: message, + logbook.ATTR_ENTITY_ID: entity_id, + }, ), - entity_attr_cache, - {}, - ) + ), + entity_attr_cache, + {}, ) + ) - assert len(entries) == 2 - self.assert_entry( - entries[0], name="Home Assistant", message="started", domain=ha.DOMAIN - ) - self.assert_entry(entries[1], pointA, "bla", entity_id=entity_id) + assert len(entries) == 1 + assert_entry(entries[0], name=name, message=message, entity_id=entity_id) - def test_process_custom_logbook_entries(self): - """Test if custom log book entries get added as an entry.""" - name = "Nice name" - message = "has a custom entry" - entity_id = "sun.sun" - entity_attr_cache = logbook.EntityAttributeCache(self.hass) - entries = list( - logbook.humanify( - self.hass, - ( - MockLazyEventPartialState( - logbook.EVENT_LOGBOOK_ENTRY, - { - logbook.ATTR_NAME: name, - logbook.ATTR_MESSAGE: message, - logbook.ATTR_ENTITY_ID: entity_id, - }, - ), - ), - entity_attr_cache, - {}, - ) - ) +# pylint: disable=no-self-use +def assert_entry( + entry, when=None, name=None, message=None, domain=None, entity_id=None +): + """Assert an entry is what is expected.""" + return _assert_entry(entry, when, name, message, domain, entity_id) - assert len(entries) == 1 - self.assert_entry(entries[0], name=name, message=message, entity_id=entity_id) - # pylint: disable=no-self-use - def assert_entry( - self, entry, when=None, name=None, message=None, domain=None, entity_id=None - ): - """Assert an entry is what is expected.""" - return _assert_entry(entry, when, name, message, domain, entity_id) +def create_state_changed_event( + event_time_fired, + entity_id, + state, + attributes=None, + last_changed=None, + last_updated=None, +): + """Create state changed event.""" + old_state = ha.State( + entity_id, "old", attributes, last_changed, last_updated + ).as_dict() + new_state = ha.State( + entity_id, state, attributes, last_changed, last_updated + ).as_dict() - def create_state_changed_event( - self, - event_time_fired, - entity_id, - state, - attributes=None, - last_changed=None, - last_updated=None, - ): - """Create state changed event.""" - old_state = ha.State( - entity_id, "old", attributes, last_changed, last_updated - ).as_dict() - new_state = ha.State( - entity_id, state, attributes, last_changed, last_updated - ).as_dict() + return create_state_changed_event_from_old_new( + entity_id, event_time_fired, old_state, new_state + ) - return self.create_state_changed_event_from_old_new( - entity_id, event_time_fired, old_state, new_state - ) - # pylint: disable=no-self-use - def create_state_changed_event_from_old_new( - self, entity_id, event_time_fired, old_state, new_state - ): - """Create a state changed event from a old and new state.""" - attributes = {} - if new_state is not None: - attributes = new_state.get("attributes") - attributes_json = json.dumps(attributes, cls=JSONEncoder) - row = collections.namedtuple( - "Row", - [ - "event_type" - "event_data" - "time_fired" - "context_id" - "context_user_id" - "state" - "entity_id" - "domain" - "attributes" - "state_id", - "old_state_id", - ], - ) +# pylint: disable=no-self-use +def create_state_changed_event_from_old_new( + entity_id, event_time_fired, old_state, new_state +): + """Create a state changed event from a old and new state.""" + attributes = {} + if new_state is not None: + attributes = new_state.get("attributes") + attributes_json = json.dumps(attributes, cls=JSONEncoder) + row = collections.namedtuple( + "Row", + [ + "event_type" + "event_data" + "time_fired" + "context_id" + "context_user_id" + "state" + "entity_id" + "domain" + "attributes" + "state_id", + "old_state_id", + ], + ) - row.event_type = EVENT_STATE_CHANGED - row.event_data = "{}" - row.attributes = attributes_json - row.time_fired = event_time_fired - row.state = new_state and new_state.get("state") - row.entity_id = entity_id - row.domain = entity_id and ha.split_entity_id(entity_id)[0] - row.context_id = None - row.context_user_id = None - row.old_state_id = old_state and 1 - row.state_id = new_state and 1 - return logbook.LazyEventPartialState(row) + row.event_type = EVENT_STATE_CHANGED + row.event_data = "{}" + row.attributes = attributes_json + row.time_fired = event_time_fired + row.state = new_state and new_state.get("state") + row.entity_id = entity_id + row.domain = entity_id and ha.split_entity_id(entity_id)[0] + row.context_id = None + row.context_user_id = None + row.old_state_id = old_state and 1 + row.state_id = new_state and 1 + return logbook.LazyEventPartialState(row) async def test_logbook_view(hass, hass_client):