Refactor KNX tests (#53183)

* refactor tests for KNX

- implement KNXTestKit class for convenient setup and assertion of KNX telegrams
- add fixture returning an instance of KNXTestKit with automatic cleanup test

* add tests for expose default attribute

- fix expose edge case not covered by #53046

* use asyncio.Queue instead of AsyncMock.call_args_list

for better readability

* get xknx from Mock instead of hass.data

* fix type annotations

* add injection methods for incoming telegrams

* rest read-response in expose
This commit is contained in:
Matthias Alphart 2021-07-20 06:39:19 +02:00 committed by GitHub
parent f0b28c90bf
commit e8d7952880
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 254 additions and 80 deletions

View file

@ -141,7 +141,8 @@ class KNXExposeSensor:
if new_value is None:
return
old_state = event.data.get("old_state")
old_value = self._get_expose_value(old_state)
# don't use default value for comparison on first state change (old_state is None)
old_value = self._get_expose_value(old_state) if old_state is not None else None
# don't send same value sequentially
if new_value != old_value:
await self._async_set_knx_value(new_value)

View file

@ -1,27 +1 @@
"""Tests for the KNX integration."""
from unittest.mock import DEFAULT, patch
from homeassistant.components.knx.const import DOMAIN as KNX_DOMAIN
from homeassistant.setup import async_setup_component
async def setup_knx_integration(hass, knx_ip_interface, config=None):
"""Create the KNX gateway."""
if config is None:
config = {}
# To get the XKNX object from the constructor call
def side_effect(*args, **kwargs):
knx_ip_interface.xknx = args[0]
# switch off rate delimiter
knx_ip_interface.xknx.rate_limit = 0
return DEFAULT
with patch(
"xknx.xknx.KNXIPInterface",
return_value=knx_ip_interface,
side_effect=side_effect,
):
await async_setup_component(hass, KNX_DOMAIN, {KNX_DOMAIN: config})
await hass.async_block_till_done()

View file

@ -1,15 +1,183 @@
"""conftest for knx."""
"""Conftest for the KNX integration."""
from __future__ import annotations
from unittest.mock import AsyncMock, Mock
import asyncio
from unittest.mock import DEFAULT, AsyncMock, Mock, patch
import pytest
from xknx import XKNX
from xknx.dpt import DPTArray, DPTBinary
from xknx.telegram import Telegram, TelegramDirection
from xknx.telegram.address import GroupAddress, IndividualAddress
from xknx.telegram.apci import APCI, GroupValueRead, GroupValueResponse, GroupValueWrite
from homeassistant.components.knx.const import DOMAIN as KNX_DOMAIN
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
@pytest.fixture(autouse=True)
def knx_ip_interface_mock():
"""Create a knx ip interface mock."""
mock = Mock()
mock.start = AsyncMock()
mock.stop = AsyncMock()
mock.send_telegram = AsyncMock()
return mock
class KNXTestKit:
"""Test helper for the KNX integration."""
def __init__(self, hass: HomeAssistant):
"""Init KNX test helper class."""
self.hass: HomeAssistant = hass
self.xknx: XKNX
# outgoing telegrams will be put in the Queue instead of sent to the interface
# telegrams to an InternalGroupAddress won't be queued here
self._outgoing_telegrams: asyncio.Queue = asyncio.Queue()
async def setup_integration(self, config):
"""Create the KNX integration."""
def knx_ip_interface_mock():
"""Create a xknx knx ip interface mock."""
mock = Mock()
mock.start = AsyncMock()
mock.stop = AsyncMock()
mock.send_telegram = AsyncMock(side_effect=self._outgoing_telegrams.put)
return mock
def fish_xknx(*args, **kwargs):
"""Get the XKNX object from the constructor call."""
self.xknx = args[0]
return DEFAULT
with patch(
"xknx.xknx.KNXIPInterface",
return_value=knx_ip_interface_mock(),
side_effect=fish_xknx,
):
await async_setup_component(self.hass, KNX_DOMAIN, {KNX_DOMAIN: config})
await self.hass.async_block_till_done()
# disable rate limiter for tests
self.xknx.rate_limit = 0
########################
# Telegram counter tests
########################
def _list_remaining_telegrams(self) -> str:
"""Return a string containing remaining outgoing telegrams in test Queue. One per line."""
remaining_telegrams = []
while not self._outgoing_telegrams.empty():
remaining_telegrams.append(self._outgoing_telegrams.get_nowait())
return "\n".join(map(str, remaining_telegrams))
async def assert_no_telegram(self) -> None:
"""Assert if every telegram in test Queue was checked."""
await self.hass.async_block_till_done()
assert self._outgoing_telegrams.empty(), (
f"Found remaining unasserted Telegrams: {self._outgoing_telegrams.qsize()}\n"
f"{self._list_remaining_telegrams()}"
)
async def assert_telegram_count(self, count: int) -> None:
"""Assert outgoing telegram count in test Queue."""
await self.hass.async_block_till_done()
actual_count = self._outgoing_telegrams.qsize()
assert actual_count == count, (
f"Outgoing telegrams: {actual_count} - Expected: {count}\n"
f"{self._list_remaining_telegrams()}"
)
####################
# APCI Service tests
####################
async def _assert_telegram(
self,
group_address: str,
payload: int | tuple[int, ...] | None,
apci_type: type[APCI],
) -> None:
"""Assert outgoing telegram. One by one in timely order."""
await self.hass.async_block_till_done()
try:
telegram = self._outgoing_telegrams.get_nowait()
except asyncio.QueueEmpty:
raise AssertionError(
f"No Telegram found. Expected: {apci_type.__name__} -"
f" {group_address} - {payload}"
)
assert (
str(telegram.destination_address) == group_address
), f"Group address mismatch in {telegram} - Expected: {group_address}"
assert isinstance(
telegram.payload, apci_type
), f"APCI type mismatch in {telegram} - Expected: {apci_type.__name__}"
if payload is not None:
assert (
telegram.payload.value.value == payload # type: ignore
), f"Payload mismatch in {telegram} - Expected: {payload}"
async def assert_read(self, group_address: str) -> None:
"""Assert outgoing GroupValueRead telegram. One by one in timely order."""
await self._assert_telegram(group_address, None, GroupValueRead)
async def assert_response(
self, group_address: str, payload: int | tuple[int, ...]
) -> None:
"""Assert outgoing GroupValueResponse telegram. One by one in timely order."""
await self._assert_telegram(group_address, payload, GroupValueResponse)
async def assert_write(
self, group_address: str, payload: int | tuple[int, ...]
) -> None:
"""Assert outgoing GroupValueWrite telegram. One by one in timely order."""
await self._assert_telegram(group_address, payload, GroupValueWrite)
####################
# Incoming telegrams
####################
async def _receive_telegram(self, group_address: str, payload: APCI) -> None:
"""Inject incoming KNX telegram."""
self.xknx.telegrams.put_nowait(
Telegram(
destination_address=GroupAddress(group_address),
direction=TelegramDirection.INCOMING,
payload=payload,
source_address=IndividualAddress("1.2.3"),
)
)
await self.hass.async_block_till_done()
@staticmethod
def _payload_value(payload: int | tuple[int, ...]) -> DPTArray | DPTBinary:
"""Prepare payload value for GroupValueWrite or GroupValueResponse."""
if isinstance(payload, int):
return DPTBinary(payload)
return DPTArray(payload)
async def receive_read(
self,
group_address: str,
) -> None:
"""Inject incoming GroupValueRead telegram."""
await self._receive_telegram(group_address, GroupValueRead())
async def receive_response(
self, group_address: str, payload: int | tuple[int, ...]
) -> None:
"""Inject incoming GroupValueResponse telegram."""
payload_value = self._payload_value(payload)
await self._receive_telegram(group_address, GroupValueResponse(payload_value))
async def receive_write(
self, group_address: str, payload: int | tuple[int, ...]
) -> None:
"""Inject incoming GroupValueWrite telegram."""
payload_value = self._payload_value(payload)
await self._receive_telegram(group_address, GroupValueWrite(payload_value))
@pytest.fixture
async def knx(request, hass):
"""Create a KNX TestKit instance."""
knx_test_kit = KNXTestKit(hass)
yield knx_test_kit
await knx_test_kit.assert_no_telegram()

View file

@ -1,18 +1,13 @@
"""Test knx expose."""
"""Test KNX expose."""
from homeassistant.components.knx import CONF_KNX_EXPOSE, KNX_ADDRESS
from homeassistant.components.knx.schema import ExposeSchema
from homeassistant.const import CONF_ATTRIBUTE, CONF_ENTITY_ID, CONF_TYPE
from . import setup_knx_integration
async def test_binary_expose(hass, knx_ip_interface_mock):
"""Test that a binary expose sends only telegrams on state change."""
async def test_binary_expose(hass, knx):
"""Test a binary expose to only send telegrams on state change."""
entity_id = "fake.entity"
await setup_knx_integration(
hass,
knx_ip_interface_mock,
await knx.setup_integration(
{
CONF_KNX_EXPOSE: {
CONF_TYPE: "binary",
@ -24,37 +19,23 @@ async def test_binary_expose(hass, knx_ip_interface_mock):
assert not hass.states.async_all()
# Change state to on
knx_ip_interface_mock.reset_mock()
hass.states.async_set(entity_id, "on", {})
await hass.async_block_till_done()
assert (
knx_ip_interface_mock.send_telegram.call_count == 1
), "Expected telegram for state change"
await knx.assert_write("1/1/8", True)
# Change attribute; keep state
knx_ip_interface_mock.reset_mock()
hass.states.async_set(entity_id, "on", {"brightness": 180})
await hass.async_block_till_done()
assert (
knx_ip_interface_mock.send_telegram.call_count == 0
), "Expected no telegram; state not changed"
await knx.assert_no_telegram()
# Change attribute and state
knx_ip_interface_mock.reset_mock()
hass.states.async_set(entity_id, "off", {"brightness": 0})
await hass.async_block_till_done()
assert (
knx_ip_interface_mock.send_telegram.call_count == 1
), "Expected telegram for state change"
await knx.assert_write("1/1/8", False)
async def test_expose_attribute(hass, knx_ip_interface_mock):
"""Test that an expose sends only telegrams on attribute change."""
async def test_expose_attribute(hass, knx):
"""Test an expose to only send telegrams on attribute change."""
entity_id = "fake.entity"
attribute = "fake_attribute"
await setup_knx_integration(
hass,
knx_ip_interface_mock,
await knx.setup_integration(
{
CONF_KNX_EXPOSE: {
CONF_TYPE: "percentU8",
@ -66,26 +47,76 @@ async def test_expose_attribute(hass, knx_ip_interface_mock):
)
assert not hass.states.async_all()
# Change state to on; no attribute
knx_ip_interface_mock.reset_mock()
# Before init no response shall be sent
await knx.receive_read("1/1/8")
await knx.assert_telegram_count(0)
# Change state to "on"; no attribute
hass.states.async_set(entity_id, "on", {})
await hass.async_block_till_done()
assert knx_ip_interface_mock.send_telegram.call_count == 0
await knx.assert_telegram_count(0)
# Change attribute; keep state
knx_ip_interface_mock.reset_mock()
hass.states.async_set(entity_id, "on", {attribute: 1})
await hass.async_block_till_done()
assert knx_ip_interface_mock.send_telegram.call_count == 1
await knx.assert_write("1/1/8", (1,))
# Read in between
await knx.receive_read("1/1/8")
await knx.assert_response("1/1/8", (1,))
# Change state keep attribute
knx_ip_interface_mock.reset_mock()
hass.states.async_set(entity_id, "off", {attribute: 1})
await hass.async_block_till_done()
assert knx_ip_interface_mock.send_telegram.call_count == 0
await knx.assert_telegram_count(0)
# Change state and attribute
knx_ip_interface_mock.reset_mock()
hass.states.async_set(entity_id, "on", {attribute: 0})
await hass.async_block_till_done()
assert knx_ip_interface_mock.send_telegram.call_count == 1
await knx.assert_write("1/1/8", (0,))
# Change state to "off"; no attribute
hass.states.async_set(entity_id, "off", {})
await knx.assert_telegram_count(0)
async def test_expose_attribute_with_default(hass, knx):
"""Test an expose to only send telegrams on attribute change."""
entity_id = "fake.entity"
attribute = "fake_attribute"
await knx.setup_integration(
{
CONF_KNX_EXPOSE: {
CONF_TYPE: "percentU8",
KNX_ADDRESS: "1/1/8",
CONF_ENTITY_ID: entity_id,
CONF_ATTRIBUTE: attribute,
ExposeSchema.CONF_KNX_EXPOSE_DEFAULT: 0,
}
},
)
assert not hass.states.async_all()
# Before init default value shall be sent as response
await knx.receive_read("1/1/8")
await knx.assert_response("1/1/8", (0,))
# Change state to "on"; no attribute
hass.states.async_set(entity_id, "on", {})
await knx.assert_write("1/1/8", (0,))
# Change attribute; keep state
hass.states.async_set(entity_id, "on", {attribute: 1})
await knx.assert_write("1/1/8", (1,))
# Change state keep attribute
hass.states.async_set(entity_id, "off", {attribute: 1})
await knx.assert_no_telegram()
# Change state and attribute
hass.states.async_set(entity_id, "on", {attribute: 3})
await knx.assert_write("1/1/8", (3,))
# Read in between
await knx.receive_read("1/1/8")
await knx.assert_response("1/1/8", (3,))
# Change state to "off"; no attribute
hass.states.async_set(entity_id, "off", {})
await knx.assert_write("1/1/8", (0,))