Update KNX frontend - add Group monitor telegram detail view (#95144)

* Use TelegramDict for WS communication

* Update knx_frontend
This commit is contained in:
Matthias Alphart 2023-06-25 14:58:08 +02:00 committed by GitHub
parent f84887d5f8
commit 2ce23c17ca
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 89 additions and 127 deletions

View file

@ -3,15 +3,14 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Final
from knx_frontend import entrypoint_js, is_dev_build, locate_dir
import knx_frontend as knx_panel
import voluptuous as vol
from xknx.telegram import TelegramDirection
from xknxproject.exceptions import XknxProjectException
from homeassistant.components import panel_custom, websocket_api
from homeassistant.core import HomeAssistant, callback
from .const import DOMAIN, KNXBusMonitorMessage
from .const import DOMAIN
from .telegrams import TelegramDict
if TYPE_CHECKING:
@ -30,19 +29,18 @@ async def register_panel(hass: HomeAssistant) -> None:
websocket_api.async_register_command(hass, ws_subscribe_telegram)
if DOMAIN not in hass.data.get("frontend_panels", {}):
path = locate_dir()
hass.http.register_static_path(
URL_BASE,
path,
cache_headers=not is_dev_build(),
path=knx_panel.locate_dir(),
cache_headers=knx_panel.is_prod_build,
)
await panel_custom.async_register_panel(
hass=hass,
frontend_url_path=DOMAIN,
webcomponent_name="knx-frontend",
webcomponent_name=knx_panel.webcomponent_name,
sidebar_title=DOMAIN.upper(),
sidebar_icon="mdi:bus-electric",
module_url=f"{URL_BASE}/{entrypoint_js()}",
module_url=f"{URL_BASE}/{knx_panel.entrypoint_js}",
embed_iframe=True,
require_admin=True,
)
@ -145,10 +143,7 @@ def ws_group_monitor_info(
) -> None:
"""Handle get info command of group monitor."""
knx: KNXModule = hass.data[DOMAIN]
recent_telegrams = [
_telegram_dict_to_group_monitor(telegram)
for telegram in knx.telegrams.recent_telegrams
]
recent_telegrams = [*knx.telegrams.recent_telegrams]
connection.send_result(
msg["id"],
{
@ -178,7 +173,7 @@ def ws_subscribe_telegram(
"""Forward telegram to websocket subscription."""
connection.send_event(
msg["id"],
_telegram_dict_to_group_monitor(telegram),
telegram,
)
connection.subscriptions[msg["id"]] = knx.telegrams.async_listen_telegram(
@ -186,38 +181,3 @@ def ws_subscribe_telegram(
name="KNX GroupMonitor subscription",
)
connection.send_result(msg["id"])
def _telegram_dict_to_group_monitor(telegram: TelegramDict) -> KNXBusMonitorMessage:
"""Convert a TelegramDict to a KNXBusMonitorMessage object."""
direction = (
"group_monitor_incoming"
if telegram["direction"] == TelegramDirection.INCOMING.value
else "group_monitor_outgoing"
)
_payload = telegram["payload"]
if isinstance(_payload, tuple):
payload = f"0x{bytes(_payload).hex()}"
elif isinstance(_payload, int):
payload = f"{_payload:d}"
else:
payload = ""
timestamp = telegram["timestamp"].strftime("%H:%M:%S.%f")[:-3]
if (value := telegram["value"]) is not None:
unit = telegram["unit"]
value = f"{value}{' ' + unit if unit else ''}"
return KNXBusMonitorMessage(
destination_address=telegram["destination"],
destination_text=telegram["destination_name"],
direction=direction,
payload=payload,
source_address=telegram["source"],
source_text=telegram["source_name"],
timestamp=timestamp,
type=telegram["telegramtype"],
value=value,
)