Add decoded telegram payload to knx_event service (#57621)
* decode knx_event telegram payload with given dpt * exception handling for invalid payloads * Update homeassistant/components/knx/__init__.py Co-authored-by: Marvin Wichmann <marvin@fam-wichmann.de> Co-authored-by: Marvin Wichmann <marvin@fam-wichmann.de>
This commit is contained in:
parent
2e4ee487c1
commit
fc7d4ed118
5 changed files with 180 additions and 40 deletions
|
@ -10,15 +10,22 @@ from xknx import XKNX
|
||||||
from xknx.core import XknxConnectionState
|
from xknx.core import XknxConnectionState
|
||||||
from xknx.core.telegram_queue import TelegramQueue
|
from xknx.core.telegram_queue import TelegramQueue
|
||||||
from xknx.dpt import DPTArray, DPTBase, DPTBinary
|
from xknx.dpt import DPTArray, DPTBase, DPTBinary
|
||||||
from xknx.exceptions import XKNXException
|
from xknx.exceptions import ConversionError, XKNXException
|
||||||
from xknx.io import ConnectionConfig, ConnectionType
|
from xknx.io import ConnectionConfig, ConnectionType
|
||||||
from xknx.telegram import AddressFilter, Telegram
|
from xknx.telegram import AddressFilter, Telegram
|
||||||
from xknx.telegram.address import parse_device_group_address
|
from xknx.telegram.address import (
|
||||||
|
DeviceGroupAddress,
|
||||||
|
GroupAddress,
|
||||||
|
InternalGroupAddress,
|
||||||
|
parse_device_group_address,
|
||||||
|
)
|
||||||
from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite
|
from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite
|
||||||
|
|
||||||
from homeassistant.const import (
|
from homeassistant.const import (
|
||||||
|
CONF_EVENT,
|
||||||
CONF_HOST,
|
CONF_HOST,
|
||||||
CONF_PORT,
|
CONF_PORT,
|
||||||
|
CONF_TYPE,
|
||||||
EVENT_HOMEASSISTANT_STOP,
|
EVENT_HOMEASSISTANT_STOP,
|
||||||
SERVICE_RELOAD,
|
SERVICE_RELOAD,
|
||||||
)
|
)
|
||||||
|
@ -46,6 +53,7 @@ from .schema import (
|
||||||
ClimateSchema,
|
ClimateSchema,
|
||||||
ConnectionSchema,
|
ConnectionSchema,
|
||||||
CoverSchema,
|
CoverSchema,
|
||||||
|
EventSchema,
|
||||||
ExposeSchema,
|
ExposeSchema,
|
||||||
FanSchema,
|
FanSchema,
|
||||||
LightSchema,
|
LightSchema,
|
||||||
|
@ -77,6 +85,8 @@ SERVICE_KNX_READ: Final = "read"
|
||||||
CONFIG_SCHEMA = vol.Schema(
|
CONFIG_SCHEMA = vol.Schema(
|
||||||
{
|
{
|
||||||
DOMAIN: vol.All(
|
DOMAIN: vol.All(
|
||||||
|
# deprecated since 2021.12
|
||||||
|
cv.deprecated(CONF_KNX_EVENT_FILTER),
|
||||||
# deprecated since 2021.4
|
# deprecated since 2021.4
|
||||||
cv.deprecated("config_file"),
|
cv.deprecated("config_file"),
|
||||||
# deprecated since 2021.2
|
# deprecated since 2021.2
|
||||||
|
@ -89,6 +99,7 @@ CONFIG_SCHEMA = vol.Schema(
|
||||||
vol.Optional(CONF_KNX_EVENT_FILTER, default=[]): vol.All(
|
vol.Optional(CONF_KNX_EVENT_FILTER, default=[]): vol.All(
|
||||||
cv.ensure_list, [cv.string]
|
cv.ensure_list, [cv.string]
|
||||||
),
|
),
|
||||||
|
**EventSchema.SCHEMA,
|
||||||
**ExposeSchema.platform_node(),
|
**ExposeSchema.platform_node(),
|
||||||
**BinarySensorSchema.platform_node(),
|
**BinarySensorSchema.platform_node(),
|
||||||
**ClimateSchema.platform_node(),
|
**ClimateSchema.platform_node(),
|
||||||
|
@ -149,6 +160,7 @@ SERVICE_KNX_EVENT_REGISTER_SCHEMA = vol.Schema(
|
||||||
cv.ensure_list,
|
cv.ensure_list,
|
||||||
[ga_validator],
|
[ga_validator],
|
||||||
),
|
),
|
||||||
|
vol.Optional(CONF_TYPE): sensor_type_validator,
|
||||||
vol.Optional(SERVICE_KNX_ATTR_REMOVE, default=False): cv.boolean,
|
vol.Optional(SERVICE_KNX_ATTR_REMOVE, default=False): cv.boolean,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
@ -268,11 +280,16 @@ class KNXModule:
|
||||||
self.service_exposures: dict[str, KNXExposeSensor | KNXExposeTime] = {}
|
self.service_exposures: dict[str, KNXExposeSensor | KNXExposeTime] = {}
|
||||||
|
|
||||||
self.init_xknx()
|
self.init_xknx()
|
||||||
self._knx_event_callback: TelegramQueue.Callback = self.register_callback()
|
|
||||||
self.xknx.connection_manager.register_connection_state_changed_cb(
|
self.xknx.connection_manager.register_connection_state_changed_cb(
|
||||||
self.connection_state_changed_cb
|
self.connection_state_changed_cb
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self._address_filter_transcoder: dict[AddressFilter, type[DPTBase]] = {}
|
||||||
|
self._group_address_transcoder: dict[DeviceGroupAddress, type[DPTBase]] = {}
|
||||||
|
self._knx_event_callback: TelegramQueue.Callback = (
|
||||||
|
self.register_event_callback()
|
||||||
|
)
|
||||||
|
|
||||||
def init_xknx(self) -> None:
|
def init_xknx(self) -> None:
|
||||||
"""Initialize XKNX object."""
|
"""Initialize XKNX object."""
|
||||||
self.xknx = XKNX(
|
self.xknx = XKNX(
|
||||||
|
@ -332,38 +349,77 @@ class KNXModule:
|
||||||
auto_reconnect=True,
|
auto_reconnect=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def connection_state_changed_cb(self, state: XknxConnectionState) -> None:
|
||||||
|
"""Call invoked after a KNX connection state change was received."""
|
||||||
|
self.connected = state == XknxConnectionState.CONNECTED
|
||||||
|
if tasks := [device.after_update() for device in self.xknx.devices]:
|
||||||
|
await asyncio.gather(*tasks)
|
||||||
|
|
||||||
async def telegram_received_cb(self, telegram: Telegram) -> None:
|
async def telegram_received_cb(self, telegram: Telegram) -> None:
|
||||||
"""Call invoked after a KNX telegram was received."""
|
"""Call invoked after a KNX telegram was received."""
|
||||||
data = None
|
|
||||||
# Not all telegrams have serializable data.
|
# Not all telegrams have serializable data.
|
||||||
|
data: int | tuple[int, ...] | None = None
|
||||||
|
value = None
|
||||||
if (
|
if (
|
||||||
isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse))
|
isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse))
|
||||||
and telegram.payload.value is not None
|
and telegram.payload.value is not None
|
||||||
|
and isinstance(
|
||||||
|
telegram.destination_address, (GroupAddress, InternalGroupAddress)
|
||||||
|
)
|
||||||
):
|
):
|
||||||
data = telegram.payload.value.value
|
data = telegram.payload.value.value
|
||||||
|
|
||||||
|
if isinstance(data, tuple):
|
||||||
|
if transcoder := (
|
||||||
|
self._group_address_transcoder.get(telegram.destination_address)
|
||||||
|
or next(
|
||||||
|
(
|
||||||
|
_transcoder
|
||||||
|
for _filter, _transcoder in self._address_filter_transcoder.items()
|
||||||
|
if _filter.match(telegram.destination_address)
|
||||||
|
),
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
value = transcoder.from_knx(data)
|
||||||
|
except ConversionError as err:
|
||||||
|
_LOGGER.warning(
|
||||||
|
"Error in `knx_event` at decoding type '%s' from telegram %s\n%s",
|
||||||
|
transcoder.__name__,
|
||||||
|
telegram,
|
||||||
|
err,
|
||||||
|
)
|
||||||
|
|
||||||
self.hass.bus.async_fire(
|
self.hass.bus.async_fire(
|
||||||
"knx_event",
|
"knx_event",
|
||||||
{
|
{
|
||||||
"data": data,
|
"data": data,
|
||||||
"destination": str(telegram.destination_address),
|
"destination": str(telegram.destination_address),
|
||||||
"direction": telegram.direction.value,
|
"direction": telegram.direction.value,
|
||||||
|
"value": value,
|
||||||
"source": str(telegram.source_address),
|
"source": str(telegram.source_address),
|
||||||
"telegramtype": telegram.payload.__class__.__name__,
|
"telegramtype": telegram.payload.__class__.__name__,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
async def connection_state_changed_cb(self, state: XknxConnectionState) -> None:
|
def register_event_callback(self) -> TelegramQueue.Callback:
|
||||||
"""Call invoked after a KNX connection state change was received."""
|
"""Register callback for knx_event within XKNX TelegramQueue."""
|
||||||
self.connected = state == XknxConnectionState.CONNECTED
|
# backwards compatibility for deprecated CONF_KNX_EVENT_FILTER
|
||||||
if tasks := [device.after_update() for device in self.xknx.devices]:
|
# use `address_filters = []` when this is not needed anymore
|
||||||
await asyncio.gather(*tasks)
|
|
||||||
|
|
||||||
def register_callback(self) -> TelegramQueue.Callback:
|
|
||||||
"""Register callback within XKNX TelegramQueue."""
|
|
||||||
address_filters = list(
|
address_filters = list(
|
||||||
map(AddressFilter, self.config[DOMAIN][CONF_KNX_EVENT_FILTER])
|
map(AddressFilter, self.config[DOMAIN][CONF_KNX_EVENT_FILTER])
|
||||||
)
|
)
|
||||||
|
for filter_set in self.config[DOMAIN][CONF_EVENT]:
|
||||||
|
_filters = list(map(AddressFilter, filter_set[KNX_ADDRESS]))
|
||||||
|
address_filters.extend(_filters)
|
||||||
|
if (dpt := filter_set.get(CONF_TYPE)) and (
|
||||||
|
transcoder := DPTBase.parse_transcoder(dpt)
|
||||||
|
):
|
||||||
|
self._address_filter_transcoder.update(
|
||||||
|
{_filter: transcoder for _filter in _filters} # type: ignore[misc]
|
||||||
|
)
|
||||||
|
|
||||||
return self.xknx.telegram_queue.register_telegram_received_cb(
|
return self.xknx.telegram_queue.register_telegram_received_cb(
|
||||||
self.telegram_received_cb,
|
self.telegram_received_cb,
|
||||||
address_filters=address_filters,
|
address_filters=address_filters,
|
||||||
|
@ -374,7 +430,7 @@ class KNXModule:
|
||||||
async def service_event_register_modify(self, call: ServiceCall) -> None:
|
async def service_event_register_modify(self, call: ServiceCall) -> None:
|
||||||
"""Service for adding or removing a GroupAddress to the knx_event filter."""
|
"""Service for adding or removing a GroupAddress to the knx_event filter."""
|
||||||
attr_address = call.data[KNX_ADDRESS]
|
attr_address = call.data[KNX_ADDRESS]
|
||||||
group_addresses = map(parse_device_group_address, attr_address)
|
group_addresses = list(map(parse_device_group_address, attr_address))
|
||||||
|
|
||||||
if call.data.get(SERVICE_KNX_ATTR_REMOVE):
|
if call.data.get(SERVICE_KNX_ATTR_REMOVE):
|
||||||
for group_address in group_addresses:
|
for group_address in group_addresses:
|
||||||
|
@ -385,8 +441,16 @@ class KNXModule:
|
||||||
"Service event_register could not remove event for '%s'",
|
"Service event_register could not remove event for '%s'",
|
||||||
str(group_address),
|
str(group_address),
|
||||||
)
|
)
|
||||||
|
if group_address in self._group_address_transcoder:
|
||||||
|
del self._group_address_transcoder[group_address]
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if (dpt := call.data.get(CONF_TYPE)) and (
|
||||||
|
transcoder := DPTBase.parse_transcoder(dpt)
|
||||||
|
):
|
||||||
|
self._group_address_transcoder.update(
|
||||||
|
{_address: transcoder for _address in group_addresses} # type: ignore[misc]
|
||||||
|
)
|
||||||
for group_address in group_addresses:
|
for group_address in group_addresses:
|
||||||
if group_address in self._knx_event_callback.group_addresses:
|
if group_address in self._knx_event_callback.group_addresses:
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -24,6 +24,7 @@ from homeassistant.const import (
|
||||||
CONF_DEVICE_CLASS,
|
CONF_DEVICE_CLASS,
|
||||||
CONF_ENTITY_CATEGORY,
|
CONF_ENTITY_CATEGORY,
|
||||||
CONF_ENTITY_ID,
|
CONF_ENTITY_ID,
|
||||||
|
CONF_EVENT,
|
||||||
CONF_HOST,
|
CONF_HOST,
|
||||||
CONF_MODE,
|
CONF_MODE,
|
||||||
CONF_NAME,
|
CONF_NAME,
|
||||||
|
@ -204,6 +205,28 @@ class ConnectionSchema:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#########
|
||||||
|
# EVENT
|
||||||
|
#########
|
||||||
|
|
||||||
|
|
||||||
|
class EventSchema:
|
||||||
|
"""Voluptuous schema for KNX events."""
|
||||||
|
|
||||||
|
KNX_EVENT_FILTER_SCHEMA = vol.Schema(
|
||||||
|
{
|
||||||
|
vol.Required(KNX_ADDRESS): vol.All(cv.ensure_list, [cv.string]),
|
||||||
|
vol.Optional(CONF_TYPE): sensor_type_validator,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
SCHEMA = {
|
||||||
|
vol.Optional(CONF_EVENT, default=[]): vol.All(
|
||||||
|
cv.ensure_list, [KNX_EVENT_FILTER_SCHEMA]
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
#############
|
#############
|
||||||
# PLATFORMS
|
# PLATFORMS
|
||||||
#############
|
#############
|
||||||
|
|
|
@ -45,6 +45,13 @@ event_register:
|
||||||
example: "1/1/0"
|
example: "1/1/0"
|
||||||
selector:
|
selector:
|
||||||
object:
|
object:
|
||||||
|
type:
|
||||||
|
name: "Value type"
|
||||||
|
description: "If set, the payload will be decoded as given DPT in the event data `value` key. Knx sensor types are valid values (see https://www.home-assistant.io/integrations/sensor.knx)."
|
||||||
|
required: false
|
||||||
|
example: "2byte_float"
|
||||||
|
selector:
|
||||||
|
text:
|
||||||
remove:
|
remove:
|
||||||
name: "Remove event registration"
|
name: "Remove event registration"
|
||||||
description: "If `True` the group address(es) will be removed."
|
description: "If `True` the group address(es) will be removed."
|
||||||
|
|
|
@ -1,6 +1,11 @@
|
||||||
"""Test KNX events."""
|
"""Test KNX events."""
|
||||||
|
|
||||||
from homeassistant.components.knx import CONF_KNX_EVENT_FILTER
|
from homeassistant.components.knx import (
|
||||||
|
CONF_EVENT,
|
||||||
|
CONF_KNX_EVENT_FILTER,
|
||||||
|
CONF_TYPE,
|
||||||
|
KNX_ADDRESS,
|
||||||
|
)
|
||||||
from homeassistant.core import HomeAssistant
|
from homeassistant.core import HomeAssistant
|
||||||
|
|
||||||
from .conftest import KNXTestKit
|
from .conftest import KNXTestKit
|
||||||
|
@ -9,7 +14,7 @@ from tests.common import async_capture_events
|
||||||
|
|
||||||
|
|
||||||
async def test_knx_event(hass: HomeAssistant, knx: KNXTestKit):
|
async def test_knx_event(hass: HomeAssistant, knx: KNXTestKit):
|
||||||
"""Test `knx_event` event."""
|
"""Test the `knx_event` event."""
|
||||||
test_group_a = "0/4/*"
|
test_group_a = "0/4/*"
|
||||||
test_address_a_1 = "0/4/0"
|
test_address_a_1 = "0/4/0"
|
||||||
test_address_a_2 = "0/4/100"
|
test_address_a_2 = "0/4/100"
|
||||||
|
@ -20,13 +25,15 @@ async def test_knx_event(hass: HomeAssistant, knx: KNXTestKit):
|
||||||
test_address_c_1 = "2/6/4"
|
test_address_c_1 = "2/6/4"
|
||||||
test_address_c_2 = "2/6/5"
|
test_address_c_2 = "2/6/5"
|
||||||
test_address_d = "5/4/3"
|
test_address_d = "5/4/3"
|
||||||
|
test_address_e = "6/4/3"
|
||||||
events = async_capture_events(hass, "knx_event")
|
events = async_capture_events(hass, "knx_event")
|
||||||
|
|
||||||
async def test_event_data(address, payload):
|
async def test_event_data(address, payload, value=None):
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert len(events) == 1
|
assert len(events) == 1
|
||||||
event = events.pop()
|
event = events.pop()
|
||||||
assert event.data["data"] == payload
|
assert event.data["data"] == payload
|
||||||
|
assert event.data["value"] == value
|
||||||
assert event.data["direction"] == "Incoming"
|
assert event.data["direction"] == "Incoming"
|
||||||
assert event.data["destination"] == address
|
assert event.data["destination"] == address
|
||||||
if payload is None:
|
if payload is None:
|
||||||
|
@ -40,12 +47,24 @@ async def test_knx_event(hass: HomeAssistant, knx: KNXTestKit):
|
||||||
|
|
||||||
await knx.setup_integration(
|
await knx.setup_integration(
|
||||||
{
|
{
|
||||||
CONF_KNX_EVENT_FILTER: [
|
CONF_EVENT: [
|
||||||
test_group_a,
|
{
|
||||||
test_group_b,
|
KNX_ADDRESS: [
|
||||||
test_group_c,
|
test_group_a,
|
||||||
test_address_d,
|
test_group_b,
|
||||||
]
|
],
|
||||||
|
CONF_TYPE: "2byte_unsigned",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
KNX_ADDRESS: test_group_c,
|
||||||
|
CONF_TYPE: "2byte_float",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
KNX_ADDRESS: [test_address_d],
|
||||||
|
},
|
||||||
|
],
|
||||||
|
# test legacy `event_filter` config
|
||||||
|
CONF_KNX_EVENT_FILTER: [test_address_e],
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -54,28 +73,35 @@ async def test_knx_event(hass: HomeAssistant, knx: KNXTestKit):
|
||||||
assert len(events) == 0
|
assert len(events) == 0
|
||||||
|
|
||||||
# receive telegrams for group addresses matching the filter
|
# receive telegrams for group addresses matching the filter
|
||||||
await knx.receive_write(test_address_a_1, True)
|
await knx.receive_write(test_address_a_1, (0x03, 0x2F))
|
||||||
await test_event_data(test_address_a_1, True)
|
await test_event_data(test_address_a_1, (0x03, 0x2F), value=815)
|
||||||
|
|
||||||
await knx.receive_response(test_address_a_2, False)
|
await knx.receive_response(test_address_a_2, (0x12, 0x67))
|
||||||
await test_event_data(test_address_a_2, False)
|
await test_event_data(test_address_a_2, (0x12, 0x67), value=4711)
|
||||||
|
|
||||||
await knx.receive_write(test_address_b_1, (1,))
|
await knx.receive_write(test_address_b_1, (0, 0))
|
||||||
await test_event_data(test_address_b_1, (1,))
|
await test_event_data(test_address_b_1, (0, 0), value=0)
|
||||||
|
|
||||||
await knx.receive_response(test_address_b_2, (255,))
|
await knx.receive_response(test_address_b_2, (255, 255))
|
||||||
await test_event_data(test_address_b_2, (255,))
|
await test_event_data(test_address_b_2, (255, 255), value=65535)
|
||||||
|
|
||||||
await knx.receive_write(test_address_c_1, (89, 43, 34, 11))
|
await knx.receive_write(test_address_c_1, (0x06, 0xA0))
|
||||||
await test_event_data(test_address_c_1, (89, 43, 34, 11))
|
await test_event_data(test_address_c_1, (0x06, 0xA0), value=16.96)
|
||||||
|
|
||||||
await knx.receive_response(test_address_c_2, (255, 255, 255, 255))
|
await knx.receive_response(test_address_c_2, (0x8A, 0x24))
|
||||||
await test_event_data(test_address_c_2, (255, 255, 255, 255))
|
await test_event_data(test_address_c_2, (0x8A, 0x24), value=-30.0)
|
||||||
|
|
||||||
await knx.receive_read(test_address_d)
|
await knx.receive_read(test_address_d)
|
||||||
await test_event_data(test_address_d, None)
|
await test_event_data(test_address_d, None)
|
||||||
|
|
||||||
# receive telegrams for group addresses not matching the filter
|
await knx.receive_write(test_address_d, True)
|
||||||
|
await test_event_data(test_address_d, True)
|
||||||
|
|
||||||
|
# test legacy `event_filter` config
|
||||||
|
await knx.receive_write(test_address_e, (89, 43, 34, 11))
|
||||||
|
await test_event_data(test_address_e, (89, 43, 34, 11))
|
||||||
|
|
||||||
|
# receive telegrams for group addresses not matching any filter
|
||||||
await knx.receive_write("0/5/0", True)
|
await knx.receive_write("0/5/0", True)
|
||||||
await knx.receive_write("1/7/0", True)
|
await knx.receive_write("1/7/0", True)
|
||||||
await knx.receive_write("2/6/6", True)
|
await knx.receive_write("2/6/6", True)
|
||||||
|
|
|
@ -86,14 +86,19 @@ async def test_event_register(hass: HomeAssistant, knx: KNXTestKit):
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert len(events) == 0
|
assert len(events) == 0
|
||||||
|
|
||||||
# register event
|
# register event with `type`
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"knx", "event_register", {"address": test_address}, blocking=True
|
"knx",
|
||||||
|
"event_register",
|
||||||
|
{"address": test_address, "type": "2byte_unsigned"},
|
||||||
|
blocking=True,
|
||||||
)
|
)
|
||||||
await knx.receive_write(test_address, True)
|
await knx.receive_write(test_address, (0x04, 0xD2))
|
||||||
await knx.receive_write(test_address, False)
|
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert len(events) == 2
|
assert len(events) == 1
|
||||||
|
typed_event = events.pop()
|
||||||
|
assert typed_event.data["data"] == (0x04, 0xD2)
|
||||||
|
assert typed_event.data["value"] == 1234
|
||||||
|
|
||||||
# remove event registration - no event added
|
# remove event registration - no event added
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
|
@ -104,7 +109,22 @@ async def test_event_register(hass: HomeAssistant, knx: KNXTestKit):
|
||||||
)
|
)
|
||||||
await knx.receive_write(test_address, True)
|
await knx.receive_write(test_address, True)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
assert len(events) == 0
|
||||||
|
|
||||||
|
# register event without `type`
|
||||||
|
await hass.services.async_call(
|
||||||
|
"knx", "event_register", {"address": test_address}, blocking=True
|
||||||
|
)
|
||||||
|
await knx.receive_write(test_address, True)
|
||||||
|
await knx.receive_write(test_address, False)
|
||||||
|
await hass.async_block_till_done()
|
||||||
assert len(events) == 2
|
assert len(events) == 2
|
||||||
|
untyped_event_2 = events.pop()
|
||||||
|
assert untyped_event_2.data["data"] is False
|
||||||
|
assert untyped_event_2.data["value"] is None
|
||||||
|
untyped_event_1 = events.pop()
|
||||||
|
assert untyped_event_1.data["data"] is True
|
||||||
|
assert untyped_event_1.data["value"] is None
|
||||||
|
|
||||||
|
|
||||||
async def test_exposure_register(hass: HomeAssistant, knx: KNXTestKit):
|
async def test_exposure_register(hass: HomeAssistant, knx: KNXTestKit):
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue