Compare commits

...
Sign in to create a new pull request.

5 commits

Author SHA1 Message Date
marvin-w
3d40828aeb Correct dependencies in manifest 2022-08-14 00:07:03 +02:00
marvin-w
1cc4b193ce Apply review suggestions 2022-08-14 00:06:31 +02:00
marvin-w
dca7e9cb76 Add custom panel to KNX integration 2022-08-14 00:06:31 +02:00
marvin-w
09d1c90e67 Test invalid payload type for websocket subscription 2022-08-12 20:34:16 +02:00
marvin-w
78524d8f83 Adds websocket commands for KNX integration 2022-08-12 20:34:16 +02:00
8 changed files with 279 additions and 2 deletions

View file

@ -84,6 +84,7 @@ from .schema import (
ga_validator,
sensor_type_validator,
)
from .websocket import register_panel
_LOGGER = logging.getLogger(__name__)
@ -214,6 +215,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
conf = dict(conf)
hass.data[DATA_KNX_CONFIG] = conf
return True
@ -294,6 +296,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
schema=SERVICE_KNX_EXPOSURE_REGISTER_SCHEMA,
)
await register_panel(hass)
return True

View file

@ -1,9 +1,12 @@
"""Constants for the KNX integration."""
from __future__ import annotations
from collections.abc import Awaitable, Callable
from enum import Enum
from typing import Final, TypedDict
from xknx.telegram import Telegram
from homeassistant.components.climate.const import (
PRESET_AWAY,
PRESET_COMFORT,
@ -70,6 +73,9 @@ DATA_HASS_CONFIG: Final = "knx_hass_config"
ATTR_COUNTER: Final = "counter"
ATTR_SOURCE: Final = "source"
AsyncMessageCallbackType = Callable[[Telegram], Awaitable[None]]
MessageCallbackType = Callable[[Telegram], None]
class KNXConfigEntryData(TypedDict, total=False):
"""Config entry for the KNX integration."""
@ -92,6 +98,17 @@ class KNXConfigEntryData(TypedDict, total=False):
knxkeys_password: str
class KNXBusMonitorMessage(TypedDict):
"""KNX bus monitor message."""
destination_address: str
payload: str
type: str
source_address: str
direction: str
timestamp: str
class ColorTempModes(Enum):
"""Color temperature modes for config validation."""

View file

@ -3,7 +3,9 @@
"name": "KNX",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/knx",
"requirements": ["xknx==0.22.1"],
"requirements": ["xknx==0.22.1", "xknx-custom-panel==1.6.6"],
"dependencies": ["websocket_api"],
"after_dependencies": ["panel_custom"],
"codeowners": ["@Julius2342", "@farmio", "@marvin-w"],
"quality_scale": "platinum",
"iot_class": "local_push",

View file

@ -0,0 +1,135 @@
"""KNX Websocket API."""
from __future__ import annotations
from collections.abc import Callable
from typing import Final
import voluptuous as vol
from xknx.telegram import Telegram, TelegramDirection
from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite
from xknx_custom_panel import get_knx_ui
from homeassistant.components import panel_custom, websocket_api
from homeassistant.core import HomeAssistant, callback
from .const import (
DOMAIN,
AsyncMessageCallbackType,
KNXBusMonitorMessage,
MessageCallbackType,
)
URL_BASE: Final = "/static_knx"
async def register_panel(hass: HomeAssistant) -> None:
"""Register the KNX Panel and Websocket API."""
websocket_api.async_register_command(hass, ws_info)
websocket_api.async_register_command(hass, ws_subscribe_telegram)
if DOMAIN not in hass.data.get("frontend_panels", {}):
hass.http.register_static_path(URL_BASE, get_knx_ui())
await panel_custom.async_register_panel(
hass=hass,
frontend_url_path=DOMAIN,
webcomponent_name="knx-panel",
sidebar_title=DOMAIN.upper(),
sidebar_icon="mdi:earth",
module_url=f"{URL_BASE}",
embed_iframe=True,
require_admin=True,
)
@websocket_api.websocket_command(
{
vol.Required("type"): "knx/info",
}
)
@callback
def ws_info(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
) -> None:
"""Handle get info command."""
xknx = hass.data[DOMAIN].xknx
connection.send_result(
msg["id"],
{
"version": xknx.version,
"connected": xknx.connection_manager.connected.is_set(),
"current_address": str(xknx.current_address),
},
)
@websocket_api.websocket_command(
{
vol.Required("type"): "knx/subscribe_telegrams",
}
)
@callback
def ws_subscribe_telegram(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
) -> None:
"""Subscribe to incoming and outgoing KNX telegrams."""
async def forward_telegrams(telegram: Telegram) -> None:
"""Forward events to websocket."""
if not isinstance(
telegram.payload, (GroupValueRead, GroupValueWrite, GroupValueResponse)
):
return
payload = (
str(telegram.payload.value)
if isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse))
else ""
)
direction = (
"label.incoming"
if telegram.direction == TelegramDirection.INCOMING
else "label.outgoing"
)
bus_message: KNXBusMonitorMessage = KNXBusMonitorMessage(
destination_address=str(telegram.destination_address),
payload=payload,
type=str(telegram.payload.__class__.__name__),
source_address=str(telegram.source_address),
direction=direction,
timestamp=str(telegram.timestamp),
)
connection.send_message(
websocket_api.event_message(
msg["id"],
bus_message,
)
)
connection.subscriptions[msg["id"]] = async_subscribe_telegrams(
hass, forward_telegrams
)
connection.send_message(websocket_api.result_message(msg["id"]))
def async_subscribe_telegrams(
hass: HomeAssistant,
telegram_callback: AsyncMessageCallbackType | MessageCallbackType,
) -> Callable[[], None]:
"""Subscribe to telegram received callback."""
xknx = hass.data[DOMAIN].xknx
unregister = xknx.telegram_queue.register_telegram_received_cb(
telegram_callback, match_for_outgoing=True
)
def async_remove() -> None:
"""Remove callback."""
xknx.telegram_queue.unregister_telegram_received_cb(unregister)
return async_remove

View file

@ -2484,6 +2484,7 @@ xiaomi-ble==0.9.0
# homeassistant.components.knx
xknx==0.22.1
xknx-custom-panel==1.6.6
# homeassistant.components.bluesound
# homeassistant.components.fritz

View file

@ -1682,6 +1682,7 @@ xiaomi-ble==0.9.0
# homeassistant.components.knx
xknx==0.22.1
xknx-custom-panel==1.6.6
# homeassistant.components.bluesound
# homeassistant.components.fritz

View file

@ -11,7 +11,13 @@ from xknx.dpt import DPTArray, DPTBinary
from xknx.io import DEFAULT_MCAST_GRP, DEFAULT_MCAST_PORT
from xknx.telegram import Telegram, TelegramDirection
from xknx.telegram.address import GroupAddress, IndividualAddress
from xknx.telegram.apci import APCI, GroupValueRead, GroupValueResponse, GroupValueWrite
from xknx.telegram.apci import (
APCI,
GroupValueRead,
GroupValueResponse,
GroupValueWrite,
IndividualAddressRead,
)
from homeassistant.components.knx.const import (
CONF_KNX_AUTOMATIC,
@ -188,6 +194,19 @@ class KNXTestKit:
await self.xknx.telegrams.join()
await self.hass.async_block_till_done()
async def receive_individual_address_read(self):
"""Inject incoming IndividualAddressRead telegram."""
self.xknx.telegrams.put_nowait(
Telegram(
destination_address=IndividualAddress(self.INDIVIDUAL_ADDRESS),
direction=TelegramDirection.INCOMING,
payload=IndividualAddressRead(),
source_address=IndividualAddress("1.3.5"),
)
)
await self.xknx.telegrams.join()
await self.hass.async_block_till_done()
async def receive_read(
self,
group_address: str,

View file

@ -0,0 +1,98 @@
"""KNX Websocket Tests."""
from homeassistant.components.knx import KNX_ADDRESS, SwitchSchema
from homeassistant.const import CONF_NAME
from homeassistant.core import HomeAssistant
from .conftest import KNXTestKit
async def test_knx_info_command(hass: HomeAssistant, knx: KNXTestKit, hass_ws_client):
"""Test knx/info command."""
await knx.setup_integration({})
client = await hass_ws_client(hass)
await client.send_json({"id": 6, "type": "knx/info"})
res = await client.receive_json()
assert res["success"], res
assert res["result"]["version"] is not None
assert res["result"]["connected"]
assert res["result"]["current_address"] == "0.0.0"
async def test_knx_subscribe_telegrams_command(
hass: HomeAssistant, knx: KNXTestKit, hass_ws_client
):
"""Test knx/subscribe_telegrams command."""
await knx.setup_integration(
{
SwitchSchema.PLATFORM: {
CONF_NAME: "test",
KNX_ADDRESS: "1/2/4",
}
}
)
assert len(hass.states.async_all()) == 1
client = await hass_ws_client(hass)
await client.send_json({"id": 6, "type": "knx/subscribe_telegrams"})
res = await client.receive_json()
assert res["success"], res
# send incoming events
await knx.receive_read("1/2/3")
await knx.receive_write("1/3/4", True)
await knx.receive_write("1/3/4", False)
await knx.receive_individual_address_read()
await knx.receive_write("1/3/8", (0x34, 0x45))
# send outgoing events
await hass.services.async_call(
"switch", "turn_on", {"entity_id": "switch.test"}, blocking=True
)
await knx.assert_write("1/2/4", True)
# receive events
res = await client.receive_json()
assert res["event"]["destination_address"] == "1/2/3"
assert res["event"]["payload"] == ""
assert res["event"]["type"] == "GroupValueRead"
assert res["event"]["source_address"] == "1.2.3"
assert res["event"]["direction"] == "label.incoming"
assert res["event"]["timestamp"] is not None
res = await client.receive_json()
assert res["event"]["destination_address"] == "1/3/4"
assert res["event"]["payload"] == '<DPTBinary value="True" />'
assert res["event"]["type"] == "GroupValueWrite"
assert res["event"]["source_address"] == "1.2.3"
assert res["event"]["direction"] == "label.incoming"
assert res["event"]["timestamp"] is not None
res = await client.receive_json()
assert res["event"]["destination_address"] == "1/3/4"
assert res["event"]["payload"] == '<DPTBinary value="False" />'
assert res["event"]["type"] == "GroupValueWrite"
assert res["event"]["source_address"] == "1.2.3"
assert res["event"]["direction"] == "label.incoming"
assert res["event"]["timestamp"] is not None
res = await client.receive_json()
assert res["event"]["destination_address"] == "1/3/8"
assert res["event"]["payload"] == '<DPTArray value="[0x34,0x45]" />'
assert res["event"]["type"] == "GroupValueWrite"
assert res["event"]["source_address"] == "1.2.3"
assert res["event"]["direction"] == "label.incoming"
assert res["event"]["timestamp"] is not None
res = await client.receive_json()
assert res["event"]["destination_address"] == "1/2/4"
assert res["event"]["payload"] == '<DPTBinary value="True" />'
assert res["event"]["type"] == "GroupValueWrite"
assert (
res["event"]["source_address"] == "0.0.0"
) # needs to be the IA currently connected to
assert res["event"]["direction"] == "label.outgoing"
assert res["event"]["timestamp"] is not None