Compare commits

...
Sign in to create a new pull request.

5 commits

Author SHA1 Message Date
marvin-w
3d40828aeb Correct dependencies in manifest 2022-08-14 00:07:03 +02:00
marvin-w
1cc4b193ce Apply review suggestions 2022-08-14 00:06:31 +02:00
marvin-w
dca7e9cb76 Add custom panel to KNX integration 2022-08-14 00:06:31 +02:00
marvin-w
09d1c90e67 Test invalid payload type for websocket subscription 2022-08-12 20:34:16 +02:00
marvin-w
78524d8f83 Adds websocket commands for KNX integration 2022-08-12 20:34:16 +02:00
8 changed files with 279 additions and 2 deletions

View file

@ -84,6 +84,7 @@ from .schema import (
ga_validator, ga_validator,
sensor_type_validator, sensor_type_validator,
) )
from .websocket import register_panel
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -214,6 +215,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
conf = dict(conf) conf = dict(conf)
hass.data[DATA_KNX_CONFIG] = conf hass.data[DATA_KNX_CONFIG] = conf
return True return True
@ -294,6 +296,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
schema=SERVICE_KNX_EXPOSURE_REGISTER_SCHEMA, schema=SERVICE_KNX_EXPOSURE_REGISTER_SCHEMA,
) )
await register_panel(hass)
return True return True

View file

@ -1,9 +1,12 @@
"""Constants for the KNX integration.""" """Constants for the KNX integration."""
from __future__ import annotations from __future__ import annotations
from collections.abc import Awaitable, Callable
from enum import Enum from enum import Enum
from typing import Final, TypedDict from typing import Final, TypedDict
from xknx.telegram import Telegram
from homeassistant.components.climate.const import ( from homeassistant.components.climate.const import (
PRESET_AWAY, PRESET_AWAY,
PRESET_COMFORT, PRESET_COMFORT,
@ -70,6 +73,9 @@ DATA_HASS_CONFIG: Final = "knx_hass_config"
ATTR_COUNTER: Final = "counter" ATTR_COUNTER: Final = "counter"
ATTR_SOURCE: Final = "source" ATTR_SOURCE: Final = "source"
AsyncMessageCallbackType = Callable[[Telegram], Awaitable[None]]
MessageCallbackType = Callable[[Telegram], None]
class KNXConfigEntryData(TypedDict, total=False): class KNXConfigEntryData(TypedDict, total=False):
"""Config entry for the KNX integration.""" """Config entry for the KNX integration."""
@ -92,6 +98,17 @@ class KNXConfigEntryData(TypedDict, total=False):
knxkeys_password: str knxkeys_password: str
class KNXBusMonitorMessage(TypedDict):
"""KNX bus monitor message."""
destination_address: str
payload: str
type: str
source_address: str
direction: str
timestamp: str
class ColorTempModes(Enum): class ColorTempModes(Enum):
"""Color temperature modes for config validation.""" """Color temperature modes for config validation."""

View file

@ -3,7 +3,9 @@
"name": "KNX", "name": "KNX",
"config_flow": true, "config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/knx", "documentation": "https://www.home-assistant.io/integrations/knx",
"requirements": ["xknx==0.22.1"], "requirements": ["xknx==0.22.1", "xknx-custom-panel==1.6.6"],
"dependencies": ["websocket_api"],
"after_dependencies": ["panel_custom"],
"codeowners": ["@Julius2342", "@farmio", "@marvin-w"], "codeowners": ["@Julius2342", "@farmio", "@marvin-w"],
"quality_scale": "platinum", "quality_scale": "platinum",
"iot_class": "local_push", "iot_class": "local_push",

View file

@ -0,0 +1,135 @@
"""KNX Websocket API."""
from __future__ import annotations
from collections.abc import Callable
from typing import Final
import voluptuous as vol
from xknx.telegram import Telegram, TelegramDirection
from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite
from xknx_custom_panel import get_knx_ui
from homeassistant.components import panel_custom, websocket_api
from homeassistant.core import HomeAssistant, callback
from .const import (
DOMAIN,
AsyncMessageCallbackType,
KNXBusMonitorMessage,
MessageCallbackType,
)
URL_BASE: Final = "/static_knx"
async def register_panel(hass: HomeAssistant) -> None:
"""Register the KNX Panel and Websocket API."""
websocket_api.async_register_command(hass, ws_info)
websocket_api.async_register_command(hass, ws_subscribe_telegram)
if DOMAIN not in hass.data.get("frontend_panels", {}):
hass.http.register_static_path(URL_BASE, get_knx_ui())
await panel_custom.async_register_panel(
hass=hass,
frontend_url_path=DOMAIN,
webcomponent_name="knx-panel",
sidebar_title=DOMAIN.upper(),
sidebar_icon="mdi:earth",
module_url=f"{URL_BASE}",
embed_iframe=True,
require_admin=True,
)
@websocket_api.websocket_command(
{
vol.Required("type"): "knx/info",
}
)
@callback
def ws_info(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
) -> None:
"""Handle get info command."""
xknx = hass.data[DOMAIN].xknx
connection.send_result(
msg["id"],
{
"version": xknx.version,
"connected": xknx.connection_manager.connected.is_set(),
"current_address": str(xknx.current_address),
},
)
@websocket_api.websocket_command(
{
vol.Required("type"): "knx/subscribe_telegrams",
}
)
@callback
def ws_subscribe_telegram(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
) -> None:
"""Subscribe to incoming and outgoing KNX telegrams."""
async def forward_telegrams(telegram: Telegram) -> None:
"""Forward events to websocket."""
if not isinstance(
telegram.payload, (GroupValueRead, GroupValueWrite, GroupValueResponse)
):
return
payload = (
str(telegram.payload.value)
if isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse))
else ""
)
direction = (
"label.incoming"
if telegram.direction == TelegramDirection.INCOMING
else "label.outgoing"
)
bus_message: KNXBusMonitorMessage = KNXBusMonitorMessage(
destination_address=str(telegram.destination_address),
payload=payload,
type=str(telegram.payload.__class__.__name__),
source_address=str(telegram.source_address),
direction=direction,
timestamp=str(telegram.timestamp),
)
connection.send_message(
websocket_api.event_message(
msg["id"],
bus_message,
)
)
connection.subscriptions[msg["id"]] = async_subscribe_telegrams(
hass, forward_telegrams
)
connection.send_message(websocket_api.result_message(msg["id"]))
def async_subscribe_telegrams(
hass: HomeAssistant,
telegram_callback: AsyncMessageCallbackType | MessageCallbackType,
) -> Callable[[], None]:
"""Subscribe to telegram received callback."""
xknx = hass.data[DOMAIN].xknx
unregister = xknx.telegram_queue.register_telegram_received_cb(
telegram_callback, match_for_outgoing=True
)
def async_remove() -> None:
"""Remove callback."""
xknx.telegram_queue.unregister_telegram_received_cb(unregister)
return async_remove

View file

@ -2484,6 +2484,7 @@ xiaomi-ble==0.9.0
# homeassistant.components.knx # homeassistant.components.knx
xknx==0.22.1 xknx==0.22.1
xknx-custom-panel==1.6.6
# homeassistant.components.bluesound # homeassistant.components.bluesound
# homeassistant.components.fritz # homeassistant.components.fritz

View file

@ -1682,6 +1682,7 @@ xiaomi-ble==0.9.0
# homeassistant.components.knx # homeassistant.components.knx
xknx==0.22.1 xknx==0.22.1
xknx-custom-panel==1.6.6
# homeassistant.components.bluesound # homeassistant.components.bluesound
# homeassistant.components.fritz # homeassistant.components.fritz

View file

@ -11,7 +11,13 @@ from xknx.dpt import DPTArray, DPTBinary
from xknx.io import DEFAULT_MCAST_GRP, DEFAULT_MCAST_PORT from xknx.io import DEFAULT_MCAST_GRP, DEFAULT_MCAST_PORT
from xknx.telegram import Telegram, TelegramDirection from xknx.telegram import Telegram, TelegramDirection
from xknx.telegram.address import GroupAddress, IndividualAddress from xknx.telegram.address import GroupAddress, IndividualAddress
from xknx.telegram.apci import APCI, GroupValueRead, GroupValueResponse, GroupValueWrite from xknx.telegram.apci import (
APCI,
GroupValueRead,
GroupValueResponse,
GroupValueWrite,
IndividualAddressRead,
)
from homeassistant.components.knx.const import ( from homeassistant.components.knx.const import (
CONF_KNX_AUTOMATIC, CONF_KNX_AUTOMATIC,
@ -188,6 +194,19 @@ class KNXTestKit:
await self.xknx.telegrams.join() await self.xknx.telegrams.join()
await self.hass.async_block_till_done() await self.hass.async_block_till_done()
async def receive_individual_address_read(self):
"""Inject incoming IndividualAddressRead telegram."""
self.xknx.telegrams.put_nowait(
Telegram(
destination_address=IndividualAddress(self.INDIVIDUAL_ADDRESS),
direction=TelegramDirection.INCOMING,
payload=IndividualAddressRead(),
source_address=IndividualAddress("1.3.5"),
)
)
await self.xknx.telegrams.join()
await self.hass.async_block_till_done()
async def receive_read( async def receive_read(
self, self,
group_address: str, group_address: str,

View file

@ -0,0 +1,98 @@
"""KNX Websocket Tests."""
from homeassistant.components.knx import KNX_ADDRESS, SwitchSchema
from homeassistant.const import CONF_NAME
from homeassistant.core import HomeAssistant
from .conftest import KNXTestKit
async def test_knx_info_command(hass: HomeAssistant, knx: KNXTestKit, hass_ws_client):
"""Test knx/info command."""
await knx.setup_integration({})
client = await hass_ws_client(hass)
await client.send_json({"id": 6, "type": "knx/info"})
res = await client.receive_json()
assert res["success"], res
assert res["result"]["version"] is not None
assert res["result"]["connected"]
assert res["result"]["current_address"] == "0.0.0"
async def test_knx_subscribe_telegrams_command(
hass: HomeAssistant, knx: KNXTestKit, hass_ws_client
):
"""Test knx/subscribe_telegrams command."""
await knx.setup_integration(
{
SwitchSchema.PLATFORM: {
CONF_NAME: "test",
KNX_ADDRESS: "1/2/4",
}
}
)
assert len(hass.states.async_all()) == 1
client = await hass_ws_client(hass)
await client.send_json({"id": 6, "type": "knx/subscribe_telegrams"})
res = await client.receive_json()
assert res["success"], res
# send incoming events
await knx.receive_read("1/2/3")
await knx.receive_write("1/3/4", True)
await knx.receive_write("1/3/4", False)
await knx.receive_individual_address_read()
await knx.receive_write("1/3/8", (0x34, 0x45))
# send outgoing events
await hass.services.async_call(
"switch", "turn_on", {"entity_id": "switch.test"}, blocking=True
)
await knx.assert_write("1/2/4", True)
# receive events
res = await client.receive_json()
assert res["event"]["destination_address"] == "1/2/3"
assert res["event"]["payload"] == ""
assert res["event"]["type"] == "GroupValueRead"
assert res["event"]["source_address"] == "1.2.3"
assert res["event"]["direction"] == "label.incoming"
assert res["event"]["timestamp"] is not None
res = await client.receive_json()
assert res["event"]["destination_address"] == "1/3/4"
assert res["event"]["payload"] == '<DPTBinary value="True" />'
assert res["event"]["type"] == "GroupValueWrite"
assert res["event"]["source_address"] == "1.2.3"
assert res["event"]["direction"] == "label.incoming"
assert res["event"]["timestamp"] is not None
res = await client.receive_json()
assert res["event"]["destination_address"] == "1/3/4"
assert res["event"]["payload"] == '<DPTBinary value="False" />'
assert res["event"]["type"] == "GroupValueWrite"
assert res["event"]["source_address"] == "1.2.3"
assert res["event"]["direction"] == "label.incoming"
assert res["event"]["timestamp"] is not None
res = await client.receive_json()
assert res["event"]["destination_address"] == "1/3/8"
assert res["event"]["payload"] == '<DPTArray value="[0x34,0x45]" />'
assert res["event"]["type"] == "GroupValueWrite"
assert res["event"]["source_address"] == "1.2.3"
assert res["event"]["direction"] == "label.incoming"
assert res["event"]["timestamp"] is not None
res = await client.receive_json()
assert res["event"]["destination_address"] == "1/2/4"
assert res["event"]["payload"] == '<DPTBinary value="True" />'
assert res["event"]["type"] == "GroupValueWrite"
assert (
res["event"]["source_address"] == "0.0.0"
) # needs to be the IA currently connected to
assert res["event"]["direction"] == "label.outgoing"
assert res["event"]["timestamp"] is not None