Refactor KNX config flow and validate user input (#69698)

* validate config flow user input

* test flow for invalid user input

* validate multicast address blocks

* Update homeassistant/components/knx/config_flow.py

Co-authored-by: Marvin Wichmann <me@marvin-wichmann.de>

Co-authored-by: Marvin Wichmann <me@marvin-wichmann.de>
This commit is contained in:
Matthias Alphart 2022-04-10 15:56:45 +02:00 committed by GitHub
parent 4853ce208f
commit b3d1574a71
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 226 additions and 135 deletions

View file

@ -24,7 +24,6 @@ from .const import (
CONF_KNX_DEFAULT_RATE_LIMIT,
CONF_KNX_DEFAULT_STATE_UPDATER,
CONF_KNX_INDIVIDUAL_ADDRESS,
CONF_KNX_INITIAL_CONNECTION_TYPES,
CONF_KNX_KNXKEY_FILENAME,
CONF_KNX_KNXKEY_PASSWORD,
CONF_KNX_LOCAL_IP,
@ -44,18 +43,19 @@ from .const import (
DOMAIN,
KNXConfigEntryData,
)
from .schema import ia_validator, ip_v4_validator
CONF_KNX_GATEWAY: Final = "gateway"
CONF_MAX_RATE_LIMIT: Final = 60
CONF_DEFAULT_LOCAL_IP: Final = "0.0.0.0"
DEFAULT_ENTRY_DATA: KNXConfigEntryData = {
CONF_KNX_STATE_UPDATER: CONF_KNX_DEFAULT_STATE_UPDATER,
CONF_KNX_RATE_LIMIT: CONF_KNX_DEFAULT_RATE_LIMIT,
CONF_KNX_INDIVIDUAL_ADDRESS: XKNX.DEFAULT_ADDRESS,
CONF_KNX_MCAST_GRP: DEFAULT_MCAST_GRP,
CONF_KNX_MCAST_PORT: DEFAULT_MCAST_PORT,
}
DEFAULT_ENTRY_DATA = KNXConfigEntryData(
individual_address=XKNX.DEFAULT_ADDRESS,
multicast_group=DEFAULT_MCAST_GRP,
multicast_port=DEFAULT_MCAST_PORT,
state_updater=CONF_KNX_DEFAULT_STATE_UPDATER,
rate_limit=CONF_KNX_DEFAULT_RATE_LIMIT,
)
CONF_KNX_TUNNELING_TYPE: Final = "tunneling_type"
CONF_KNX_LABEL_TUNNELING_TCP: Final = "TCP"
@ -101,10 +101,9 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
if user_input is not None:
connection_type = user_input[CONF_KNX_CONNECTION_TYPE]
if connection_type == CONF_KNX_AUTOMATIC:
entry_data: KNXConfigEntryData = {
**DEFAULT_ENTRY_DATA, # type: ignore[misc]
CONF_KNX_CONNECTION_TYPE: user_input[CONF_KNX_CONNECTION_TYPE],
}
entry_data = DEFAULT_ENTRY_DATA | KNXConfigEntryData(
connection_type=CONF_KNX_AUTOMATIC
)
return self.async_create_entry(
title=CONF_KNX_AUTOMATIC.capitalize(),
data=entry_data,
@ -118,13 +117,15 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
return await self.async_step_manual_tunnel()
errors: dict = {}
supported_connection_types = CONF_KNX_INITIAL_CONNECTION_TYPES.copy()
gateways = await scan_for_gateways()
if gateways:
# add automatic only if a gateway responded
supported_connection_types.insert(0, CONF_KNX_AUTOMATIC)
supported_connection_types = {
CONF_KNX_TUNNELING: CONF_KNX_TUNNELING.capitalize(),
CONF_KNX_ROUTING: CONF_KNX_ROUTING.capitalize(),
}
if gateways := await scan_for_gateways():
# add automatic at first position only if a gateway responded
supported_connection_types = {
CONF_KNX_AUTOMATIC: CONF_KNX_AUTOMATIC.capitalize()
} | supported_connection_types
self._found_tunnels = [
gateway for gateway in gateways if gateway.supports_tunnelling
]
@ -132,10 +133,7 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
fields = {
vol.Required(CONF_KNX_CONNECTION_TYPE): vol.In(supported_connection_types)
}
return self.async_show_form(
step_id="type", data_schema=vol.Schema(fields), errors=errors
)
return self.async_show_form(step_id="type", data_schema=vol.Schema(fields))
async def async_step_tunnel(self, user_input: dict | None = None) -> FlowResult:
"""Select a tunnel from a list. Will be skipped if the gateway scan was unsuccessful or if only one gateway was found."""
@ -164,37 +162,48 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
self, user_input: dict | None = None
) -> FlowResult:
"""Manually configure tunnel connection parameters. Fields default to preselected gateway if one was found."""
errors: dict = {}
if user_input is not None:
connection_type = user_input[CONF_KNX_TUNNELING_TYPE]
try:
_host = ip_v4_validator(user_input[CONF_HOST], multicast=False)
except vol.Invalid:
errors[CONF_HOST] = "invalid_ip_address"
entry_data: KNXConfigEntryData = {
**DEFAULT_ENTRY_DATA, # type: ignore[misc]
CONF_HOST: user_input[CONF_HOST],
CONF_PORT: user_input[CONF_PORT],
CONF_KNX_ROUTE_BACK: (
connection_type == CONF_KNX_LABEL_TUNNELING_UDP_ROUTE_BACK
),
CONF_KNX_LOCAL_IP: user_input.get(CONF_KNX_LOCAL_IP),
CONF_KNX_CONNECTION_TYPE: (
CONF_KNX_TUNNELING_TCP
if connection_type == CONF_KNX_LABEL_TUNNELING_TCP
else CONF_KNX_TUNNELING
),
}
if _local_ip := user_input.get(CONF_KNX_LOCAL_IP):
try:
_local_ip = ip_v4_validator(_local_ip, multicast=False)
except vol.Invalid:
errors[CONF_KNX_LOCAL_IP] = "invalid_ip_address"
if connection_type == CONF_KNX_LABEL_TUNNELING_TCP_SECURE:
self._tunneling_config = entry_data
return self.async_show_menu(
step_id="secure_tunneling",
menu_options=["secure_knxkeys", "secure_manual"],
if not errors:
connection_type = user_input[CONF_KNX_TUNNELING_TYPE]
entry_data = DEFAULT_ENTRY_DATA | KNXConfigEntryData(
host=_host,
port=user_input[CONF_PORT],
route_back=(
connection_type == CONF_KNX_LABEL_TUNNELING_UDP_ROUTE_BACK
),
local_ip=_local_ip,
connection_type=(
CONF_KNX_TUNNELING_TCP
if connection_type == CONF_KNX_LABEL_TUNNELING_TCP
else CONF_KNX_TUNNELING
),
)
return self.async_create_entry(
title=f"{CONF_KNX_TUNNELING.capitalize()} @ {user_input[CONF_HOST]}",
data=entry_data,
)
if connection_type == CONF_KNX_LABEL_TUNNELING_TCP_SECURE:
self._tunneling_config = entry_data
return self.async_show_menu(
step_id="secure_tunneling",
menu_options=["secure_knxkeys", "secure_manual"],
)
return self.async_create_entry(
title=f"Tunneling @ {_host}",
data=entry_data,
)
errors: dict = {}
connection_methods: list[str] = [
CONF_KNX_LABEL_TUNNELING_TCP,
CONF_KNX_LABEL_TUNNELING_UDP,
@ -231,20 +240,15 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
if user_input is not None:
assert self._tunneling_config
entry_data: KNXConfigEntryData = {
**self._tunneling_config, # type: ignore[misc]
CONF_KNX_SECURE_USER_ID: user_input[CONF_KNX_SECURE_USER_ID],
CONF_KNX_SECURE_USER_PASSWORD: user_input[
CONF_KNX_SECURE_USER_PASSWORD
],
CONF_KNX_SECURE_DEVICE_AUTHENTICATION: user_input[
CONF_KNX_SECURE_DEVICE_AUTHENTICATION
],
CONF_KNX_CONNECTION_TYPE: CONF_KNX_TUNNELING_TCP_SECURE,
}
entry_data = self._tunneling_config | KNXConfigEntryData(
connection_type=CONF_KNX_TUNNELING_TCP_SECURE,
device_authentication=user_input[CONF_KNX_SECURE_DEVICE_AUTHENTICATION],
user_id=user_input[CONF_KNX_SECURE_USER_ID],
user_password=user_input[CONF_KNX_SECURE_USER_PASSWORD],
)
return self.async_create_entry(
title=f"Secure {CONF_KNX_TUNNELING.capitalize()} @ {self._tunneling_config[CONF_HOST]}",
title=f"Secure Tunneling @ {self._tunneling_config[CONF_HOST]}",
data=entry_data,
)
@ -272,33 +276,29 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
errors = {}
if user_input is not None:
assert self._tunneling_config
storage_key = CONST_KNX_STORAGE_KEY + user_input[CONF_KNX_KNXKEY_FILENAME]
try:
assert self._tunneling_config
storage_key: str = (
CONST_KNX_STORAGE_KEY + user_input[CONF_KNX_KNXKEY_FILENAME]
)
load_key_ring(
self.hass.config.path(
STORAGE_DIR,
storage_key,
),
user_input[CONF_KNX_KNXKEY_PASSWORD],
path=self.hass.config.path(STORAGE_DIR, storage_key),
password=user_input[CONF_KNX_KNXKEY_PASSWORD],
)
except FileNotFoundError:
errors[CONF_KNX_KNXKEY_FILENAME] = "file_not_found"
except InvalidSignature:
errors[CONF_KNX_KNXKEY_PASSWORD] = "invalid_signature"
if not errors:
entry_data = self._tunneling_config | KNXConfigEntryData(
connection_type=CONF_KNX_TUNNELING_TCP_SECURE,
knxkeys_filename=storage_key,
knxkeys_password=user_input[CONF_KNX_KNXKEY_PASSWORD],
)
entry_data: KNXConfigEntryData = {
**self._tunneling_config, # type: ignore[misc]
CONF_KNX_KNXKEY_FILENAME: storage_key,
CONF_KNX_KNXKEY_PASSWORD: user_input[CONF_KNX_KNXKEY_PASSWORD],
CONF_KNX_CONNECTION_TYPE: CONF_KNX_TUNNELING_TCP_SECURE,
}
return self.async_create_entry(
title=f"Secure {CONF_KNX_TUNNELING.capitalize()} @ {self._tunneling_config[CONF_HOST]}",
title=f"Secure Tunneling @ {self._tunneling_config[CONF_HOST]}",
data=entry_data,
)
except InvalidSignature:
errors["base"] = "invalid_signature"
except FileNotFoundError:
errors["base"] = "file_not_found"
fields = {
vol.Required(CONF_KNX_KNXKEY_FILENAME): selector.selector({"text": {}}),
@ -311,33 +311,55 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
async def async_step_routing(self, user_input: dict | None = None) -> FlowResult:
"""Routing setup."""
if user_input is not None:
return self.async_create_entry(
title=CONF_KNX_ROUTING.capitalize(),
data={
**DEFAULT_ENTRY_DATA,
CONF_KNX_MCAST_GRP: user_input[CONF_KNX_MCAST_GRP],
CONF_KNX_MCAST_PORT: user_input[CONF_KNX_MCAST_PORT],
CONF_KNX_INDIVIDUAL_ADDRESS: user_input[
CONF_KNX_INDIVIDUAL_ADDRESS
],
CONF_KNX_LOCAL_IP: user_input.get(CONF_KNX_LOCAL_IP),
CONF_KNX_CONNECTION_TYPE: CONF_KNX_ROUTING,
},
)
errors: dict = {}
_individual_address = (
user_input[CONF_KNX_INDIVIDUAL_ADDRESS]
if user_input
else XKNX.DEFAULT_ADDRESS
)
_multicast_group = (
user_input[CONF_KNX_MCAST_GRP] if user_input else DEFAULT_MCAST_GRP
)
if user_input is not None:
try:
ia_validator(_individual_address)
except vol.Invalid:
errors[CONF_KNX_INDIVIDUAL_ADDRESS] = "invalid_individual_address"
try:
ip_v4_validator(_multicast_group, multicast=True)
except vol.Invalid:
errors[CONF_KNX_MCAST_GRP] = "invalid_ip_address"
if _local_ip := user_input.get(CONF_KNX_LOCAL_IP):
try:
ip_v4_validator(_local_ip, multicast=False)
except vol.Invalid:
errors[CONF_KNX_LOCAL_IP] = "invalid_ip_address"
if not errors:
entry_data = DEFAULT_ENTRY_DATA | KNXConfigEntryData(
connection_type=CONF_KNX_ROUTING,
individual_address=_individual_address,
multicast_group=_multicast_group,
multicast_port=user_input[CONF_KNX_MCAST_PORT],
local_ip=_local_ip,
)
return self.async_create_entry(
title=CONF_KNX_ROUTING.capitalize(), data=entry_data
)
fields = {
vol.Required(
CONF_KNX_INDIVIDUAL_ADDRESS, default=XKNX.DEFAULT_ADDRESS
CONF_KNX_INDIVIDUAL_ADDRESS, default=_individual_address
): _IA_SELECTOR,
vol.Required(CONF_KNX_MCAST_GRP, default=DEFAULT_MCAST_GRP): _IP_SELECTOR,
vol.Required(CONF_KNX_MCAST_GRP, default=_multicast_group): _IP_SELECTOR,
vol.Required(
CONF_KNX_MCAST_PORT, default=DEFAULT_MCAST_PORT
): _PORT_SELECTOR,
}
if self.show_advanced_options:
# Optional with default doesn't work properly in flow UI
fields[vol.Optional(CONF_KNX_LOCAL_IP)] = _IP_SELECTOR
return self.async_show_form(
@ -477,38 +499,34 @@ class KNXOptionsFlowHandler(OptionsFlow):
last_step=True,
)
entry_data = {
**DEFAULT_ENTRY_DATA,
**self.general_settings,
CONF_KNX_LOCAL_IP: self.general_settings.get(CONF_KNX_LOCAL_IP)
if self.general_settings.get(CONF_KNX_LOCAL_IP) != CONF_DEFAULT_LOCAL_IP
else None,
CONF_HOST: self.current_config.get(CONF_HOST, ""),
}
_local_ip = self.general_settings.get(CONF_KNX_LOCAL_IP)
entry_data = (
DEFAULT_ENTRY_DATA
| self.general_settings
| KNXConfigEntryData(
host=self.current_config.get(CONF_HOST, ""),
local_ip=_local_ip if _local_ip != CONF_DEFAULT_LOCAL_IP else None,
)
)
if user_input is not None:
connection_type = user_input[CONF_KNX_TUNNELING_TYPE]
entry_data = {
**entry_data,
CONF_HOST: user_input[CONF_HOST],
CONF_PORT: user_input[CONF_PORT],
CONF_KNX_ROUTE_BACK: (
connection_type == CONF_KNX_LABEL_TUNNELING_UDP_ROUTE_BACK
),
CONF_KNX_CONNECTION_TYPE: (
entry_data = entry_data | KNXConfigEntryData(
host=user_input[CONF_HOST],
port=user_input[CONF_PORT],
route_back=(connection_type == CONF_KNX_LABEL_TUNNELING_UDP_ROUTE_BACK),
connection_type=(
CONF_KNX_TUNNELING_TCP
if connection_type == CONF_KNX_LABEL_TUNNELING_TCP
else CONF_KNX_TUNNELING
),
}
)
entry_title = str(entry_data[CONF_KNX_CONNECTION_TYPE]).capitalize()
if entry_data[CONF_KNX_CONNECTION_TYPE] == CONF_KNX_TUNNELING:
entry_title = f"{CONF_KNX_TUNNELING.capitalize()} @ {entry_data[CONF_HOST]}"
entry_title = f"Tunneling @ {entry_data[CONF_HOST]}"
if entry_data[CONF_KNX_CONNECTION_TYPE] == CONF_KNX_TUNNELING_TCP:
entry_title = (
f"{CONF_KNX_TUNNELING.capitalize()} (TCP) @ {entry_data[CONF_HOST]}"
)
entry_title = f"Tunneling @ {entry_data[CONF_HOST]} (TCP)"
self.hass.config_entries.async_update_entry(
self.config_entry,