diff --git a/homeassistant/components/knx/__init__.py b/homeassistant/components/knx/__init__.py
index 1910227a5a4..f9c826bb140 100644
--- a/homeassistant/components/knx/__init__.py
+++ b/homeassistant/components/knx/__init__.py
@@ -84,6 +84,7 @@ from .schema import (
ga_validator,
sensor_type_validator,
)
+from .websocket import register_panel
_LOGGER = logging.getLogger(__name__)
@@ -214,6 +215,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
conf = dict(conf)
hass.data[DATA_KNX_CONFIG] = conf
+
return True
@@ -294,6 +296,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
schema=SERVICE_KNX_EXPOSURE_REGISTER_SCHEMA,
)
+ await register_panel(hass)
+
return True
diff --git a/homeassistant/components/knx/const.py b/homeassistant/components/knx/const.py
index df8f0de3216..91a579e06b9 100644
--- a/homeassistant/components/knx/const.py
+++ b/homeassistant/components/knx/const.py
@@ -1,9 +1,12 @@
"""Constants for the KNX integration."""
from __future__ import annotations
+from collections.abc import Awaitable, Callable
from enum import Enum
from typing import Final, TypedDict
+from xknx.telegram import Telegram
+
from homeassistant.components.climate.const import (
PRESET_AWAY,
PRESET_COMFORT,
@@ -70,6 +73,9 @@ DATA_HASS_CONFIG: Final = "knx_hass_config"
ATTR_COUNTER: Final = "counter"
ATTR_SOURCE: Final = "source"
+AsyncMessageCallbackType = Callable[[Telegram], Awaitable[None]]
+MessageCallbackType = Callable[[Telegram], None]
+
class KNXConfigEntryData(TypedDict, total=False):
"""Config entry for the KNX integration."""
@@ -92,6 +98,17 @@ class KNXConfigEntryData(TypedDict, total=False):
knxkeys_password: str
+class KNXBusMonitorMessage(TypedDict):
+ """KNX bus monitor message."""
+
+ destination_address: str
+ payload: str
+ type: str
+ source_address: str
+ direction: str
+ timestamp: str
+
+
class ColorTempModes(Enum):
"""Color temperature modes for config validation."""
diff --git a/homeassistant/components/knx/manifest.json b/homeassistant/components/knx/manifest.json
index 266eceaacee..f0db426c249 100644
--- a/homeassistant/components/knx/manifest.json
+++ b/homeassistant/components/knx/manifest.json
@@ -3,7 +3,9 @@
"name": "KNX",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/knx",
- "requirements": ["xknx==0.22.1"],
+ "requirements": ["xknx==0.22.1", "xknx-custom-panel==1.6.6"],
+ "dependencies": ["websocket_api"],
+ "after_dependencies": ["panel_custom"],
"codeowners": ["@Julius2342", "@farmio", "@marvin-w"],
"quality_scale": "platinum",
"iot_class": "local_push",
diff --git a/homeassistant/components/knx/websocket.py b/homeassistant/components/knx/websocket.py
new file mode 100644
index 00000000000..3cb87698305
--- /dev/null
+++ b/homeassistant/components/knx/websocket.py
@@ -0,0 +1,135 @@
+"""KNX Websocket API."""
+from __future__ import annotations
+
+from collections.abc import Callable
+from typing import Final
+
+import voluptuous as vol
+from xknx.telegram import Telegram, TelegramDirection
+from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite
+from xknx_custom_panel import get_knx_ui
+
+from homeassistant.components import panel_custom, websocket_api
+from homeassistant.core import HomeAssistant, callback
+
+from .const import (
+ DOMAIN,
+ AsyncMessageCallbackType,
+ KNXBusMonitorMessage,
+ MessageCallbackType,
+)
+
+URL_BASE: Final = "/static_knx"
+
+
+async def register_panel(hass: HomeAssistant) -> None:
+ """Register the KNX Panel and Websocket API."""
+ websocket_api.async_register_command(hass, ws_info)
+ websocket_api.async_register_command(hass, ws_subscribe_telegram)
+
+ if DOMAIN not in hass.data.get("frontend_panels", {}):
+ hass.http.register_static_path(URL_BASE, get_knx_ui())
+ await panel_custom.async_register_panel(
+ hass=hass,
+ frontend_url_path=DOMAIN,
+ webcomponent_name="knx-panel",
+ sidebar_title=DOMAIN.upper(),
+ sidebar_icon="mdi:earth",
+ module_url=f"{URL_BASE}",
+ embed_iframe=True,
+ require_admin=True,
+ )
+
+
+@websocket_api.websocket_command(
+ {
+ vol.Required("type"): "knx/info",
+ }
+)
+@callback
+def ws_info(
+ hass: HomeAssistant,
+ connection: websocket_api.ActiveConnection,
+ msg: dict,
+) -> None:
+ """Handle get info command."""
+ xknx = hass.data[DOMAIN].xknx
+ connection.send_result(
+ msg["id"],
+ {
+ "version": xknx.version,
+ "connected": xknx.connection_manager.connected.is_set(),
+ "current_address": str(xknx.current_address),
+ },
+ )
+
+
+@websocket_api.websocket_command(
+ {
+ vol.Required("type"): "knx/subscribe_telegrams",
+ }
+)
+@callback
+def ws_subscribe_telegram(
+ hass: HomeAssistant,
+ connection: websocket_api.ActiveConnection,
+ msg: dict,
+) -> None:
+ """Subscribe to incoming and outgoing KNX telegrams."""
+
+ async def forward_telegrams(telegram: Telegram) -> None:
+ """Forward events to websocket."""
+ if not isinstance(
+ telegram.payload, (GroupValueRead, GroupValueWrite, GroupValueResponse)
+ ):
+ return
+
+ payload = (
+ str(telegram.payload.value)
+ if isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse))
+ else ""
+ )
+ direction = (
+ "label.incoming"
+ if telegram.direction == TelegramDirection.INCOMING
+ else "label.outgoing"
+ )
+ bus_message: KNXBusMonitorMessage = KNXBusMonitorMessage(
+ destination_address=str(telegram.destination_address),
+ payload=payload,
+ type=str(telegram.payload.__class__.__name__),
+ source_address=str(telegram.source_address),
+ direction=direction,
+ timestamp=str(telegram.timestamp),
+ )
+
+ connection.send_message(
+ websocket_api.event_message(
+ msg["id"],
+ bus_message,
+ )
+ )
+
+ connection.subscriptions[msg["id"]] = async_subscribe_telegrams(
+ hass, forward_telegrams
+ )
+
+ connection.send_message(websocket_api.result_message(msg["id"]))
+
+
+def async_subscribe_telegrams(
+ hass: HomeAssistant,
+ telegram_callback: AsyncMessageCallbackType | MessageCallbackType,
+) -> Callable[[], None]:
+ """Subscribe to telegram received callback."""
+ xknx = hass.data[DOMAIN].xknx
+
+ unregister = xknx.telegram_queue.register_telegram_received_cb(
+ telegram_callback, match_for_outgoing=True
+ )
+
+ def async_remove() -> None:
+ """Remove callback."""
+ xknx.telegram_queue.unregister_telegram_received_cb(unregister)
+
+ return async_remove
diff --git a/requirements_all.txt b/requirements_all.txt
index 35a231a8cd4..9c939eddd63 100644
--- a/requirements_all.txt
+++ b/requirements_all.txt
@@ -2484,6 +2484,7 @@ xiaomi-ble==0.9.0
# homeassistant.components.knx
xknx==0.22.1
+xknx-custom-panel==1.6.6
# homeassistant.components.bluesound
# homeassistant.components.fritz
diff --git a/requirements_test_all.txt b/requirements_test_all.txt
index 011c93291e8..e73a78b21b3 100644
--- a/requirements_test_all.txt
+++ b/requirements_test_all.txt
@@ -1682,6 +1682,7 @@ xiaomi-ble==0.9.0
# homeassistant.components.knx
xknx==0.22.1
+xknx-custom-panel==1.6.6
# homeassistant.components.bluesound
# homeassistant.components.fritz
diff --git a/tests/components/knx/conftest.py b/tests/components/knx/conftest.py
index e5cf18b0c3c..f5c322341ea 100644
--- a/tests/components/knx/conftest.py
+++ b/tests/components/knx/conftest.py
@@ -11,7 +11,13 @@ from xknx.dpt import DPTArray, DPTBinary
from xknx.io import DEFAULT_MCAST_GRP, DEFAULT_MCAST_PORT
from xknx.telegram import Telegram, TelegramDirection
from xknx.telegram.address import GroupAddress, IndividualAddress
-from xknx.telegram.apci import APCI, GroupValueRead, GroupValueResponse, GroupValueWrite
+from xknx.telegram.apci import (
+ APCI,
+ GroupValueRead,
+ GroupValueResponse,
+ GroupValueWrite,
+ IndividualAddressRead,
+)
from homeassistant.components.knx.const import (
CONF_KNX_AUTOMATIC,
@@ -188,6 +194,19 @@ class KNXTestKit:
await self.xknx.telegrams.join()
await self.hass.async_block_till_done()
+ async def receive_individual_address_read(self):
+ """Inject incoming IndividualAddressRead telegram."""
+ self.xknx.telegrams.put_nowait(
+ Telegram(
+ destination_address=IndividualAddress(self.INDIVIDUAL_ADDRESS),
+ direction=TelegramDirection.INCOMING,
+ payload=IndividualAddressRead(),
+ source_address=IndividualAddress("1.3.5"),
+ )
+ )
+ await self.xknx.telegrams.join()
+ await self.hass.async_block_till_done()
+
async def receive_read(
self,
group_address: str,
diff --git a/tests/components/knx/test_websocket.py b/tests/components/knx/test_websocket.py
new file mode 100644
index 00000000000..eb8bd6d26f3
--- /dev/null
+++ b/tests/components/knx/test_websocket.py
@@ -0,0 +1,98 @@
+"""KNX Websocket Tests."""
+from homeassistant.components.knx import KNX_ADDRESS, SwitchSchema
+from homeassistant.const import CONF_NAME
+from homeassistant.core import HomeAssistant
+
+from .conftest import KNXTestKit
+
+
+async def test_knx_info_command(hass: HomeAssistant, knx: KNXTestKit, hass_ws_client):
+ """Test knx/info command."""
+ await knx.setup_integration({})
+
+ client = await hass_ws_client(hass)
+
+ await client.send_json({"id": 6, "type": "knx/info"})
+
+ res = await client.receive_json()
+ assert res["success"], res
+ assert res["result"]["version"] is not None
+ assert res["result"]["connected"]
+ assert res["result"]["current_address"] == "0.0.0"
+
+
+async def test_knx_subscribe_telegrams_command(
+ hass: HomeAssistant, knx: KNXTestKit, hass_ws_client
+):
+ """Test knx/subscribe_telegrams command."""
+ await knx.setup_integration(
+ {
+ SwitchSchema.PLATFORM: {
+ CONF_NAME: "test",
+ KNX_ADDRESS: "1/2/4",
+ }
+ }
+ )
+ assert len(hass.states.async_all()) == 1
+
+ client = await hass_ws_client(hass)
+
+ await client.send_json({"id": 6, "type": "knx/subscribe_telegrams"})
+
+ res = await client.receive_json()
+ assert res["success"], res
+
+ # send incoming events
+ await knx.receive_read("1/2/3")
+ await knx.receive_write("1/3/4", True)
+ await knx.receive_write("1/3/4", False)
+ await knx.receive_individual_address_read()
+ await knx.receive_write("1/3/8", (0x34, 0x45))
+ # send outgoing events
+ await hass.services.async_call(
+ "switch", "turn_on", {"entity_id": "switch.test"}, blocking=True
+ )
+ await knx.assert_write("1/2/4", True)
+
+ # receive events
+ res = await client.receive_json()
+ assert res["event"]["destination_address"] == "1/2/3"
+ assert res["event"]["payload"] == ""
+ assert res["event"]["type"] == "GroupValueRead"
+ assert res["event"]["source_address"] == "1.2.3"
+ assert res["event"]["direction"] == "label.incoming"
+ assert res["event"]["timestamp"] is not None
+
+ res = await client.receive_json()
+ assert res["event"]["destination_address"] == "1/3/4"
+ assert res["event"]["payload"] == ''
+ assert res["event"]["type"] == "GroupValueWrite"
+ assert res["event"]["source_address"] == "1.2.3"
+ assert res["event"]["direction"] == "label.incoming"
+ assert res["event"]["timestamp"] is not None
+
+ res = await client.receive_json()
+ assert res["event"]["destination_address"] == "1/3/4"
+ assert res["event"]["payload"] == ''
+ assert res["event"]["type"] == "GroupValueWrite"
+ assert res["event"]["source_address"] == "1.2.3"
+ assert res["event"]["direction"] == "label.incoming"
+ assert res["event"]["timestamp"] is not None
+
+ res = await client.receive_json()
+ assert res["event"]["destination_address"] == "1/3/8"
+ assert res["event"]["payload"] == ''
+ assert res["event"]["type"] == "GroupValueWrite"
+ assert res["event"]["source_address"] == "1.2.3"
+ assert res["event"]["direction"] == "label.incoming"
+ assert res["event"]["timestamp"] is not None
+
+ res = await client.receive_json()
+ assert res["event"]["destination_address"] == "1/2/4"
+ assert res["event"]["payload"] == ''
+ assert res["event"]["type"] == "GroupValueWrite"
+ assert (
+ res["event"]["source_address"] == "0.0.0"
+ ) # needs to be the IA currently connected to
+ assert res["event"]["direction"] == "label.outgoing"
+ assert res["event"]["timestamp"] is not None