Standardize import step variable name (part 2) (#124679)

This commit is contained in:
epenet 2024-08-27 11:22:35 +02:00 committed by GitHub
parent 0d2f22838a
commit 6b0428774d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 73 additions and 98 deletions

View file

@ -80,11 +80,9 @@ class AirVisualProFlowHandler(ConfigFlow, domain=DOMAIN):
"""Initialize."""
self._reauth_entry: ConfigEntry | None = None
async def async_step_import(
self, import_config: dict[str, Any]
) -> ConfigFlowResult:
async def async_step_import(self, import_data: dict[str, Any]) -> ConfigFlowResult:
"""Import a config entry from configuration.yaml."""
return await self.async_step_user(import_config)
return await self.async_step_user(import_data)
async def async_step_reauth(
self, entry_data: Mapping[str, Any]

View file

@ -1,6 +1,5 @@
"""Config flow for AWS component."""
from collections.abc import Mapping
from typing import Any
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
@ -13,11 +12,9 @@ class AWSFlowHandler(ConfigFlow, domain=DOMAIN):
VERSION = 1
async def async_step_import(
self, user_input: Mapping[str, Any]
) -> ConfigFlowResult:
async def async_step_import(self, import_data: dict[str, Any]) -> ConfigFlowResult:
"""Import a config entry."""
if self._async_current_entries():
return self.async_abort(reason="single_instance_allowed")
return self.async_create_entry(title="configuration.yaml", data=user_input)
return self.async_create_entry(title="configuration.yaml", data=import_data)

View file

@ -154,17 +154,15 @@ class AEHConfigFlow(ConfigFlow, domain=DOMAIN):
options=self._options,
)
async def async_step_import(
self, import_config: dict[str, Any]
) -> ConfigFlowResult:
async def async_step_import(self, import_data: dict[str, Any]) -> ConfigFlowResult:
"""Import config from configuration.yaml."""
if self._async_current_entries():
return self.async_abort(reason="single_instance_allowed")
if CONF_SEND_INTERVAL in import_config:
self._options[CONF_SEND_INTERVAL] = import_config.pop(CONF_SEND_INTERVAL)
if CONF_MAX_DELAY in import_config:
self._options[CONF_MAX_DELAY] = import_config.pop(CONF_MAX_DELAY)
self._data = import_config
if CONF_SEND_INTERVAL in import_data:
self._options[CONF_SEND_INTERVAL] = import_data.pop(CONF_SEND_INTERVAL)
if CONF_MAX_DELAY in import_data:
self._options[CONF_MAX_DELAY] = import_data.pop(CONF_MAX_DELAY)
self._data = import_data
errors = await validate_data(self._data)
if errors:
return self.async_abort(reason=errors["base"])

View file

@ -19,9 +19,7 @@ class FirmataFlowHandler(ConfigFlow, domain=DOMAIN):
VERSION = 1
async def async_step_import(
self, import_config: dict[str, Any]
) -> ConfigFlowResult:
async def async_step_import(self, import_data: dict[str, Any]) -> ConfigFlowResult:
"""Import a firmata board as a config entry.
This flow is triggered by `async_setup` for configured boards.
@ -30,14 +28,14 @@ class FirmataFlowHandler(ConfigFlow, domain=DOMAIN):
config entry yet (based on entry_id). It validates a connection
and then adds the entry.
"""
name = f"serial-{import_config[CONF_SERIAL_PORT]}"
import_config[CONF_NAME] = name
name = f"serial-{import_data[CONF_SERIAL_PORT]}"
import_data[CONF_NAME] = name
# Connect to the board to verify connection and then shutdown
# If either fail then we cannot continue
_LOGGER.debug("Connecting to Firmata board %s to test connection", name)
try:
api = await get_board(import_config)
api = await get_board(import_data)
await api.shutdown()
except RuntimeError as err:
_LOGGER.error("Error connecting to PyMata board %s: %s", name, err)
@ -54,6 +52,4 @@ class FirmataFlowHandler(ConfigFlow, domain=DOMAIN):
return self.async_abort(reason="cannot_connect")
_LOGGER.debug("Connection test to Firmata board %s successful", name)
return self.async_create_entry(
title=import_config[CONF_NAME], data=import_config
)
return self.async_create_entry(title=import_data[CONF_NAME], data=import_data)

View file

@ -9,6 +9,7 @@ import pypck
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.config_entries import ConfigFlowResult
from homeassistant.const import (
CONF_BASE,
CONF_DEVICES,
@ -107,12 +108,10 @@ class LcnFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
VERSION = 1
async def async_step_import(
self, data: ConfigType
) -> config_entries.ConfigFlowResult:
async def async_step_import(self, import_data: dict[str, Any]) -> ConfigFlowResult:
"""Import existing configuration from LCN."""
# validate the imported connection parameters
if error := await validate_connection(data):
if error := await validate_connection(import_data):
async_create_issue(
self.hass,
DOMAIN,
@ -144,17 +143,19 @@ class LcnFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
)
# check if we already have a host with the same address configured
if entry := get_config_entry(self.hass, data):
if entry := get_config_entry(self.hass, import_data):
entry.source = config_entries.SOURCE_IMPORT
# Cleanup entity and device registry, if we imported from configuration.yaml to
# remove orphans when entities were removed from configuration
purge_entity_registry(self.hass, entry.entry_id, data)
purge_device_registry(self.hass, entry.entry_id, data)
purge_entity_registry(self.hass, entry.entry_id, import_data)
purge_device_registry(self.hass, entry.entry_id, import_data)
self.hass.config_entries.async_update_entry(entry, data=data)
self.hass.config_entries.async_update_entry(entry, data=import_data)
return self.async_abort(reason="existing_configuration_updated")
return self.async_create_entry(title=f"{data[CONF_HOST]}", data=data)
return self.async_create_entry(
title=f"{import_data[CONF_HOST]}", data=import_data
)
async def async_step_user(
self, user_input: dict[str, Any] | None = None

View file

@ -67,19 +67,17 @@ class MPDConfigFlow(ConfigFlow, domain=DOMAIN):
errors=errors,
)
async def async_step_import(
self, import_config: dict[str, Any]
) -> ConfigFlowResult:
async def async_step_import(self, import_data: dict[str, Any]) -> ConfigFlowResult:
"""Attempt to import the existing configuration."""
self._async_abort_entries_match({CONF_HOST: import_config[CONF_HOST]})
self._async_abort_entries_match({CONF_HOST: import_data[CONF_HOST]})
client = MPDClient()
client.timeout = 30
client.idletimeout = 10
try:
async with timeout(35):
await client.connect(import_config[CONF_HOST], import_config[CONF_PORT])
if CONF_PASSWORD in import_config:
await client.password(import_config[CONF_PASSWORD])
await client.connect(import_data[CONF_HOST], import_data[CONF_PORT])
if CONF_PASSWORD in import_data:
await client.password(import_data[CONF_PASSWORD])
with suppress(mpd.ConnectionError):
client.disconnect()
except (
@ -94,10 +92,10 @@ class MPDConfigFlow(ConfigFlow, domain=DOMAIN):
return self.async_abort(reason="unknown")
return self.async_create_entry(
title=import_config.get(CONF_NAME, "Music Player Daemon"),
title=import_data.get(CONF_NAME, "Music Player Daemon"),
data={
CONF_HOST: import_config[CONF_HOST],
CONF_PORT: import_config[CONF_PORT],
CONF_PASSWORD: import_config.get(CONF_PASSWORD),
CONF_HOST: import_data[CONF_HOST],
CONF_PORT: import_data[CONF_PORT],
CONF_PASSWORD: import_data.get(CONF_PASSWORD),
},
)

View file

@ -157,11 +157,9 @@ class PanasonicVieraConfigFlow(ConfigFlow, domain=DOMAIN):
errors=errors,
)
async def async_step_import(
self, import_config: dict[str, Any]
) -> ConfigFlowResult:
async def async_step_import(self, import_data: dict[str, Any]) -> ConfigFlowResult:
"""Import a config entry from configuration.yaml."""
return await self.async_step_user(user_input=import_config)
return await self.async_step_user(user_input=import_data)
async def async_load_data(self, config: dict[str, Any]) -> None:
"""Load the data."""

View file

@ -80,13 +80,11 @@ class FlowHandler(ConfigFlow, domain=DOMAIN):
step_id="user", data_schema=DATA_SCHEMA, errors=errors
)
async def async_step_import(
self, import_config: dict[str, Any]
) -> ConfigFlowResult:
async def async_step_import(self, import_data: dict[str, Any]) -> ConfigFlowResult:
"""Attempt to import the existing configuration."""
self._async_abort_entries_match({CONF_HOST: import_config[CONF_HOST]})
host = import_config[CONF_HOST]
port = import_config.get(CONF_PORT, 9621)
self._async_abort_entries_match({CONF_HOST: import_data[CONF_HOST]})
host = import_data[CONF_HOST]
port = import_data.get(CONF_PORT, 9621)
# Connection logic is repeated here since this method will be removed in future releases
russ = Russound(self.hass.loop, host, port)

View file

@ -28,22 +28,20 @@ class SleepIQFlowHandler(ConfigFlow, domain=DOMAIN):
"""Initialize the config flow."""
self._reauth_entry: ConfigEntry | None = None
async def async_step_import(
self, import_config: dict[str, Any]
) -> ConfigFlowResult:
async def async_step_import(self, import_data: dict[str, Any]) -> ConfigFlowResult:
"""Import a SleepIQ account as a config entry.
This flow is triggered by 'async_setup' for configured accounts.
"""
await self.async_set_unique_id(import_config[CONF_USERNAME].lower())
await self.async_set_unique_id(import_data[CONF_USERNAME].lower())
self._abort_if_unique_id_configured()
if error := await try_connection(self.hass, import_config):
if error := await try_connection(self.hass, import_data):
_LOGGER.error("Could not authenticate with SleepIQ server: %s", error)
return self.async_abort(reason=error)
return self.async_create_entry(
title=import_config[CONF_USERNAME], data=import_config
title=import_data[CONF_USERNAME], data=import_data
)
async def async_step_user(

View file

@ -71,11 +71,9 @@ class TileFlowHandler(ConfigFlow, domain=DOMAIN):
return self.async_create_entry(title=self._username, data=data)
async def async_step_import(
self, import_config: dict[str, Any]
) -> ConfigFlowResult:
async def async_step_import(self, import_data: dict[str, Any]) -> ConfigFlowResult:
"""Import a config entry from configuration.yaml."""
return await self.async_step_user(import_config)
return await self.async_step_user(import_data)
async def async_step_reauth(
self, entry_data: Mapping[str, Any]

View file

@ -2,7 +2,6 @@
from __future__ import annotations
from collections.abc import Mapping
from typing import Any
from pytraccar import ApiClient, ServerModel, TraccarException
@ -161,36 +160,34 @@ class TraccarServerConfigFlow(ConfigFlow, domain=DOMAIN):
errors=errors,
)
async def async_step_import(
self, import_info: Mapping[str, Any]
) -> ConfigFlowResult:
async def async_step_import(self, import_data: dict[str, Any]) -> ConfigFlowResult:
"""Import an entry."""
configured_port = str(import_info[CONF_PORT])
configured_port = str(import_data[CONF_PORT])
self._async_abort_entries_match(
{
CONF_HOST: import_info[CONF_HOST],
CONF_HOST: import_data[CONF_HOST],
CONF_PORT: configured_port,
}
)
if "all_events" in (imported_events := import_info.get("event", [])):
if "all_events" in (imported_events := import_data.get("event", [])):
events = list(EVENTS.values())
else:
events = imported_events
return self.async_create_entry(
title=f"{import_info[CONF_HOST]}:{configured_port}",
title=f"{import_data[CONF_HOST]}:{configured_port}",
data={
CONF_HOST: import_info[CONF_HOST],
CONF_HOST: import_data[CONF_HOST],
CONF_PORT: configured_port,
CONF_SSL: import_info.get(CONF_SSL, False),
CONF_VERIFY_SSL: import_info.get(CONF_VERIFY_SSL, True),
CONF_USERNAME: import_info[CONF_USERNAME],
CONF_PASSWORD: import_info[CONF_PASSWORD],
CONF_SSL: import_data.get(CONF_SSL, False),
CONF_VERIFY_SSL: import_data.get(CONF_VERIFY_SSL, True),
CONF_USERNAME: import_data[CONF_USERNAME],
CONF_PASSWORD: import_data[CONF_PASSWORD],
},
options={
CONF_MAX_ACCURACY: import_info[CONF_MAX_ACCURACY],
CONF_MAX_ACCURACY: import_data[CONF_MAX_ACCURACY],
CONF_EVENTS: events,
CONF_CUSTOM_ATTRIBUTES: import_info.get("monitored_conditions", []),
CONF_SKIP_ACCURACY_FILTER_FOR: import_info.get(
CONF_CUSTOM_ATTRIBUTES: import_data.get("monitored_conditions", []),
CONF_SKIP_ACCURACY_FILTER_FOR: import_data.get(
"skip_accuracy_filter_on", []
),
},

View file

@ -285,9 +285,7 @@ class VizioConfigFlow(ConfigFlow, domain=DOMAIN):
return self.async_show_form(step_id="user", data_schema=schema, errors=errors)
async def async_step_import(
self, import_config: dict[str, Any]
) -> ConfigFlowResult:
async def async_step_import(self, import_data: dict[str, Any]) -> ConfigFlowResult:
"""Import a config entry from configuration.yaml."""
# Check if new config entry matches any existing config entries
for entry in self._async_current_entries():
@ -296,28 +294,28 @@ class VizioConfigFlow(ConfigFlow, domain=DOMAIN):
continue
if await self.hass.async_add_executor_job(
_host_is_same, entry.data[CONF_HOST], import_config[CONF_HOST]
_host_is_same, entry.data[CONF_HOST], import_data[CONF_HOST]
):
updated_options: dict[str, Any] = {}
updated_data: dict[str, Any] = {}
remove_apps = False
if entry.data[CONF_HOST] != import_config[CONF_HOST]:
updated_data[CONF_HOST] = import_config[CONF_HOST]
if entry.data[CONF_HOST] != import_data[CONF_HOST]:
updated_data[CONF_HOST] = import_data[CONF_HOST]
if entry.data[CONF_NAME] != import_config[CONF_NAME]:
updated_data[CONF_NAME] = import_config[CONF_NAME]
if entry.data[CONF_NAME] != import_data[CONF_NAME]:
updated_data[CONF_NAME] = import_data[CONF_NAME]
# Update entry.data[CONF_APPS] if import_config[CONF_APPS] differs, and
# pop entry.data[CONF_APPS] if import_config[CONF_APPS] is not specified
if entry.data.get(CONF_APPS) != import_config.get(CONF_APPS):
if not import_config.get(CONF_APPS):
if entry.data.get(CONF_APPS) != import_data.get(CONF_APPS):
if not import_data.get(CONF_APPS):
remove_apps = True
else:
updated_options[CONF_APPS] = import_config[CONF_APPS]
updated_options[CONF_APPS] = import_data[CONF_APPS]
if entry.data.get(CONF_VOLUME_STEP) != import_config[CONF_VOLUME_STEP]:
updated_options[CONF_VOLUME_STEP] = import_config[CONF_VOLUME_STEP]
if entry.data.get(CONF_VOLUME_STEP) != import_data[CONF_VOLUME_STEP]:
updated_options[CONF_VOLUME_STEP] = import_data[CONF_VOLUME_STEP]
if updated_options or updated_data or remove_apps:
new_data = entry.data.copy()
@ -345,9 +343,9 @@ class VizioConfigFlow(ConfigFlow, domain=DOMAIN):
self._must_show_form = True
# Store config key/value pairs that are not configurable in user step so they
# don't get lost on user step
if import_config.get(CONF_APPS):
self._apps = copy.deepcopy(import_config[CONF_APPS])
return await self.async_step_user(user_input=import_config)
if import_data.get(CONF_APPS):
self._apps = copy.deepcopy(import_data[CONF_APPS])
return await self.async_step_user(user_input=import_data)
async def async_step_zeroconf(
self, discovery_info: zeroconf.ZeroconfServiceInfo