From 4517af509c3e3f0c795e1e22c7e443a2b81dad07 Mon Sep 17 00:00:00 2001 From: Matthias Alphart Date: Sun, 27 Nov 2022 23:33:12 +0100 Subject: [PATCH] Add support for KNX IP-Secure routing (#82765) * always use instance variable for new entry data - change `self._tunneling_config` to non-optional `self.new_entry_data` - always use self.new_entry_data in `finish_flow()` * support secure routing * amend current tests * use sync latency tolerance * test secure routing config flow * diagnostics redact backbone_key * test xknx library setup * check length of backbone_key * better readable key validation --- homeassistant/components/knx/__init__.py | 37 +++- homeassistant/components/knx/config_flow.py | 168 ++++++++++++------ homeassistant/components/knx/const.py | 5 + homeassistant/components/knx/diagnostics.py | 2 + homeassistant/components/knx/strings.json | 40 ++++- .../components/knx/translations/en.json | 60 +++++-- tests/components/knx/test_config_flow.py | 167 ++++++++++++++++- tests/components/knx/test_diagnostic.py | 3 + tests/components/knx/test_init.py | 28 +++ 9 files changed, 422 insertions(+), 88 deletions(-) diff --git a/homeassistant/components/knx/__init__.py b/homeassistant/components/knx/__init__.py index a486582daec..c8b9ca40bd7 100644 --- a/homeassistant/components/knx/__init__.py +++ b/homeassistant/components/knx/__init__.py @@ -51,6 +51,9 @@ from .const import ( CONF_KNX_RATE_LIMIT, CONF_KNX_ROUTE_BACK, CONF_KNX_ROUTING, + CONF_KNX_ROUTING_BACKBONE_KEY, + CONF_KNX_ROUTING_SECURE, + CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE, CONF_KNX_SECURE_DEVICE_AUTHENTICATION, CONF_KNX_SECURE_USER_ID, CONF_KNX_SECURE_USER_PASSWORD, @@ -406,15 +409,15 @@ class KNXModule: auto_reconnect=True, threaded=True, ) - if _conn_type == CONF_KNX_TUNNELING_TCP_SECURE: - knxkeys_file: str | None = ( - self.hass.config.path( - STORAGE_DIR, - self.entry.data[CONF_KNX_KNXKEY_FILENAME], - ) - if self.entry.data.get(CONF_KNX_KNXKEY_FILENAME) is not None - else None + knxkeys_file: str | None = ( + self.hass.config.path( + STORAGE_DIR, + self.entry.data[CONF_KNX_KNXKEY_FILENAME], ) + if self.entry.data.get(CONF_KNX_KNXKEY_FILENAME) is not None + else None + ) + if _conn_type == CONF_KNX_TUNNELING_TCP_SECURE: return ConnectionConfig( connection_type=ConnectionType.TUNNELING_TCP_SECURE, gateway_ip=self.entry.data[CONF_HOST], @@ -431,6 +434,24 @@ class KNXModule: auto_reconnect=True, threaded=True, ) + if _conn_type == CONF_KNX_ROUTING_SECURE: + return ConnectionConfig( + connection_type=ConnectionType.ROUTING_SECURE, + individual_address=self.entry.data[CONF_KNX_INDIVIDUAL_ADDRESS], + multicast_group=self.entry.data[CONF_KNX_MCAST_GRP], + multicast_port=self.entry.data[CONF_KNX_MCAST_PORT], + local_ip=self.entry.data.get(CONF_KNX_LOCAL_IP), + secure_config=SecureConfig( + backbone_key=self.entry.data.get(CONF_KNX_ROUTING_BACKBONE_KEY), + latency_ms=self.entry.data.get( + CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE + ), + knxkeys_password=self.entry.data.get(CONF_KNX_KNXKEY_PASSWORD), + knxkeys_file_path=knxkeys_file, + ), + auto_reconnect=True, + threaded=True, + ) return ConnectionConfig( auto_reconnect=True, threaded=True, diff --git a/homeassistant/components/knx/config_flow.py b/homeassistant/components/knx/config_flow.py index c4107ed7e8b..e2ff0908bbe 100644 --- a/homeassistant/components/knx/config_flow.py +++ b/homeassistant/components/knx/config_flow.py @@ -33,6 +33,9 @@ from .const import ( CONF_KNX_RATE_LIMIT, CONF_KNX_ROUTE_BACK, CONF_KNX_ROUTING, + CONF_KNX_ROUTING_BACKBONE_KEY, + CONF_KNX_ROUTING_SECURE, + CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE, CONF_KNX_SECURE_DEVICE_AUTHENTICATION, CONF_KNX_SECURE_USER_ID, CONF_KNX_SECURE_USER_PASSWORD, @@ -87,13 +90,13 @@ class KNXCommonFlow(ABC, FlowHandler): def __init__(self, initial_data: KNXConfigEntryData) -> None: """Initialize KNXCommonFlow.""" self.initial_data = initial_data + self.new_entry_data = KNXConfigEntryData() self._found_gateways: list[GatewayDescriptor] = [] self._found_tunnels: list[GatewayDescriptor] = [] self._selected_tunnel: GatewayDescriptor | None = None - self._tunneling_config: KNXConfigEntryData | None = None @abstractmethod - def finish_flow(self, new_entry_data: KNXConfigEntryData, title: str) -> FlowResult: + def finish_flow(self, title: str) -> FlowResult: """Finish the flow.""" async def async_step_connection_type( @@ -119,11 +122,8 @@ class KNXCommonFlow(ABC, FlowHandler): return await self.async_step_tunnel() # Automatic connection type - entry_data = KNXConfigEntryData(connection_type=CONF_KNX_AUTOMATIC) - return self.finish_flow( - new_entry_data=entry_data, - title=CONF_KNX_AUTOMATIC.capitalize(), - ) + self.new_entry_data = KNXConfigEntryData(connection_type=CONF_KNX_AUTOMATIC) + return self.finish_flow(title=CONF_KNX_AUTOMATIC.capitalize()) supported_connection_types = { CONF_KNX_TUNNELING: CONF_KNX_TUNNELING.capitalize(), @@ -163,7 +163,7 @@ class KNXCommonFlow(ABC, FlowHandler): if self._selected_tunnel.supports_tunnelling_tcp else CONF_KNX_TUNNELING ) - self._tunneling_config = KNXConfigEntryData( + self.new_entry_data = KNXConfigEntryData( host=self._selected_tunnel.ip_addr, port=self._selected_tunnel.port, route_back=False, @@ -171,13 +171,10 @@ class KNXCommonFlow(ABC, FlowHandler): ) if connection_type == CONF_KNX_TUNNELING_TCP_SECURE: return self.async_show_menu( - step_id="secure_tunneling", + step_id="secure_key_source", menu_options=["secure_knxkeys", "secure_tunnel_manual"], ) - return self.finish_flow( - new_entry_data=self._tunneling_config, - title=f"Tunneling @ {self._selected_tunnel}", - ) + return self.finish_flow(title=f"Tunneling @ {self._selected_tunnel}") if not self._found_tunnels: return await self.async_step_manual_tunnel() @@ -211,7 +208,7 @@ class KNXCommonFlow(ABC, FlowHandler): if not errors: connection_type = user_input[CONF_KNX_TUNNELING_TYPE] - self._tunneling_config = KNXConfigEntryData( + self.new_entry_data = KNXConfigEntryData( host=_host, port=user_input[CONF_PORT], route_back=user_input[CONF_KNX_ROUTE_BACK], @@ -221,13 +218,10 @@ class KNXCommonFlow(ABC, FlowHandler): if connection_type == CONF_KNX_TUNNELING_TCP_SECURE: return self.async_show_menu( - step_id="secure_tunneling", - menu_options=["secure_knxkeys", "secure_tunnel_manual"], + step_id="secure_key_source", + menu_options=["secure_knxkeys", "secure_routing_manual"], ) - return self.finish_flow( - new_entry_data=self._tunneling_config, - title=f"Tunneling @ {_host}", - ) + return self.finish_flow(title=f"Tunneling @ {_host}") _reconfiguring_existing_tunnel = ( self.initial_data.get(CONF_KNX_CONNECTION_TYPE) @@ -290,20 +284,18 @@ class KNXCommonFlow(ABC, FlowHandler): async def async_step_secure_tunnel_manual( self, user_input: dict | None = None ) -> FlowResult: - """Configure ip secure manually.""" + """Configure ip secure tunnelling manually.""" errors: dict = {} if user_input is not None: - assert self._tunneling_config - entry_data = self._tunneling_config | KNXConfigEntryData( + self.new_entry_data |= KNXConfigEntryData( connection_type=CONF_KNX_TUNNELING_TCP_SECURE, device_authentication=user_input[CONF_KNX_SECURE_DEVICE_AUTHENTICATION], user_id=user_input[CONF_KNX_SECURE_USER_ID], user_password=user_input[CONF_KNX_SECURE_USER_PASSWORD], ) return self.finish_flow( - new_entry_data=entry_data, - title=f"Secure Tunneling @ {self._tunneling_config[CONF_HOST]}", + title=f"Secure Tunneling @ {self.new_entry_data[CONF_HOST]}" ) fields = { @@ -338,6 +330,60 @@ class KNXCommonFlow(ABC, FlowHandler): errors=errors, ) + async def async_step_secure_routing_manual( + self, user_input: dict | None = None + ) -> FlowResult: + """Configure ip secure routing manually.""" + errors: dict = {} + + if user_input is not None: + try: + key_bytes = bytes.fromhex(user_input[CONF_KNX_ROUTING_BACKBONE_KEY]) + if len(key_bytes) != 16: + raise ValueError + except ValueError: + errors[CONF_KNX_ROUTING_BACKBONE_KEY] = "invalid_backbone_key" + if not errors: + self.new_entry_data |= KNXConfigEntryData( + backbone_key=user_input[CONF_KNX_ROUTING_BACKBONE_KEY], + sync_latency_tolerance=user_input[ + CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE + ], + ) + return self.finish_flow( + title=f"Secure Routing as {self.new_entry_data[CONF_KNX_INDIVIDUAL_ADDRESS]}" + ) + + fields = { + vol.Required( + CONF_KNX_ROUTING_BACKBONE_KEY, + default=self.initial_data.get(CONF_KNX_ROUTING_BACKBONE_KEY), + ): selector.TextSelector( + selector.TextSelectorConfig(type=selector.TextSelectorType.PASSWORD), + ), + vol.Required( + CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE, + default=self.initial_data.get(CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE) + or 1000, + ): vol.All( + selector.NumberSelector( + selector.NumberSelectorConfig( + min=400, + max=4000, + unit_of_measurement="ms", + mode=selector.NumberSelectorMode.BOX, + ), + ), + vol.Coerce(int), + ), + } + + return self.async_show_form( + step_id="secure_routing_manual", + data_schema=vol.Schema(fields), + errors=errors, + ) + async def async_step_secure_knxkeys( self, user_input: dict | None = None ) -> FlowResult: @@ -345,7 +391,6 @@ class KNXCommonFlow(ABC, FlowHandler): errors = {} if user_input is not None: - assert self._tunneling_config storage_key = CONST_KNX_STORAGE_KEY + user_input[CONF_KNX_KNXKEY_FILENAME] try: await load_keyring( @@ -358,15 +403,20 @@ class KNXCommonFlow(ABC, FlowHandler): errors[CONF_KNX_KNXKEY_PASSWORD] = "invalid_signature" if not errors: - entry_data = self._tunneling_config | KNXConfigEntryData( - connection_type=CONF_KNX_TUNNELING_TCP_SECURE, + self.new_entry_data |= KNXConfigEntryData( + backbone_key=None, + sync_latency_tolerance=None, knxkeys_filename=storage_key, knxkeys_password=user_input[CONF_KNX_KNXKEY_PASSWORD], ) - return self.finish_flow( - new_entry_data=entry_data, - title=f"Secure Tunneling @ {self._tunneling_config[CONF_HOST]}", - ) + if ( + self.new_entry_data[CONF_KNX_CONNECTION_TYPE] + == CONF_KNX_ROUTING_SECURE + ): + title = f"Secure Routing as {self.new_entry_data[CONF_KNX_INDIVIDUAL_ADDRESS]}" + else: + title = f"Secure Tunneling @ {self.new_entry_data[CONF_HOST]}" + return self.finish_flow(title=title) fields = { vol.Required( @@ -418,34 +468,46 @@ class KNXCommonFlow(ABC, FlowHandler): errors[CONF_KNX_LOCAL_IP] = "invalid_ip_address" if not errors: - entry_data = KNXConfigEntryData( - connection_type=CONF_KNX_ROUTING, + connection_type = ( + CONF_KNX_ROUTING_SECURE + if user_input[CONF_KNX_ROUTING_SECURE] + else CONF_KNX_ROUTING + ) + self.new_entry_data = KNXConfigEntryData( + connection_type=connection_type, individual_address=_individual_address, multicast_group=_multicast_group, multicast_port=_multicast_port, local_ip=_local_ip, ) - return self.finish_flow( - new_entry_data=entry_data, - title=f"Routing as {_individual_address}", - ) + if connection_type == CONF_KNX_ROUTING_SECURE: + return self.async_show_menu( + step_id="secure_key_source", + menu_options=["secure_knxkeys", "secure_routing_manual"], + ) + return self.finish_flow(title=f"Routing as {_individual_address}") + + routers = [router for router in self._found_gateways if router.supports_routing] + if not routers: + errors["base"] = "no_router_discovered" + default_secure_routing_enable = any( + router for router in routers if router.routing_requires_secure + ) fields = { vol.Required( CONF_KNX_INDIVIDUAL_ADDRESS, default=_individual_address ): _IA_SELECTOR, + vol.Required( + CONF_KNX_ROUTING_SECURE, default=default_secure_routing_enable + ): selector.BooleanSelector(), vol.Required(CONF_KNX_MCAST_GRP, default=_multicast_group): _IP_SELECTOR, vol.Required(CONF_KNX_MCAST_PORT, default=_multicast_port): _PORT_SELECTOR, } - if self.show_advanced_options: # Optional with default doesn't work properly in flow UI fields[vol.Optional(CONF_KNX_LOCAL_IP)] = _IP_SELECTOR - if not any( - router for router in self._found_gateways if router.supports_routing - ): - errors["base"] = "no_router_discovered" return self.async_show_form( step_id="routing", data_schema=vol.Schema(fields), errors=errors ) @@ -467,11 +529,11 @@ class KNXConfigFlow(KNXCommonFlow, ConfigFlow, domain=DOMAIN): return KNXOptionsFlow(config_entry) @callback - def finish_flow(self, new_entry_data: KNXConfigEntryData, title: str) -> FlowResult: + def finish_flow(self, title: str) -> FlowResult: """Create the ConfigEntry.""" return self.async_create_entry( title=title, - data=DEFAULT_ENTRY_DATA | new_entry_data, + data=DEFAULT_ENTRY_DATA | self.new_entry_data, ) async def async_step_user(self, user_input: dict | None = None) -> FlowResult: @@ -492,11 +554,9 @@ class KNXOptionsFlow(KNXCommonFlow, OptionsFlow): super().__init__(initial_data=config_entry.data) # type: ignore[arg-type] @callback - def finish_flow( - self, new_entry_data: KNXConfigEntryData, title: str | None - ) -> FlowResult: + def finish_flow(self, title: str | None) -> FlowResult: """Update the ConfigEntry and finish the flow.""" - new_data = DEFAULT_ENTRY_DATA | self.initial_data | new_entry_data + new_data = DEFAULT_ENTRY_DATA | self.initial_data | self.new_entry_data self.hass.config_entries.async_update_entry( self.config_entry, data=new_data, @@ -518,13 +578,11 @@ class KNXOptionsFlow(KNXCommonFlow, OptionsFlow): ) -> FlowResult: """Manage KNX communication settings.""" if user_input is not None: - return self.finish_flow( - new_entry_data=KNXConfigEntryData( - state_updater=user_input[CONF_KNX_STATE_UPDATER], - rate_limit=user_input[CONF_KNX_RATE_LIMIT], - ), - title=None, + self.new_entry_data = KNXConfigEntryData( + state_updater=user_input[CONF_KNX_STATE_UPDATER], + rate_limit=user_input[CONF_KNX_RATE_LIMIT], ) + return self.finish_flow(title=None) data_schema = { vol.Required( diff --git a/homeassistant/components/knx/const.py b/homeassistant/components/knx/const.py index a6a88b2010a..4a8b113516c 100644 --- a/homeassistant/components/knx/const.py +++ b/homeassistant/components/knx/const.py @@ -30,6 +30,9 @@ CONF_KNX_INDIVIDUAL_ADDRESS: Final = "individual_address" CONF_KNX_CONNECTION_TYPE: Final = "connection_type" CONF_KNX_AUTOMATIC: Final = "automatic" CONF_KNX_ROUTING: Final = "routing" +CONF_KNX_ROUTING_BACKBONE_KEY: Final = "backbone_key" +CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE: Final = "sync_latency_tolerance" +CONF_KNX_ROUTING_SECURE: Final = "routing_secure" CONF_KNX_TUNNELING: Final = "tunneling" CONF_KNX_TUNNELING_TCP: Final = "tunneling_tcp" CONF_KNX_TUNNELING_TCP_SECURE: Final = "tunneling_tcp_secure" @@ -92,6 +95,8 @@ class KNXConfigEntryData(TypedDict, total=False): device_authentication: str knxkeys_filename: str knxkeys_password: str + backbone_key: str | None + sync_latency_tolerance: int | None class ColorTempModes(Enum): diff --git a/homeassistant/components/knx/diagnostics.py b/homeassistant/components/knx/diagnostics.py index c409b4116bf..3dd14aef653 100644 --- a/homeassistant/components/knx/diagnostics.py +++ b/homeassistant/components/knx/diagnostics.py @@ -13,12 +13,14 @@ from homeassistant.core import HomeAssistant from . import CONFIG_SCHEMA from .const import ( CONF_KNX_KNXKEY_PASSWORD, + CONF_KNX_ROUTING_BACKBONE_KEY, CONF_KNX_SECURE_DEVICE_AUTHENTICATION, CONF_KNX_SECURE_USER_PASSWORD, DOMAIN, ) TO_REDACT = { + CONF_KNX_ROUTING_BACKBONE_KEY, CONF_KNX_KNXKEY_PASSWORD, CONF_KNX_SECURE_USER_PASSWORD, CONF_KNX_SECURE_DEVICE_AUTHENTICATION, diff --git a/homeassistant/components/knx/strings.json b/homeassistant/components/knx/strings.json index d87ad6ac177..0fbb7fc3ae6 100644 --- a/homeassistant/components/knx/strings.json +++ b/homeassistant/components/knx/strings.json @@ -29,11 +29,12 @@ "local_ip": "Leave blank to use auto-discovery." } }, - "secure_tunneling": { + "secure_key_source": { "description": "Select how you want to configure KNX/IP Secure.", "menu_options": { "secure_knxkeys": "Use a `.knxkeys` file containing IP secure keys", - "secure_tunnel_manual": "Configure IP secure keys manually" + "secure_tunnel_manual": "Configure IP secure credentials manually", + "secure_routing_manual": "Configure IP secure backbone key manually" } }, "secure_knxkeys": { @@ -60,10 +61,22 @@ "device_authentication": "This is set in the 'IP' panel of the interface in ETS." } }, + "secure_routing_manual": { + "description": "Please enter your IP secure information.", + "data": { + "backbone_key": "Backbone key", + "sync_latency_tolerance": "Network latency tolerance" + }, + "data_description": { + "backbone_key": "Can be seen in the 'Security' report of an ETS project. Eg. '00112233445566778899AABBCCDDEEFF'", + "sync_latency_tolerance": "Default is 1000." + } + }, "routing": { "description": "Please configure the routing options.", "data": { "individual_address": "Individual address", + "routing_secure": "Use KNX IP Secure", "multicast_group": "Multicast group", "multicast_port": "Multicast port", "local_ip": "Local IP of Home Assistant" @@ -80,6 +93,7 @@ }, "error": { "cannot_connect": "[%key:common::config_flow::error::cannot_connect%]", + "invalid_backbone_key": "Invalid backbone key. 32 hexadecimal numbers expected.", "invalid_individual_address": "Value does not match pattern for KNX individual address.\n'area.line.device'", "invalid_ip_address": "Invalid IPv4 address.", "invalid_signature": "The password to decrypt the `.knxkeys` file is wrong.", @@ -134,11 +148,12 @@ "local_ip": "[%key:component::knx::config::step::manual_tunnel::data_description::local_ip%]" } }, - "secure_tunneling": { - "description": "[%key:component::knx::config::step::secure_tunneling::description%]", + "secure_key_source": { + "description": "[%key:component::knx::config::step::secure_key_source::description%]", "menu_options": { - "secure_knxkeys": "[%key:component::knx::config::step::secure_tunneling::menu_options::secure_knxkeys%]", - "secure_tunnel_manual": "[%key:component::knx::config::step::secure_tunneling::menu_options::secure_tunnel_manual%]" + "secure_knxkeys": "[%key:component::knx::config::step::secure_key_source::menu_options::secure_knxkeys%]", + "secure_tunnel_manual": "[%key:component::knx::config::step::secure_key_source::menu_options::secure_tunnel_manual%]", + "secure_routing_manual": "[%key:component::knx::config::step::secure_key_source::menu_options::secure_routing_manual%]" } }, "secure_knxkeys": { @@ -165,10 +180,22 @@ "device_authentication": "[%key:component::knx::config::step::secure_tunnel_manual::data_description::device_authentication%]" } }, + "secure_routing_manual": { + "description": "[%key:component::knx::config::step::secure_routing_manual::description%]", + "data": { + "backbone_key": "[%key:component::knx::config::step::secure_routing_manual::data::backbone_key%]", + "sync_latency_tolerance": "[%key:component::knx::config::step::secure_routing_manual::data::sync_latency_tolerance%]" + }, + "data_description": { + "backbone_key": "[%key:component::knx::config::step::secure_routing_manual::data_description::backbone_key%]", + "sync_latency_tolerance": "[%key:component::knx::config::step::secure_routing_manual::data_description::sync_latency_tolerance%]" + } + }, "routing": { "description": "[%key:component::knx::config::step::routing::description%]", "data": { "individual_address": "[%key:component::knx::config::step::routing::data::individual_address%]", + "routing_secure": "[%key:component::knx::config::step::routing::data::routing_secure%]", "multicast_group": "[%key:component::knx::config::step::routing::data::multicast_group%]", "multicast_port": "[%key:component::knx::config::step::routing::data::multicast_port%]", "local_ip": "[%key:component::knx::config::step::routing::data::local_ip%]" @@ -181,6 +208,7 @@ }, "error": { "cannot_connect": "[%key:common::config_flow::error::cannot_connect%]", + "invalid_backbone_key": "[%key:component::knx::config::error::invalid_backbone_key%]", "invalid_individual_address": "[%key:component::knx::config::error::invalid_individual_address%]", "invalid_ip_address": "[%key:component::knx::config::error::invalid_ip_address%]", "invalid_signature": "[%key:component::knx::config::error::invalid_signature%]", diff --git a/homeassistant/components/knx/translations/en.json b/homeassistant/components/knx/translations/en.json index c45c98b070a..8ed45661d0d 100644 --- a/homeassistant/components/knx/translations/en.json +++ b/homeassistant/components/knx/translations/en.json @@ -7,6 +7,7 @@ "error": { "cannot_connect": "Failed to connect", "file_not_found": "The specified `.knxkeys` file was not found in the path config/.storage/knx/", + "invalid_backbone_key": "Invalid backbone key. 32 hexadecimal numbers expected.", "invalid_individual_address": "Value does not match pattern for KNX individual address.\n'area.line.device'", "invalid_ip_address": "Invalid IPv4 address.", "invalid_signature": "The password to decrypt the `.knxkeys` file is wrong.", @@ -41,7 +42,8 @@ "individual_address": "Individual address", "local_ip": "Local IP of Home Assistant", "multicast_group": "Multicast group", - "multicast_port": "Multicast port" + "multicast_port": "Multicast port", + "routing_secure": "Use KNX IP Secure" }, "data_description": { "individual_address": "KNX address to be used by Home Assistant, e.g. `0.0.4`", @@ -49,6 +51,14 @@ }, "description": "Please configure the routing options." }, + "secure_key_source": { + "description": "Select how you want to configure KNX/IP Secure.", + "menu_options": { + "secure_knxkeys": "Use a `.knxkeys` file containing IP secure keys", + "secure_routing_manual": "Configure IP secure backbone key manually", + "secure_tunnel_manual": "Configure IP secure credentials manually" + } + }, "secure_knxkeys": { "data": { "knxkeys_filename": "The filename of your `.knxkeys` file (including extension)", @@ -60,6 +70,17 @@ }, "description": "Please enter the information for your `.knxkeys` file." }, + "secure_routing_manual": { + "data": { + "backbone_key": "Backbone key", + "sync_latency_tolerance": "Network latency tolerance" + }, + "data_description": { + "backbone_key": "Can be seen in the 'Security' report of an ETS project. Eg. '00112233445566778899AABBCCDDEEFF'", + "sync_latency_tolerance": "Default is 1000." + }, + "description": "Please enter your IP secure information." + }, "secure_tunnel_manual": { "data": { "device_authentication": "Device authentication password", @@ -73,13 +94,6 @@ }, "description": "Please enter your IP secure information." }, - "secure_tunneling": { - "description": "Select how you want to configure KNX/IP Secure.", - "menu_options": { - "secure_knxkeys": "Use a `.knxkeys` file containing IP secure keys", - "secure_tunnel_manual": "Configure IP secure keys manually" - } - }, "tunnel": { "data": { "gateway": "KNX Tunnel Connection" @@ -92,6 +106,7 @@ "error": { "cannot_connect": "Failed to connect", "file_not_found": "The specified `.knxkeys` file was not found in the path config/.storage/knx/", + "invalid_backbone_key": "Invalid backbone key. 32 hexadecimal numbers expected.", "invalid_individual_address": "Value does not match pattern for KNX individual address.\n'area.line.device'", "invalid_ip_address": "Invalid IPv4 address.", "invalid_signature": "The password to decrypt the `.knxkeys` file is wrong.", @@ -142,7 +157,8 @@ "individual_address": "Individual address", "local_ip": "Local IP of Home Assistant", "multicast_group": "Multicast group", - "multicast_port": "Multicast port" + "multicast_port": "Multicast port", + "routing_secure": "Use KNX IP Secure" }, "data_description": { "individual_address": "KNX address to be used by Home Assistant, e.g. `0.0.4`", @@ -150,6 +166,14 @@ }, "description": "Please configure the routing options." }, + "secure_key_source": { + "description": "Select how you want to configure KNX/IP Secure.", + "menu_options": { + "secure_knxkeys": "Use a `.knxkeys` file containing IP secure keys", + "secure_routing_manual": "Configure IP secure backbone key manually", + "secure_tunnel_manual": "Configure IP secure credentials manually" + } + }, "secure_knxkeys": { "data": { "knxkeys_filename": "The filename of your `.knxkeys` file (including extension)", @@ -161,6 +185,17 @@ }, "description": "Please enter the information for your `.knxkeys` file." }, + "secure_routing_manual": { + "data": { + "backbone_key": "Backbone key", + "sync_latency_tolerance": "Network latency tolerance" + }, + "data_description": { + "backbone_key": "Can be seen in the 'Security' report of an ETS project. Eg. '00112233445566778899AABBCCDDEEFF'", + "sync_latency_tolerance": "Default is 1000." + }, + "description": "Please enter your IP secure information." + }, "secure_tunnel_manual": { "data": { "device_authentication": "Device authentication password", @@ -174,13 +209,6 @@ }, "description": "Please enter your IP secure information." }, - "secure_tunneling": { - "description": "Select how you want to configure KNX/IP Secure.", - "menu_options": { - "secure_knxkeys": "Use a `.knxkeys` file containing IP secure keys", - "secure_tunnel_manual": "Configure IP secure keys manually" - } - }, "tunnel": { "data": { "gateway": "KNX Tunnel Connection" diff --git a/tests/components/knx/test_config_flow.py b/tests/components/knx/test_config_flow.py index d57007ed28b..2ce3793937b 100644 --- a/tests/components/knx/test_config_flow.py +++ b/tests/components/knx/test_config_flow.py @@ -26,6 +26,9 @@ from homeassistant.components.knx.const import ( CONF_KNX_RATE_LIMIT, CONF_KNX_ROUTE_BACK, CONF_KNX_ROUTING, + CONF_KNX_ROUTING_BACKBONE_KEY, + CONF_KNX_ROUTING_SECURE, + CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE, CONF_KNX_SECURE_DEVICE_AUTHENTICATION, CONF_KNX_SECURE_USER_ID, CONF_KNX_SECURE_USER_PASSWORD, @@ -197,6 +200,162 @@ async def test_routing_setup_advanced(hass: HomeAssistant) -> None: assert len(mock_setup_entry.mock_calls) == 1 +async def test_routing_secure_manual_setup(hass: HomeAssistant) -> None: + """Test routing secure setup with manual key config.""" + with patch("xknx.io.gateway_scanner.GatewayScanner.scan") as gateways: + gateways.return_value = [] + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + assert result["type"] == FlowResultType.FORM + assert not result["errors"] + + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], + { + CONF_KNX_CONNECTION_TYPE: CONF_KNX_ROUTING, + }, + ) + await hass.async_block_till_done() + assert result2["type"] == FlowResultType.FORM + assert result2["step_id"] == "routing" + assert result2["errors"] == {"base": "no_router_discovered"} + + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], + { + CONF_KNX_MCAST_GRP: DEFAULT_MCAST_GRP, + CONF_KNX_MCAST_PORT: 3671, + CONF_KNX_INDIVIDUAL_ADDRESS: "0.0.123", + CONF_KNX_ROUTING_SECURE: True, + }, + ) + assert result3["type"] == FlowResultType.MENU + assert result3["step_id"] == "secure_key_source" + + result4 = await hass.config_entries.flow.async_configure( + result3["flow_id"], + {"next_step_id": "secure_routing_manual"}, + ) + assert result4["type"] == FlowResultType.FORM + assert result4["step_id"] == "secure_routing_manual" + assert not result4["errors"] + + result_invalid_key1 = await hass.config_entries.flow.async_configure( + result4["flow_id"], + { + CONF_KNX_ROUTING_BACKBONE_KEY: "xxaacc44bbaacc44bbaacc44bbaaccyy", # invalid hex string + CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE: 2000, + }, + ) + assert result_invalid_key1["type"] == FlowResultType.FORM + assert result_invalid_key1["step_id"] == "secure_routing_manual" + assert result_invalid_key1["errors"] == {"backbone_key": "invalid_backbone_key"} + + result_invalid_key2 = await hass.config_entries.flow.async_configure( + result4["flow_id"], + { + CONF_KNX_ROUTING_BACKBONE_KEY: "bbaacc44bbaacc44", # invalid length + CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE: 2000, + }, + ) + assert result_invalid_key2["type"] == FlowResultType.FORM + assert result_invalid_key2["step_id"] == "secure_routing_manual" + assert result_invalid_key2["errors"] == {"backbone_key": "invalid_backbone_key"} + + with patch( + "homeassistant.components.knx.async_setup_entry", + return_value=True, + ) as mock_setup_entry: + secure_routing_manual = await hass.config_entries.flow.async_configure( + result_invalid_key2["flow_id"], + { + CONF_KNX_ROUTING_BACKBONE_KEY: "bbaacc44bbaacc44bbaacc44bbaacc44", + CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE: 2000, + }, + ) + await hass.async_block_till_done() + assert secure_routing_manual["type"] == FlowResultType.CREATE_ENTRY + assert secure_routing_manual["title"] == "Secure Routing as 0.0.123" + assert secure_routing_manual["data"] == { + **DEFAULT_ENTRY_DATA, + CONF_KNX_CONNECTION_TYPE: CONF_KNX_ROUTING_SECURE, + CONF_KNX_ROUTING_BACKBONE_KEY: "bbaacc44bbaacc44bbaacc44bbaacc44", + CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE: 2000, + CONF_KNX_INDIVIDUAL_ADDRESS: "0.0.123", + } + assert len(mock_setup_entry.mock_calls) == 1 + + +async def test_routing_secure_keyfile(hass: HomeAssistant) -> None: + """Test routing secure setup with keyfile.""" + with patch("xknx.io.gateway_scanner.GatewayScanner.scan") as gateways: + gateways.return_value = [] + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + assert result["type"] == FlowResultType.FORM + assert not result["errors"] + + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], + { + CONF_KNX_CONNECTION_TYPE: CONF_KNX_ROUTING, + }, + ) + await hass.async_block_till_done() + assert result2["type"] == FlowResultType.FORM + assert result2["step_id"] == "routing" + assert result2["errors"] == {"base": "no_router_discovered"} + + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], + { + CONF_KNX_MCAST_GRP: DEFAULT_MCAST_GRP, + CONF_KNX_MCAST_PORT: 3671, + CONF_KNX_INDIVIDUAL_ADDRESS: "0.0.123", + CONF_KNX_ROUTING_SECURE: True, + }, + ) + assert result3["type"] == FlowResultType.MENU + assert result3["step_id"] == "secure_key_source" + + result4 = await hass.config_entries.flow.async_configure( + result3["flow_id"], + {"next_step_id": "secure_knxkeys"}, + ) + assert result4["type"] == FlowResultType.FORM + assert result4["step_id"] == "secure_knxkeys" + assert not result4["errors"] + + with patch( + "homeassistant.components.knx.async_setup_entry", + return_value=True, + ) as mock_setup_entry, patch( + "homeassistant.components.knx.config_flow.load_keyring", return_value=True + ): + routing_secure_knxkeys = await hass.config_entries.flow.async_configure( + result4["flow_id"], + { + CONF_KNX_KNXKEY_FILENAME: "testcase.knxkeys", + CONF_KNX_KNXKEY_PASSWORD: "password", + }, + ) + await hass.async_block_till_done() + assert routing_secure_knxkeys["type"] == FlowResultType.CREATE_ENTRY + assert routing_secure_knxkeys["title"] == "Secure Routing as 0.0.123" + assert routing_secure_knxkeys["data"] == { + **DEFAULT_ENTRY_DATA, + CONF_KNX_CONNECTION_TYPE: CONF_KNX_ROUTING_SECURE, + CONF_KNX_KNXKEY_FILENAME: "knx/testcase.knxkeys", + CONF_KNX_KNXKEY_PASSWORD: "password", + CONF_KNX_ROUTING_BACKBONE_KEY: None, + CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE: None, + CONF_KNX_INDIVIDUAL_ADDRESS: "0.0.123", + } + assert len(mock_setup_entry.mock_calls) == 1 + + @pytest.mark.parametrize( "user_input,config_entry_data", [ @@ -506,7 +665,7 @@ async def test_form_with_automatic_connection_handling(hass: HomeAssistant) -> N async def _get_menu_step(hass: HomeAssistant) -> FlowResult: - """Return flow in secure_tunnellinn menu step.""" + """Return flow in secure_tunnelling menu step.""" gateway = _gateway_descriptor( "192.168.0.1", 3675, @@ -538,7 +697,7 @@ async def _get_menu_step(hass: HomeAssistant) -> FlowResult: ) await hass.async_block_till_done() assert result3["type"] == FlowResultType.MENU - assert result3["step_id"] == "secure_tunneling" + assert result3["step_id"] == "secure_key_source" return result3 @@ -588,7 +747,7 @@ async def test_get_secure_menu_step_manual_tunnelling( ) await hass.async_block_till_done() assert result3["type"] == FlowResultType.MENU - assert result3["step_id"] == "secure_tunneling" + assert result3["step_id"] == "secure_key_source" async def test_configure_secure_tunnel_manual(hass: HomeAssistant): @@ -665,6 +824,8 @@ async def test_configure_secure_knxkeys(hass: HomeAssistant): CONF_KNX_CONNECTION_TYPE: CONF_KNX_TUNNELING_TCP_SECURE, CONF_KNX_KNXKEY_FILENAME: "knx/testcase.knxkeys", CONF_KNX_KNXKEY_PASSWORD: "password", + CONF_KNX_ROUTING_BACKBONE_KEY: None, + CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE: None, CONF_HOST: "192.168.0.1", CONF_PORT: 3675, CONF_KNX_INDIVIDUAL_ADDRESS: "0.0.240", diff --git a/tests/components/knx/test_diagnostic.py b/tests/components/knx/test_diagnostic.py index f40649ee1eb..130f1f70f35 100644 --- a/tests/components/knx/test_diagnostic.py +++ b/tests/components/knx/test_diagnostic.py @@ -14,6 +14,7 @@ from homeassistant.components.knx.const import ( CONF_KNX_MCAST_GRP, CONF_KNX_MCAST_PORT, CONF_KNX_RATE_LIMIT, + CONF_KNX_ROUTING_BACKBONE_KEY, CONF_KNX_SECURE_DEVICE_AUTHENTICATION, CONF_KNX_SECURE_USER_PASSWORD, CONF_KNX_STATE_UPDATER, @@ -107,6 +108,7 @@ async def test_diagnostic_redact( CONF_KNX_KNXKEY_PASSWORD: "password", CONF_KNX_SECURE_USER_PASSWORD: "user_password", CONF_KNX_SECURE_DEVICE_AUTHENTICATION: "device_authentication", + CONF_KNX_ROUTING_BACKBONE_KEY: "bbaacc44bbaacc44bbaacc44bbaacc44", }, ) knx: KNXTestKit = KNXTestKit(hass, mock_config_entry) @@ -128,6 +130,7 @@ async def test_diagnostic_redact( "knxkeys_password": "**REDACTED**", "user_password": "**REDACTED**", "device_authentication": "**REDACTED**", + "backbone_key": "**REDACTED**", }, "configuration_error": None, "configuration_yaml": None, diff --git a/tests/components/knx/test_init.py b/tests/components/knx/test_init.py index 4bda407ace7..3a8a42fdf98 100644 --- a/tests/components/knx/test_init.py +++ b/tests/components/knx/test_init.py @@ -23,6 +23,9 @@ from homeassistant.components.knx.const import ( CONF_KNX_RATE_LIMIT, CONF_KNX_ROUTE_BACK, CONF_KNX_ROUTING, + CONF_KNX_ROUTING_BACKBONE_KEY, + CONF_KNX_ROUTING_SECURE, + CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE, CONF_KNX_SECURE_DEVICE_AUTHENTICATION, CONF_KNX_SECURE_USER_ID, CONF_KNX_SECURE_USER_PASSWORD, @@ -167,6 +170,31 @@ from tests.common import MockConfigEntry threaded=True, ), ), + ( + { + CONF_KNX_CONNECTION_TYPE: CONF_KNX_ROUTING_SECURE, + CONF_KNX_LOCAL_IP: "192.168.1.1", + CONF_KNX_RATE_LIMIT: CONF_KNX_DEFAULT_RATE_LIMIT, + CONF_KNX_STATE_UPDATER: CONF_KNX_DEFAULT_STATE_UPDATER, + CONF_KNX_MCAST_PORT: DEFAULT_MCAST_PORT, + CONF_KNX_MCAST_GRP: DEFAULT_MCAST_GRP, + CONF_KNX_INDIVIDUAL_ADDRESS: DEFAULT_ROUTING_IA, + CONF_KNX_ROUTING_BACKBONE_KEY: "bbaacc44bbaacc44bbaacc44bbaacc44", + CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE: 2000, + }, + ConnectionConfig( + connection_type=ConnectionType.ROUTING_SECURE, + individual_address=DEFAULT_ROUTING_IA, + multicast_group=DEFAULT_MCAST_GRP, + multicast_port=DEFAULT_MCAST_PORT, + secure_config=SecureConfig( + backbone_key="bbaacc44bbaacc44bbaacc44bbaacc44", + latency_ms=2000, + ), + local_ip="192.168.1.1", + threaded=True, + ), + ), ], ) async def test_init_connection_handling(