Convert logbook tests to pytest (#42289)

This commit is contained in:
taiyeoguns 2020-10-26 08:18:02 +00:00 committed by GitHub
parent 0cf9a1621a
commit 8f74a0c2e6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -3,7 +3,6 @@
import collections
from datetime import datetime, timedelta
import json
import unittest
import pytest
import voluptuous as vol
@ -41,264 +40,269 @@ from tests.async_mock import Mock, patch
from tests.common import get_test_home_assistant, init_recorder_component, mock_platform
from tests.components.recorder.common import trigger_db_commit
EMPTY_CONFIG = logbook.CONFIG_SCHEMA({logbook.DOMAIN: {}})
class TestComponentLogbook(unittest.TestCase):
"""Test the History component."""
EMPTY_CONFIG = logbook.CONFIG_SCHEMA({logbook.DOMAIN: {}})
@pytest.fixture
def hass_():
"""Set up things to be run when tests are started."""
hass = get_test_home_assistant()
init_recorder_component(hass) # Force an in memory DB
with patch("homeassistant.components.http.start_http_server_and_save_config"):
assert setup_component(hass, logbook.DOMAIN, EMPTY_CONFIG)
yield hass
hass.stop()
def setUp(self):
"""Set up things to be run when tests are started."""
self.hass = get_test_home_assistant()
init_recorder_component(self.hass) # Force an in memory DB
with patch("homeassistant.components.http.start_http_server_and_save_config"):
assert setup_component(self.hass, logbook.DOMAIN, self.EMPTY_CONFIG)
self.addCleanup(self.hass.stop)
def test_service_call_create_logbook_entry(self):
"""Test if service call create log book entry."""
calls = []
def test_service_call_create_logbook_entry(hass_):
"""Test if service call create log book entry."""
calls = []
@ha.callback
def event_listener(event):
"""Append on event."""
calls.append(event)
@ha.callback
def event_listener(event):
"""Append on event."""
calls.append(event)
self.hass.bus.listen(logbook.EVENT_LOGBOOK_ENTRY, event_listener)
self.hass.services.call(
logbook.DOMAIN,
"log",
{
logbook.ATTR_NAME: "Alarm",
logbook.ATTR_MESSAGE: "is triggered",
logbook.ATTR_DOMAIN: "switch",
logbook.ATTR_ENTITY_ID: "switch.test_switch",
},
True,
hass_.bus.listen(logbook.EVENT_LOGBOOK_ENTRY, event_listener)
hass_.services.call(
logbook.DOMAIN,
"log",
{
logbook.ATTR_NAME: "Alarm",
logbook.ATTR_MESSAGE: "is triggered",
logbook.ATTR_DOMAIN: "switch",
logbook.ATTR_ENTITY_ID: "switch.test_switch",
},
True,
)
hass_.services.call(
logbook.DOMAIN,
"log",
{
logbook.ATTR_NAME: "This entry",
logbook.ATTR_MESSAGE: "has no domain or entity_id",
},
True,
)
# Logbook entry service call results in firing an event.
# Our service call will unblock when the event listeners have been
# scheduled. This means that they may not have been processed yet.
trigger_db_commit(hass_)
hass_.block_till_done()
hass_.data[recorder.DATA_INSTANCE].block_till_done()
events = list(
logbook._get_events(
hass_,
dt_util.utcnow() - timedelta(hours=1),
dt_util.utcnow() + timedelta(hours=1),
)
self.hass.services.call(
logbook.DOMAIN,
"log",
{
logbook.ATTR_NAME: "This entry",
logbook.ATTR_MESSAGE: "has no domain or entity_id",
},
True,
)
# Logbook entry service call results in firing an event.
# Our service call will unblock when the event listeners have been
# scheduled. This means that they may not have been processed yet.
trigger_db_commit(self.hass)
self.hass.block_till_done()
self.hass.data[recorder.DATA_INSTANCE].block_till_done()
)
assert len(events) == 2
events = list(
logbook._get_events(
self.hass,
dt_util.utcnow() - timedelta(hours=1),
dt_util.utcnow() + timedelta(hours=1),
)
)
assert len(events) == 2
assert len(calls) == 2
first_call = calls[-2]
assert len(calls) == 2
first_call = calls[-2]
assert first_call.data.get(logbook.ATTR_NAME) == "Alarm"
assert first_call.data.get(logbook.ATTR_MESSAGE) == "is triggered"
assert first_call.data.get(logbook.ATTR_DOMAIN) == "switch"
assert first_call.data.get(logbook.ATTR_ENTITY_ID) == "switch.test_switch"
assert first_call.data.get(logbook.ATTR_NAME) == "Alarm"
assert first_call.data.get(logbook.ATTR_MESSAGE) == "is triggered"
assert first_call.data.get(logbook.ATTR_DOMAIN) == "switch"
assert first_call.data.get(logbook.ATTR_ENTITY_ID) == "switch.test_switch"
last_call = calls[-1]
last_call = calls[-1]
assert last_call.data.get(logbook.ATTR_NAME) == "This entry"
assert last_call.data.get(logbook.ATTR_MESSAGE) == "has no domain or entity_id"
assert last_call.data.get(logbook.ATTR_DOMAIN) == "logbook"
assert last_call.data.get(logbook.ATTR_NAME) == "This entry"
assert last_call.data.get(logbook.ATTR_MESSAGE) == "has no domain or entity_id"
assert last_call.data.get(logbook.ATTR_DOMAIN) == "logbook"
def test_service_call_create_log_book_entry_no_message(self):
"""Test if service call create log book entry without message."""
calls = []
def test_service_call_create_log_book_entry_no_message(hass_):
"""Test if service call create log book entry without message."""
calls = []
@ha.callback
def event_listener(event):
"""Append on event."""
calls.append(event)
@ha.callback
def event_listener(event):
"""Append on event."""
calls.append(event)
self.hass.bus.listen(logbook.EVENT_LOGBOOK_ENTRY, event_listener)
hass_.bus.listen(logbook.EVENT_LOGBOOK_ENTRY, event_listener)
with pytest.raises(vol.Invalid):
self.hass.services.call(logbook.DOMAIN, "log", {}, True)
with pytest.raises(vol.Invalid):
hass_.services.call(logbook.DOMAIN, "log", {}, True)
# Logbook entry service call results in firing an event.
# Our service call will unblock when the event listeners have been
# scheduled. This means that they may not have been processed yet.
self.hass.block_till_done()
# Logbook entry service call results in firing an event.
# Our service call will unblock when the event listeners have been
# scheduled. This means that they may not have been processed yet.
hass_.block_till_done()
assert len(calls) == 0
assert len(calls) == 0
def test_humanify_filter_sensor(self):
"""Test humanify filter too frequent sensor values."""
entity_id = "sensor.bla"
pointA = dt_util.utcnow().replace(minute=2)
pointB = pointA.replace(minute=5)
pointC = pointA + timedelta(minutes=logbook.GROUP_BY_MINUTES)
entity_attr_cache = logbook.EntityAttributeCache(self.hass)
def test_humanify_filter_sensor(hass_):
"""Test humanify filter too frequent sensor values."""
entity_id = "sensor.bla"
eventA = self.create_state_changed_event(pointA, entity_id, 10)
eventB = self.create_state_changed_event(pointB, entity_id, 20)
eventC = self.create_state_changed_event(pointC, entity_id, 30)
pointA = dt_util.utcnow().replace(minute=2)
pointB = pointA.replace(minute=5)
pointC = pointA + timedelta(minutes=logbook.GROUP_BY_MINUTES)
entity_attr_cache = logbook.EntityAttributeCache(hass_)
entries = list(
logbook.humanify(self.hass, (eventA, eventB, eventC), entity_attr_cache, {})
)
eventA = create_state_changed_event(pointA, entity_id, 10)
eventB = create_state_changed_event(pointB, entity_id, 20)
eventC = create_state_changed_event(pointC, entity_id, 30)
assert len(entries) == 2
self.assert_entry(entries[0], pointB, "bla", entity_id=entity_id)
entries = list(
logbook.humanify(hass_, (eventA, eventB, eventC), entity_attr_cache, {})
)
self.assert_entry(entries[1], pointC, "bla", entity_id=entity_id)
assert len(entries) == 2
assert_entry(entries[0], pointB, "bla", entity_id=entity_id)
def test_home_assistant_start_stop_grouped(self):
"""Test if HA start and stop events are grouped.
assert_entry(entries[1], pointC, "bla", entity_id=entity_id)
Events that are occurring in the same minute.
"""
entity_attr_cache = logbook.EntityAttributeCache(self.hass)
entries = list(
logbook.humanify(
self.hass,
(
MockLazyEventPartialState(EVENT_HOMEASSISTANT_STOP),
MockLazyEventPartialState(EVENT_HOMEASSISTANT_START),
),
entity_attr_cache,
{},
def test_home_assistant_start_stop_grouped(hass_):
"""Test if HA start and stop events are grouped.
Events that are occurring in the same minute.
"""
entity_attr_cache = logbook.EntityAttributeCache(hass_)
entries = list(
logbook.humanify(
hass_,
(
MockLazyEventPartialState(EVENT_HOMEASSISTANT_STOP),
MockLazyEventPartialState(EVENT_HOMEASSISTANT_START),
),
entity_attr_cache,
{},
),
)
assert len(entries) == 1
assert_entry(
entries[0], name="Home Assistant", message="restarted", domain=ha.DOMAIN
)
def test_home_assistant_start(hass_):
"""Test if HA start is not filtered or converted into a restart."""
entity_id = "switch.bla"
pointA = dt_util.utcnow()
entity_attr_cache = logbook.EntityAttributeCache(hass_)
entries = list(
logbook.humanify(
hass_,
(
MockLazyEventPartialState(EVENT_HOMEASSISTANT_START),
create_state_changed_event(pointA, entity_id, 10),
),
entity_attr_cache,
{},
)
)
assert len(entries) == 1
self.assert_entry(
entries[0], name="Home Assistant", message="restarted", domain=ha.DOMAIN
)
assert len(entries) == 2
assert_entry(entries[0], name="Home Assistant", message="started", domain=ha.DOMAIN)
assert_entry(entries[1], pointA, "bla", entity_id=entity_id)
def test_home_assistant_start(self):
"""Test if HA start is not filtered or converted into a restart."""
entity_id = "switch.bla"
pointA = dt_util.utcnow()
entity_attr_cache = logbook.EntityAttributeCache(self.hass)
entries = list(
logbook.humanify(
self.hass,
(
MockLazyEventPartialState(EVENT_HOMEASSISTANT_START),
self.create_state_changed_event(pointA, entity_id, 10),
def test_process_custom_logbook_entries(hass_):
"""Test if custom log book entries get added as an entry."""
name = "Nice name"
message = "has a custom entry"
entity_id = "sun.sun"
entity_attr_cache = logbook.EntityAttributeCache(hass_)
entries = list(
logbook.humanify(
hass_,
(
MockLazyEventPartialState(
logbook.EVENT_LOGBOOK_ENTRY,
{
logbook.ATTR_NAME: name,
logbook.ATTR_MESSAGE: message,
logbook.ATTR_ENTITY_ID: entity_id,
},
),
entity_attr_cache,
{},
)
),
entity_attr_cache,
{},
)
)
assert len(entries) == 2
self.assert_entry(
entries[0], name="Home Assistant", message="started", domain=ha.DOMAIN
)
self.assert_entry(entries[1], pointA, "bla", entity_id=entity_id)
assert len(entries) == 1
assert_entry(entries[0], name=name, message=message, entity_id=entity_id)
def test_process_custom_logbook_entries(self):
"""Test if custom log book entries get added as an entry."""
name = "Nice name"
message = "has a custom entry"
entity_id = "sun.sun"
entity_attr_cache = logbook.EntityAttributeCache(self.hass)
entries = list(
logbook.humanify(
self.hass,
(
MockLazyEventPartialState(
logbook.EVENT_LOGBOOK_ENTRY,
{
logbook.ATTR_NAME: name,
logbook.ATTR_MESSAGE: message,
logbook.ATTR_ENTITY_ID: entity_id,
},
),
),
entity_attr_cache,
{},
)
)
# pylint: disable=no-self-use
def assert_entry(
entry, when=None, name=None, message=None, domain=None, entity_id=None
):
"""Assert an entry is what is expected."""
return _assert_entry(entry, when, name, message, domain, entity_id)
assert len(entries) == 1
self.assert_entry(entries[0], name=name, message=message, entity_id=entity_id)
# pylint: disable=no-self-use
def assert_entry(
self, entry, when=None, name=None, message=None, domain=None, entity_id=None
):
"""Assert an entry is what is expected."""
return _assert_entry(entry, when, name, message, domain, entity_id)
def create_state_changed_event(
event_time_fired,
entity_id,
state,
attributes=None,
last_changed=None,
last_updated=None,
):
"""Create state changed event."""
old_state = ha.State(
entity_id, "old", attributes, last_changed, last_updated
).as_dict()
new_state = ha.State(
entity_id, state, attributes, last_changed, last_updated
).as_dict()
def create_state_changed_event(
self,
event_time_fired,
entity_id,
state,
attributes=None,
last_changed=None,
last_updated=None,
):
"""Create state changed event."""
old_state = ha.State(
entity_id, "old", attributes, last_changed, last_updated
).as_dict()
new_state = ha.State(
entity_id, state, attributes, last_changed, last_updated
).as_dict()
return create_state_changed_event_from_old_new(
entity_id, event_time_fired, old_state, new_state
)
return self.create_state_changed_event_from_old_new(
entity_id, event_time_fired, old_state, new_state
)
# pylint: disable=no-self-use
def create_state_changed_event_from_old_new(
self, entity_id, event_time_fired, old_state, new_state
):
"""Create a state changed event from a old and new state."""
attributes = {}
if new_state is not None:
attributes = new_state.get("attributes")
attributes_json = json.dumps(attributes, cls=JSONEncoder)
row = collections.namedtuple(
"Row",
[
"event_type"
"event_data"
"time_fired"
"context_id"
"context_user_id"
"state"
"entity_id"
"domain"
"attributes"
"state_id",
"old_state_id",
],
)
# pylint: disable=no-self-use
def create_state_changed_event_from_old_new(
entity_id, event_time_fired, old_state, new_state
):
"""Create a state changed event from a old and new state."""
attributes = {}
if new_state is not None:
attributes = new_state.get("attributes")
attributes_json = json.dumps(attributes, cls=JSONEncoder)
row = collections.namedtuple(
"Row",
[
"event_type"
"event_data"
"time_fired"
"context_id"
"context_user_id"
"state"
"entity_id"
"domain"
"attributes"
"state_id",
"old_state_id",
],
)
row.event_type = EVENT_STATE_CHANGED
row.event_data = "{}"
row.attributes = attributes_json
row.time_fired = event_time_fired
row.state = new_state and new_state.get("state")
row.entity_id = entity_id
row.domain = entity_id and ha.split_entity_id(entity_id)[0]
row.context_id = None
row.context_user_id = None
row.old_state_id = old_state and 1
row.state_id = new_state and 1
return logbook.LazyEventPartialState(row)
row.event_type = EVENT_STATE_CHANGED
row.event_data = "{}"
row.attributes = attributes_json
row.time_fired = event_time_fired
row.state = new_state and new_state.get("state")
row.entity_id = entity_id
row.domain = entity_id and ha.split_entity_id(entity_id)[0]
row.context_id = None
row.context_user_id = None
row.old_state_id = old_state and 1
row.state_id = new_state and 1
return logbook.LazyEventPartialState(row)
async def test_logbook_view(hass, hass_client):