Fix mysensors typing (#51518)
* Fix device * Fix init * Fix gateway * Fix config flow * Fix helpers * Remove mysensors from typing ignore list
This commit is contained in:
parent
7a6d067eb4
commit
e73cdfab2f
7 changed files with 80 additions and 76 deletions
|
@ -27,7 +27,7 @@ from homeassistant.components.mysensors import (
|
|||
)
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.data_entry_flow import RESULT_TYPE_FORM, FlowResult
|
||||
from homeassistant.data_entry_flow import FlowResult
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
|
||||
from . import CONF_RETAIN, CONF_VERSION, DEFAULT_VERSION
|
||||
|
@ -111,7 +111,7 @@ class MySensorsConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
|||
"""Set up config flow."""
|
||||
self._gw_type: str | None = None
|
||||
|
||||
async def async_step_import(self, user_input: dict[str, str] | None = None):
|
||||
async def async_step_import(self, user_input: dict[str, Any]) -> FlowResult:
|
||||
"""Import a config entry.
|
||||
|
||||
This method is called by async_setup and it has already
|
||||
|
@ -131,12 +131,14 @@ class MySensorsConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
|||
else:
|
||||
user_input[CONF_GATEWAY_TYPE] = CONF_GATEWAY_TYPE_SERIAL
|
||||
|
||||
result: dict[str, Any] = await self.async_step_user(user_input=user_input)
|
||||
if result["type"] == RESULT_TYPE_FORM:
|
||||
return self.async_abort(reason=next(iter(result["errors"].values())))
|
||||
result: FlowResult = await self.async_step_user(user_input=user_input)
|
||||
if errors := result.get("errors"):
|
||||
return self.async_abort(reason=next(iter(errors.values())))
|
||||
return result
|
||||
|
||||
async def async_step_user(self, user_input: dict[str, str] | None = None):
|
||||
async def async_step_user(
|
||||
self, user_input: dict[str, str] | None = None
|
||||
) -> FlowResult:
|
||||
"""Create a config entry from frontend user input."""
|
||||
schema = {vol.Required(CONF_GATEWAY_TYPE): vol.In(CONF_GATEWAY_TYPE_ALL)}
|
||||
schema = vol.Schema(schema)
|
||||
|
@ -158,9 +160,11 @@ class MySensorsConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
|||
|
||||
return self.async_show_form(step_id="user", data_schema=schema, errors=errors)
|
||||
|
||||
async def async_step_gw_serial(self, user_input: dict[str, str] | None = None):
|
||||
async def async_step_gw_serial(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> FlowResult:
|
||||
"""Create config entry for a serial gateway."""
|
||||
errors = {}
|
||||
errors: dict[str, str] = {}
|
||||
if user_input is not None:
|
||||
errors.update(
|
||||
await self.validate_common(CONF_GATEWAY_TYPE_SERIAL, errors, user_input)
|
||||
|
@ -187,7 +191,9 @@ class MySensorsConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
|||
step_id="gw_serial", data_schema=schema, errors=errors
|
||||
)
|
||||
|
||||
async def async_step_gw_tcp(self, user_input: dict[str, str] | None = None):
|
||||
async def async_step_gw_tcp(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> FlowResult:
|
||||
"""Create a config entry for a tcp gateway."""
|
||||
errors = {}
|
||||
if user_input is not None:
|
||||
|
@ -225,7 +231,9 @@ class MySensorsConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
|||
return True
|
||||
return False
|
||||
|
||||
async def async_step_gw_mqtt(self, user_input: dict[str, str] | None = None):
|
||||
async def async_step_gw_mqtt(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> FlowResult:
|
||||
"""Create a config entry for a mqtt gateway."""
|
||||
errors = {}
|
||||
if user_input is not None:
|
||||
|
@ -280,9 +288,7 @@ class MySensorsConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
|||
)
|
||||
|
||||
@callback
|
||||
def _async_create_entry(
|
||||
self, user_input: dict[str, str] | None = None
|
||||
) -> FlowResult:
|
||||
def _async_create_entry(self, user_input: dict[str, Any]) -> FlowResult:
|
||||
"""Create the config entry."""
|
||||
return self.async_create_entry(
|
||||
title=f"{user_input[CONF_DEVICE]}",
|
||||
|
@ -296,55 +302,52 @@ class MySensorsConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
|||
self,
|
||||
gw_type: ConfGatewayType,
|
||||
errors: dict[str, str],
|
||||
user_input: dict[str, str] | None = None,
|
||||
user_input: dict[str, Any],
|
||||
) -> dict[str, str]:
|
||||
"""Validate parameters common to all gateway types."""
|
||||
if user_input is not None:
|
||||
errors.update(_validate_version(user_input.get(CONF_VERSION)))
|
||||
errors.update(_validate_version(user_input[CONF_VERSION]))
|
||||
|
||||
if gw_type != CONF_GATEWAY_TYPE_MQTT:
|
||||
if gw_type == CONF_GATEWAY_TYPE_TCP:
|
||||
verification_func = is_socket_address
|
||||
else:
|
||||
verification_func = is_serial_port
|
||||
if gw_type != CONF_GATEWAY_TYPE_MQTT:
|
||||
if gw_type == CONF_GATEWAY_TYPE_TCP:
|
||||
verification_func = is_socket_address
|
||||
else:
|
||||
verification_func = is_serial_port
|
||||
|
||||
try:
|
||||
await self.hass.async_add_executor_job(
|
||||
verification_func, user_input.get(CONF_DEVICE)
|
||||
)
|
||||
except vol.Invalid:
|
||||
errors[CONF_DEVICE] = (
|
||||
"invalid_ip"
|
||||
if gw_type == CONF_GATEWAY_TYPE_TCP
|
||||
else "invalid_serial"
|
||||
)
|
||||
if CONF_PERSISTENCE_FILE in user_input:
|
||||
try:
|
||||
is_persistence_file(user_input[CONF_PERSISTENCE_FILE])
|
||||
except vol.Invalid:
|
||||
errors[CONF_PERSISTENCE_FILE] = "invalid_persistence_file"
|
||||
else:
|
||||
real_persistence_path = user_input[
|
||||
CONF_PERSISTENCE_FILE
|
||||
] = self._normalize_persistence_file(
|
||||
user_input[CONF_PERSISTENCE_FILE]
|
||||
)
|
||||
for other_entry in self._async_current_entries():
|
||||
if CONF_PERSISTENCE_FILE not in other_entry.data:
|
||||
continue
|
||||
if real_persistence_path == self._normalize_persistence_file(
|
||||
other_entry.data[CONF_PERSISTENCE_FILE]
|
||||
):
|
||||
errors[CONF_PERSISTENCE_FILE] = "duplicate_persistence_file"
|
||||
break
|
||||
try:
|
||||
await self.hass.async_add_executor_job(
|
||||
verification_func, user_input.get(CONF_DEVICE)
|
||||
)
|
||||
except vol.Invalid:
|
||||
errors[CONF_DEVICE] = (
|
||||
"invalid_ip"
|
||||
if gw_type == CONF_GATEWAY_TYPE_TCP
|
||||
else "invalid_serial"
|
||||
)
|
||||
if CONF_PERSISTENCE_FILE in user_input:
|
||||
try:
|
||||
is_persistence_file(user_input[CONF_PERSISTENCE_FILE])
|
||||
except vol.Invalid:
|
||||
errors[CONF_PERSISTENCE_FILE] = "invalid_persistence_file"
|
||||
else:
|
||||
real_persistence_path = user_input[
|
||||
CONF_PERSISTENCE_FILE
|
||||
] = self._normalize_persistence_file(user_input[CONF_PERSISTENCE_FILE])
|
||||
for other_entry in self._async_current_entries():
|
||||
if CONF_PERSISTENCE_FILE not in other_entry.data:
|
||||
continue
|
||||
if real_persistence_path == self._normalize_persistence_file(
|
||||
other_entry.data[CONF_PERSISTENCE_FILE]
|
||||
):
|
||||
errors[CONF_PERSISTENCE_FILE] = "duplicate_persistence_file"
|
||||
break
|
||||
|
||||
for other_entry in self._async_current_entries():
|
||||
if _is_same_device(gw_type, user_input, other_entry):
|
||||
errors["base"] = "already_configured"
|
||||
break
|
||||
for other_entry in self._async_current_entries():
|
||||
if _is_same_device(gw_type, user_input, other_entry):
|
||||
errors["base"] = "already_configured"
|
||||
break
|
||||
|
||||
# if no errors so far, try to connect
|
||||
if not errors and not await try_connect(self.hass, user_input):
|
||||
errors["base"] = "cannot_connect"
|
||||
# if no errors so far, try to connect
|
||||
if not errors and not await try_connect(self.hass, user_input):
|
||||
errors["base"] = "cannot_connect"
|
||||
|
||||
return errors
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue