From e8d7952880cb0c8fa20ea500b9dfb44c4733c663 Mon Sep 17 00:00:00 2001 From: Matthias Alphart Date: Tue, 20 Jul 2021 06:39:19 +0200 Subject: [PATCH] Refactor KNX tests (#53183) * refactor tests for KNX - implement KNXTestKit class for convenient setup and assertion of KNX telegrams - add fixture returning an instance of KNXTestKit with automatic cleanup test * add tests for expose default attribute - fix expose edge case not covered by #53046 * use asyncio.Queue instead of AsyncMock.call_args_list for better readability * get xknx from Mock instead of hass.data * fix type annotations * add injection methods for incoming telegrams * rest read-response in expose --- homeassistant/components/knx/expose.py | 3 +- tests/components/knx/__init__.py | 26 ---- tests/components/knx/conftest.py | 188 +++++++++++++++++++++++-- tests/components/knx/test_expose.py | 117 +++++++++------ 4 files changed, 254 insertions(+), 80 deletions(-) diff --git a/homeassistant/components/knx/expose.py b/homeassistant/components/knx/expose.py index 5b92f9f1f6a..408ab25e7cc 100644 --- a/homeassistant/components/knx/expose.py +++ b/homeassistant/components/knx/expose.py @@ -141,7 +141,8 @@ class KNXExposeSensor: if new_value is None: return old_state = event.data.get("old_state") - old_value = self._get_expose_value(old_state) + # don't use default value for comparison on first state change (old_state is None) + old_value = self._get_expose_value(old_state) if old_state is not None else None # don't send same value sequentially if new_value != old_value: await self._async_set_knx_value(new_value) diff --git a/tests/components/knx/__init__.py b/tests/components/knx/__init__.py index 1c9bfaf15b8..eaa84714dc5 100644 --- a/tests/components/knx/__init__.py +++ b/tests/components/knx/__init__.py @@ -1,27 +1 @@ """Tests for the KNX integration.""" - -from unittest.mock import DEFAULT, patch - -from homeassistant.components.knx.const import DOMAIN as KNX_DOMAIN -from homeassistant.setup import async_setup_component - - -async def setup_knx_integration(hass, knx_ip_interface, config=None): - """Create the KNX gateway.""" - if config is None: - config = {} - - # To get the XKNX object from the constructor call - def side_effect(*args, **kwargs): - knx_ip_interface.xknx = args[0] - # switch off rate delimiter - knx_ip_interface.xknx.rate_limit = 0 - return DEFAULT - - with patch( - "xknx.xknx.KNXIPInterface", - return_value=knx_ip_interface, - side_effect=side_effect, - ): - await async_setup_component(hass, KNX_DOMAIN, {KNX_DOMAIN: config}) - await hass.async_block_till_done() diff --git a/tests/components/knx/conftest.py b/tests/components/knx/conftest.py index b7c27774f78..548e620813a 100644 --- a/tests/components/knx/conftest.py +++ b/tests/components/knx/conftest.py @@ -1,15 +1,183 @@ -"""conftest for knx.""" +"""Conftest for the KNX integration.""" +from __future__ import annotations -from unittest.mock import AsyncMock, Mock +import asyncio +from unittest.mock import DEFAULT, AsyncMock, Mock, patch import pytest +from xknx import XKNX +from xknx.dpt import DPTArray, DPTBinary +from xknx.telegram import Telegram, TelegramDirection +from xknx.telegram.address import GroupAddress, IndividualAddress +from xknx.telegram.apci import APCI, GroupValueRead, GroupValueResponse, GroupValueWrite + +from homeassistant.components.knx.const import DOMAIN as KNX_DOMAIN +from homeassistant.core import HomeAssistant +from homeassistant.setup import async_setup_component -@pytest.fixture(autouse=True) -def knx_ip_interface_mock(): - """Create a knx ip interface mock.""" - mock = Mock() - mock.start = AsyncMock() - mock.stop = AsyncMock() - mock.send_telegram = AsyncMock() - return mock +class KNXTestKit: + """Test helper for the KNX integration.""" + + def __init__(self, hass: HomeAssistant): + """Init KNX test helper class.""" + self.hass: HomeAssistant = hass + self.xknx: XKNX + # outgoing telegrams will be put in the Queue instead of sent to the interface + # telegrams to an InternalGroupAddress won't be queued here + self._outgoing_telegrams: asyncio.Queue = asyncio.Queue() + + async def setup_integration(self, config): + """Create the KNX integration.""" + + def knx_ip_interface_mock(): + """Create a xknx knx ip interface mock.""" + mock = Mock() + mock.start = AsyncMock() + mock.stop = AsyncMock() + mock.send_telegram = AsyncMock(side_effect=self._outgoing_telegrams.put) + return mock + + def fish_xknx(*args, **kwargs): + """Get the XKNX object from the constructor call.""" + self.xknx = args[0] + return DEFAULT + + with patch( + "xknx.xknx.KNXIPInterface", + return_value=knx_ip_interface_mock(), + side_effect=fish_xknx, + ): + await async_setup_component(self.hass, KNX_DOMAIN, {KNX_DOMAIN: config}) + await self.hass.async_block_till_done() + # disable rate limiter for tests + self.xknx.rate_limit = 0 + + ######################## + # Telegram counter tests + ######################## + + def _list_remaining_telegrams(self) -> str: + """Return a string containing remaining outgoing telegrams in test Queue. One per line.""" + remaining_telegrams = [] + while not self._outgoing_telegrams.empty(): + remaining_telegrams.append(self._outgoing_telegrams.get_nowait()) + return "\n".join(map(str, remaining_telegrams)) + + async def assert_no_telegram(self) -> None: + """Assert if every telegram in test Queue was checked.""" + await self.hass.async_block_till_done() + assert self._outgoing_telegrams.empty(), ( + f"Found remaining unasserted Telegrams: {self._outgoing_telegrams.qsize()}\n" + f"{self._list_remaining_telegrams()}" + ) + + async def assert_telegram_count(self, count: int) -> None: + """Assert outgoing telegram count in test Queue.""" + await self.hass.async_block_till_done() + actual_count = self._outgoing_telegrams.qsize() + assert actual_count == count, ( + f"Outgoing telegrams: {actual_count} - Expected: {count}\n" + f"{self._list_remaining_telegrams()}" + ) + + #################### + # APCI Service tests + #################### + + async def _assert_telegram( + self, + group_address: str, + payload: int | tuple[int, ...] | None, + apci_type: type[APCI], + ) -> None: + """Assert outgoing telegram. One by one in timely order.""" + await self.hass.async_block_till_done() + try: + telegram = self._outgoing_telegrams.get_nowait() + except asyncio.QueueEmpty: + raise AssertionError( + f"No Telegram found. Expected: {apci_type.__name__} -" + f" {group_address} - {payload}" + ) + + assert ( + str(telegram.destination_address) == group_address + ), f"Group address mismatch in {telegram} - Expected: {group_address}" + + assert isinstance( + telegram.payload, apci_type + ), f"APCI type mismatch in {telegram} - Expected: {apci_type.__name__}" + + if payload is not None: + assert ( + telegram.payload.value.value == payload # type: ignore + ), f"Payload mismatch in {telegram} - Expected: {payload}" + + async def assert_read(self, group_address: str) -> None: + """Assert outgoing GroupValueRead telegram. One by one in timely order.""" + await self._assert_telegram(group_address, None, GroupValueRead) + + async def assert_response( + self, group_address: str, payload: int | tuple[int, ...] + ) -> None: + """Assert outgoing GroupValueResponse telegram. One by one in timely order.""" + await self._assert_telegram(group_address, payload, GroupValueResponse) + + async def assert_write( + self, group_address: str, payload: int | tuple[int, ...] + ) -> None: + """Assert outgoing GroupValueWrite telegram. One by one in timely order.""" + await self._assert_telegram(group_address, payload, GroupValueWrite) + + #################### + # Incoming telegrams + #################### + + async def _receive_telegram(self, group_address: str, payload: APCI) -> None: + """Inject incoming KNX telegram.""" + self.xknx.telegrams.put_nowait( + Telegram( + destination_address=GroupAddress(group_address), + direction=TelegramDirection.INCOMING, + payload=payload, + source_address=IndividualAddress("1.2.3"), + ) + ) + await self.hass.async_block_till_done() + + @staticmethod + def _payload_value(payload: int | tuple[int, ...]) -> DPTArray | DPTBinary: + """Prepare payload value for GroupValueWrite or GroupValueResponse.""" + if isinstance(payload, int): + return DPTBinary(payload) + return DPTArray(payload) + + async def receive_read( + self, + group_address: str, + ) -> None: + """Inject incoming GroupValueRead telegram.""" + await self._receive_telegram(group_address, GroupValueRead()) + + async def receive_response( + self, group_address: str, payload: int | tuple[int, ...] + ) -> None: + """Inject incoming GroupValueResponse telegram.""" + payload_value = self._payload_value(payload) + await self._receive_telegram(group_address, GroupValueResponse(payload_value)) + + async def receive_write( + self, group_address: str, payload: int | tuple[int, ...] + ) -> None: + """Inject incoming GroupValueWrite telegram.""" + payload_value = self._payload_value(payload) + await self._receive_telegram(group_address, GroupValueWrite(payload_value)) + + +@pytest.fixture +async def knx(request, hass): + """Create a KNX TestKit instance.""" + knx_test_kit = KNXTestKit(hass) + yield knx_test_kit + await knx_test_kit.assert_no_telegram() diff --git a/tests/components/knx/test_expose.py b/tests/components/knx/test_expose.py index 908ef0a56f8..6922b6ed926 100644 --- a/tests/components/knx/test_expose.py +++ b/tests/components/knx/test_expose.py @@ -1,18 +1,13 @@ -"""Test knx expose.""" - - +"""Test KNX expose.""" from homeassistant.components.knx import CONF_KNX_EXPOSE, KNX_ADDRESS +from homeassistant.components.knx.schema import ExposeSchema from homeassistant.const import CONF_ATTRIBUTE, CONF_ENTITY_ID, CONF_TYPE -from . import setup_knx_integration - -async def test_binary_expose(hass, knx_ip_interface_mock): - """Test that a binary expose sends only telegrams on state change.""" +async def test_binary_expose(hass, knx): + """Test a binary expose to only send telegrams on state change.""" entity_id = "fake.entity" - await setup_knx_integration( - hass, - knx_ip_interface_mock, + await knx.setup_integration( { CONF_KNX_EXPOSE: { CONF_TYPE: "binary", @@ -24,37 +19,23 @@ async def test_binary_expose(hass, knx_ip_interface_mock): assert not hass.states.async_all() # Change state to on - knx_ip_interface_mock.reset_mock() hass.states.async_set(entity_id, "on", {}) - await hass.async_block_till_done() - assert ( - knx_ip_interface_mock.send_telegram.call_count == 1 - ), "Expected telegram for state change" + await knx.assert_write("1/1/8", True) # Change attribute; keep state - knx_ip_interface_mock.reset_mock() hass.states.async_set(entity_id, "on", {"brightness": 180}) - await hass.async_block_till_done() - assert ( - knx_ip_interface_mock.send_telegram.call_count == 0 - ), "Expected no telegram; state not changed" + await knx.assert_no_telegram() # Change attribute and state - knx_ip_interface_mock.reset_mock() hass.states.async_set(entity_id, "off", {"brightness": 0}) - await hass.async_block_till_done() - assert ( - knx_ip_interface_mock.send_telegram.call_count == 1 - ), "Expected telegram for state change" + await knx.assert_write("1/1/8", False) -async def test_expose_attribute(hass, knx_ip_interface_mock): - """Test that an expose sends only telegrams on attribute change.""" +async def test_expose_attribute(hass, knx): + """Test an expose to only send telegrams on attribute change.""" entity_id = "fake.entity" attribute = "fake_attribute" - await setup_knx_integration( - hass, - knx_ip_interface_mock, + await knx.setup_integration( { CONF_KNX_EXPOSE: { CONF_TYPE: "percentU8", @@ -66,26 +47,76 @@ async def test_expose_attribute(hass, knx_ip_interface_mock): ) assert not hass.states.async_all() - # Change state to on; no attribute - knx_ip_interface_mock.reset_mock() + # Before init no response shall be sent + await knx.receive_read("1/1/8") + await knx.assert_telegram_count(0) + + # Change state to "on"; no attribute hass.states.async_set(entity_id, "on", {}) - await hass.async_block_till_done() - assert knx_ip_interface_mock.send_telegram.call_count == 0 + await knx.assert_telegram_count(0) # Change attribute; keep state - knx_ip_interface_mock.reset_mock() hass.states.async_set(entity_id, "on", {attribute: 1}) - await hass.async_block_till_done() - assert knx_ip_interface_mock.send_telegram.call_count == 1 + await knx.assert_write("1/1/8", (1,)) + + # Read in between + await knx.receive_read("1/1/8") + await knx.assert_response("1/1/8", (1,)) # Change state keep attribute - knx_ip_interface_mock.reset_mock() hass.states.async_set(entity_id, "off", {attribute: 1}) - await hass.async_block_till_done() - assert knx_ip_interface_mock.send_telegram.call_count == 0 + await knx.assert_telegram_count(0) # Change state and attribute - knx_ip_interface_mock.reset_mock() hass.states.async_set(entity_id, "on", {attribute: 0}) - await hass.async_block_till_done() - assert knx_ip_interface_mock.send_telegram.call_count == 1 + await knx.assert_write("1/1/8", (0,)) + + # Change state to "off"; no attribute + hass.states.async_set(entity_id, "off", {}) + await knx.assert_telegram_count(0) + + +async def test_expose_attribute_with_default(hass, knx): + """Test an expose to only send telegrams on attribute change.""" + entity_id = "fake.entity" + attribute = "fake_attribute" + await knx.setup_integration( + { + CONF_KNX_EXPOSE: { + CONF_TYPE: "percentU8", + KNX_ADDRESS: "1/1/8", + CONF_ENTITY_ID: entity_id, + CONF_ATTRIBUTE: attribute, + ExposeSchema.CONF_KNX_EXPOSE_DEFAULT: 0, + } + }, + ) + assert not hass.states.async_all() + + # Before init default value shall be sent as response + await knx.receive_read("1/1/8") + await knx.assert_response("1/1/8", (0,)) + + # Change state to "on"; no attribute + hass.states.async_set(entity_id, "on", {}) + await knx.assert_write("1/1/8", (0,)) + + # Change attribute; keep state + hass.states.async_set(entity_id, "on", {attribute: 1}) + await knx.assert_write("1/1/8", (1,)) + + # Change state keep attribute + hass.states.async_set(entity_id, "off", {attribute: 1}) + await knx.assert_no_telegram() + + # Change state and attribute + hass.states.async_set(entity_id, "on", {attribute: 3}) + await knx.assert_write("1/1/8", (3,)) + + # Read in between + await knx.receive_read("1/1/8") + await knx.assert_response("1/1/8", (3,)) + + # Change state to "off"; no attribute + hass.states.async_set(entity_id, "off", {}) + await knx.assert_write("1/1/8", (0,))