Add option to connect to elkm1 non-secure when secure is discovered (#68735)

Co-authored-by: Glenn Waters <glenn@watrs.ca>
This commit is contained in:
J. Nick Koston 2022-03-28 21:10:49 -10:00 committed by GitHub
parent e85fb87438
commit 14fbda8412
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 190 additions and 33 deletions

View file

@ -37,7 +37,9 @@ from .discovery import (
CONF_DEVICE = "device"
NON_SECURE_PORT = 2101
SECURE_PORT = 2601
STANDARD_PORTS = {NON_SECURE_PORT, SECURE_PORT}
_LOGGER = logging.getLogger(__name__)
@ -48,6 +50,7 @@ PROTOCOL_MAP = {
"serial": "serial://",
}
VALIDATE_TIMEOUT = 35
BASE_SCHEMA = {
@ -60,6 +63,11 @@ ALL_PROTOCOLS = [*SECURE_PROTOCOLS, "non-secure", "serial"]
DEFAULT_SECURE_PROTOCOL = "secure"
DEFAULT_NON_SECURE_PROTOCOL = "non-secure"
PORT_PROTOCOL_MAP = {
NON_SECURE_PORT: DEFAULT_NON_SECURE_PROTOCOL,
SECURE_PORT: DEFAULT_SECURE_PROTOCOL,
}
async def validate_input(data: dict[str, str], mac: str | None) -> dict[str, str]:
"""Validate the user input allows us to connect.
@ -97,6 +105,13 @@ async def validate_input(data: dict[str, str], mac: str | None) -> dict[str, str
return {"title": device_name, CONF_HOST: url, CONF_PREFIX: slugify(prefix)}
def _address_from_discovery(device: ElkSystem):
"""Append the port only if its non-standard."""
if device.port in STANDARD_PORTS:
return device.ip_address
return f"{device.ip_address}:{device.port}"
def _make_url_from_data(data):
if host := data.get(CONF_HOST):
return host
@ -109,7 +124,7 @@ def _make_url_from_data(data):
def _placeholders_from_device(device: ElkSystem) -> dict[str, str]:
return {
"mac_address": _short_mac(device.mac_address),
"host": f"{device.ip_address}:{device.port}",
"host": _address_from_discovery(device),
}
@ -258,26 +273,26 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
device = self._discovered_device
assert device is not None
if user_input is not None:
user_input[CONF_ADDRESS] = f"{device.ip_address}:{device.port}"
user_input[CONF_ADDRESS] = _address_from_discovery(device)
if self._async_current_entries():
user_input[CONF_PREFIX] = _short_mac(device.mac_address)
else:
user_input[CONF_PREFIX] = ""
if device.port != SECURE_PORT:
user_input[CONF_PROTOCOL] = DEFAULT_NON_SECURE_PROTOCOL
errors, result = await self._async_create_or_error(user_input, False)
if not errors:
return result
base_schmea = BASE_SCHEMA.copy()
if device.port == SECURE_PORT:
base_schmea[
vol.Required(CONF_PROTOCOL, default=DEFAULT_SECURE_PROTOCOL)
] = vol.In(SECURE_PROTOCOLS)
default_proto = PORT_PROTOCOL_MAP.get(device.port, DEFAULT_SECURE_PROTOCOL)
return self.async_show_form(
step_id="discovered_connection",
data_schema=vol.Schema(base_schmea),
data_schema=vol.Schema(
{
**BASE_SCHEMA,
vol.Required(CONF_PROTOCOL, default=default_proto): vol.In(
ALL_PROTOCOLS
),
}
),
errors=errors,
description_placeholders=_placeholders_from_device(device),
)
@ -337,7 +352,10 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
)
self._abort_if_unique_id_configured()
return (await self._async_create_or_error(user_input, True))[1]
errors, result = await self._async_create_or_error(user_input, True)
if errors:
return self.async_abort(reason=list(errors.values())[0])
return result
def _url_already_configured(self, url):
"""See if we already have a elkm1 matching user input configured."""

View file

@ -38,6 +38,8 @@
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"abort": {
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"unknown": "[%key:common::config_flow::error::unknown%]",
"already_in_progress": "[%key:common::config_flow::abort::already_in_progress%]",
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"already_configured": "An ElkM1 with this prefix is already configured",

View file

@ -4,7 +4,9 @@
"address_already_configured": "An ElkM1 with this address is already configured",
"already_configured": "An ElkM1 with this prefix is already configured",
"already_in_progress": "Configuration flow is already in progress",
"cannot_connect": "Failed to connect"
"cannot_connect": "Failed to connect",
"invalid_auth": "Invalid authentication",
"unknown": "Unexpected error"
},
"error": {
"cannot_connect": "Failed to connect",
@ -37,13 +39,7 @@
},
"user": {
"data": {
"address": "The IP address or domain or serial port if connecting via serial.",
"device": "Device",
"password": "Password",
"prefix": "A unique prefix (leave blank if you only have one ElkM1).",
"protocol": "Protocol",
"temperature_unit": "The temperature unit ElkM1 uses.",
"username": "Username"
"device": "Device"
},
"description": "Choose a discovered system or 'Manual Entry' if no devices have been discovered.",
"title": "Connect to Elk-M1 Control"

View file

@ -9,6 +9,7 @@ MOCK_IP_ADDRESS = "127.0.0.1"
MOCK_MAC = "aa:bb:cc:dd:ee:ff"
ELK_DISCOVERY = ElkSystem(MOCK_MAC, MOCK_IP_ADDRESS, 2601)
ELK_NON_SECURE_DISCOVERY = ElkSystem(MOCK_MAC, MOCK_IP_ADDRESS, 2101)
ELK_DISCOVERY_NON_STANDARD_PORT = ElkSystem(MOCK_MAC, MOCK_IP_ADDRESS, 444)
def mock_elk(invalid_auth=None, sync_complete=None, exception=None):

View file

@ -12,6 +12,7 @@ from homeassistant.data_entry_flow import RESULT_TYPE_ABORT, RESULT_TYPE_FORM
from . import (
ELK_DISCOVERY,
ELK_DISCOVERY_NON_STANDARD_PORT,
ELK_NON_SECURE_DISCOVERY,
MOCK_IP_ADDRESS,
MOCK_MAC,
@ -24,6 +25,8 @@ from tests.common import MockConfigEntry
DHCP_DISCOVERY = dhcp.DhcpServiceInfo(MOCK_IP_ADDRESS, "", MOCK_MAC)
ELK_DISCOVERY_INFO = asdict(ELK_DISCOVERY)
ELK_DISCOVERY_INFO_NON_STANDARD_PORT = asdict(ELK_DISCOVERY_NON_STANDARD_PORT)
MODULE = "homeassistant.components.elkm1"
@ -322,7 +325,7 @@ async def test_form_user_with_secure_elk_with_discovery(hass):
assert result3["title"] == "ElkM1 ddeeff"
assert result3["data"] == {
"auto_configure": True,
"host": "elks://127.0.0.1:2601",
"host": "elks://127.0.0.1",
"password": "test-password",
"prefix": "",
"username": "test-username",
@ -822,6 +825,102 @@ async def test_form_import_device_discovered(hass):
assert len(mock_setup_entry.mock_calls) == 1
async def test_form_import_non_secure_device_discovered(hass):
"""Test we can import non-secure with discovery."""
mocked_elk = mock_elk(invalid_auth=False, sync_complete=True)
with _patch_discovery(), _patch_elk(elk=mocked_elk), patch(
"homeassistant.components.elkm1.async_setup", return_value=True
) as mock_setup, patch(
"homeassistant.components.elkm1.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={
"host": "elk://127.0.0.1:2101",
"username": "",
"password": "",
"auto_configure": True,
"prefix": "ohana",
},
)
await hass.async_block_till_done()
assert result["type"] == "create_entry"
assert result["title"] == "ohana"
assert result["result"].unique_id == MOCK_MAC
assert result["data"] == {
"auto_configure": True,
"host": "elk://127.0.0.1:2101",
"password": "",
"prefix": "ohana",
"username": "",
}
assert len(mock_setup.mock_calls) == 1
assert len(mock_setup_entry.mock_calls) == 1
async def test_form_import_non_secure_non_stanadard_port_device_discovered(hass):
"""Test we can import non-secure non standard port with discovery."""
mocked_elk = mock_elk(invalid_auth=False, sync_complete=True)
with _patch_discovery(), _patch_elk(elk=mocked_elk), patch(
"homeassistant.components.elkm1.async_setup", return_value=True
) as mock_setup, patch(
"homeassistant.components.elkm1.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={
"host": "elk://127.0.0.1:444",
"username": "",
"password": "",
"auto_configure": True,
"prefix": "ohana",
},
)
await hass.async_block_till_done()
assert result["type"] == "create_entry"
assert result["title"] == "ohana"
assert result["result"].unique_id == MOCK_MAC
assert result["data"] == {
"auto_configure": True,
"host": "elk://127.0.0.1:444",
"password": "",
"prefix": "ohana",
"username": "",
}
assert len(mock_setup.mock_calls) == 1
assert len(mock_setup_entry.mock_calls) == 1
async def test_form_import_non_secure_device_discovered_invalid_auth(hass):
"""Test we abort import with invalid auth."""
mocked_elk = mock_elk(invalid_auth=True, sync_complete=False)
with _patch_discovery(), _patch_elk(elk=mocked_elk):
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={
"host": "elks://127.0.0.1",
"username": "invalid",
"password": "",
"auto_configure": False,
"prefix": "ohana",
},
)
await hass.async_block_till_done()
assert result["type"] == "abort"
assert result["reason"] == "invalid_auth"
async def test_form_import_existing(hass):
"""Test we abort on existing import."""
config_entry = MockConfigEntry(
@ -999,7 +1098,52 @@ async def test_discovered_by_discovery(hass):
assert result2["title"] == "ElkM1 ddeeff"
assert result2["data"] == {
"auto_configure": True,
"host": "elks://127.0.0.1:2601",
"host": "elks://127.0.0.1",
"password": "test-password",
"prefix": "",
"username": "test-username",
}
assert len(mock_setup.mock_calls) == 1
assert len(mock_setup_entry.mock_calls) == 1
async def test_discovered_by_discovery_non_standard_port(hass):
"""Test we can setup when discovered from discovery with a non-standard port."""
with _patch_discovery(), _patch_elk():
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_INTEGRATION_DISCOVERY},
data=ELK_DISCOVERY_INFO_NON_STANDARD_PORT,
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "discovered_connection"
assert result["errors"] == {}
mocked_elk = mock_elk(invalid_auth=False, sync_complete=True)
with _patch_discovery(), _patch_elk(elk=mocked_elk), patch(
"homeassistant.components.elkm1.async_setup", return_value=True
) as mock_setup, patch(
"homeassistant.components.elkm1.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"username": "test-username",
"password": "test-password",
},
)
await hass.async_block_till_done()
assert result2["type"] == "create_entry"
assert result2["title"] == "ElkM1 ddeeff"
assert result2["data"] == {
"auto_configure": True,
"host": "elks://127.0.0.1:444",
"password": "test-password",
"prefix": "",
"username": "test-username",
@ -1063,7 +1207,7 @@ async def test_discovered_by_dhcp_udp_responds(hass):
assert result2["title"] == "ElkM1 ddeeff"
assert result2["data"] == {
"auto_configure": True,
"host": "elks://127.0.0.1:2601",
"host": "elks://127.0.0.1",
"password": "test-password",
"prefix": "",
"username": "test-username",
@ -1098,8 +1242,7 @@ async def test_discovered_by_dhcp_udp_responds_with_nonsecure_port(hass):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"username": "test-username",
"password": "test-password",
"protocol": "non-secure",
},
)
await hass.async_block_till_done()
@ -1108,10 +1251,10 @@ async def test_discovered_by_dhcp_udp_responds_with_nonsecure_port(hass):
assert result2["title"] == "ElkM1 ddeeff"
assert result2["data"] == {
"auto_configure": True,
"host": "elk://127.0.0.1:2101",
"password": "test-password",
"host": "elk://127.0.0.1",
"password": "",
"prefix": "",
"username": "test-username",
"username": "",
}
assert len(mock_setup.mock_calls) == 1
assert len(mock_setup_entry.mock_calls) == 1
@ -1146,10 +1289,7 @@ async def test_discovered_by_dhcp_udp_responds_existing_config_entry(hass):
) as mock_setup_entry:
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"username": "test-username",
"password": "test-password",
},
{"username": "test-username", "password": "test-password"},
)
await hass.async_block_till_done()
@ -1157,7 +1297,7 @@ async def test_discovered_by_dhcp_udp_responds_existing_config_entry(hass):
assert result2["title"] == "ElkM1 ddeeff"
assert result2["data"] == {
"auto_configure": True,
"host": "elks://127.0.0.1:2601",
"host": "elks://127.0.0.1",
"password": "test-password",
"prefix": "ddeeff",
"username": "test-username",