Use Mapping for async_step_reauth (a-e) (#72763)

* Adjust abode

* Adjust airvisual

* Adjust aladdin_connect

* Adjust ambee

* Adjust aussie-broadband

* Adjust brunt

* Adjust cloudflare

* Adjust deconz

* Adjust deluge

* Adjust devolo_home_control

* Adjust efergy

* Adjust esphome
This commit is contained in:
epenet 2022-06-20 09:08:11 +02:00 committed by GitHub
parent 1f4add0119
commit bd29b91867
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 29 additions and 20 deletions

View file

@ -1,6 +1,7 @@
"""Config flow for the Abode Security System component.""" """Config flow for the Abode Security System component."""
from __future__ import annotations from __future__ import annotations
from collections.abc import Mapping
from http import HTTPStatus from http import HTTPStatus
from typing import Any, cast from typing import Any, cast
@ -149,7 +150,7 @@ class AbodeFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
return await self._async_abode_mfa_login() return await self._async_abode_mfa_login()
async def async_step_reauth(self, config: dict[str, Any]) -> FlowResult: async def async_step_reauth(self, config: Mapping[str, Any]) -> FlowResult:
"""Handle reauthorization request from Abode.""" """Handle reauthorization request from Abode."""
self._username = config[CONF_USERNAME] self._username = config[CONF_USERNAME]

View file

@ -2,6 +2,8 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
from collections.abc import Mapping
from typing import Any
from pyairvisual import CloudAPI, NodeSamba from pyairvisual import CloudAPI, NodeSamba
from pyairvisual.errors import ( from pyairvisual.errors import (
@ -70,7 +72,7 @@ class AirVisualFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
def __init__(self) -> None: def __init__(self) -> None:
"""Initialize the config flow.""" """Initialize the config flow."""
self._entry_data_for_reauth: dict[str, str] = {} self._entry_data_for_reauth: Mapping[str, Any] = {}
self._geo_id: str | None = None self._geo_id: str | None = None
@property @property
@ -219,7 +221,7 @@ class AirVisualFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
data={**user_input, CONF_INTEGRATION_TYPE: INTEGRATION_TYPE_NODE_PRO}, data={**user_input, CONF_INTEGRATION_TYPE: INTEGRATION_TYPE_NODE_PRO},
) )
async def async_step_reauth(self, data: dict[str, str]) -> FlowResult: async def async_step_reauth(self, data: Mapping[str, Any]) -> FlowResult:
"""Handle configuration by re-auth.""" """Handle configuration by re-auth."""
self._entry_data_for_reauth = data self._entry_data_for_reauth = data
self._geo_id = async_get_geography_id(data) self._geo_id = async_get_geography_id(data)

View file

@ -1,6 +1,7 @@
"""Config flow for Aladdin Connect cover integration.""" """Config flow for Aladdin Connect cover integration."""
from __future__ import annotations from __future__ import annotations
from collections.abc import Mapping
import logging import logging
from typing import Any from typing import Any
@ -44,9 +45,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
VERSION = 1 VERSION = 1
entry: config_entries.ConfigEntry | None entry: config_entries.ConfigEntry | None
async def async_step_reauth( async def async_step_reauth(self, data: Mapping[str, Any]) -> FlowResult:
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle re-authentication with Aladdin Connect.""" """Handle re-authentication with Aladdin Connect."""
self.entry = self.hass.config_entries.async_get_entry(self.context["entry_id"]) self.entry = self.hass.config_entries.async_get_entry(self.context["entry_id"])

View file

@ -1,6 +1,7 @@
"""Config flow to configure the Ambee integration.""" """Config flow to configure the Ambee integration."""
from __future__ import annotations from __future__ import annotations
from collections.abc import Mapping
from typing import Any from typing import Any
from ambee import Ambee, AmbeeAuthenticationError, AmbeeError from ambee import Ambee, AmbeeAuthenticationError, AmbeeError
@ -71,7 +72,7 @@ class AmbeeFlowHandler(ConfigFlow, domain=DOMAIN):
errors=errors, errors=errors,
) )
async def async_step_reauth(self, data: dict[str, Any]) -> FlowResult: async def async_step_reauth(self, data: Mapping[str, Any]) -> FlowResult:
"""Handle initiation of re-authentication with Ambee.""" """Handle initiation of re-authentication with Ambee."""
self.entry = self.hass.config_entries.async_get_entry(self.context["entry_id"]) self.entry = self.hass.config_entries.async_get_entry(self.context["entry_id"])
return await self.async_step_reauth_confirm() return await self.async_step_reauth_confirm()

View file

@ -1,6 +1,7 @@
"""Config flow for Aussie Broadband integration.""" """Config flow for Aussie Broadband integration."""
from __future__ import annotations from __future__ import annotations
from collections.abc import Mapping
from typing import Any from typing import Any
from aiohttp import ClientError from aiohttp import ClientError
@ -76,9 +77,9 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
errors=errors, errors=errors,
) )
async def async_step_reauth(self, user_input: dict[str, str]) -> FlowResult: async def async_step_reauth(self, data: Mapping[str, Any]) -> FlowResult:
"""Handle reauth on credential failure.""" """Handle reauth on credential failure."""
self._reauth_username = user_input[CONF_USERNAME] self._reauth_username = data[CONF_USERNAME]
return await self.async_step_reauth_confirm() return await self.async_step_reauth_confirm()

View file

@ -1,6 +1,7 @@
"""Config flow for brunt integration.""" """Config flow for brunt integration."""
from __future__ import annotations from __future__ import annotations
from collections.abc import Mapping
import logging import logging
from typing import Any from typing import Any
@ -77,9 +78,7 @@ class BruntConfigFlow(ConfigFlow, domain=DOMAIN):
data=user_input, data=user_input,
) )
async def async_step_reauth( async def async_step_reauth(self, data: Mapping[str, Any]) -> FlowResult:
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Perform reauth upon an API authentication error.""" """Perform reauth upon an API authentication error."""
self._reauth_entry = self.hass.config_entries.async_get_entry( self._reauth_entry = self.hass.config_entries.async_get_entry(
self.context["entry_id"] self.context["entry_id"]

View file

@ -1,6 +1,7 @@
"""Config flow for Cloudflare integration.""" """Config flow for Cloudflare integration."""
from __future__ import annotations from __future__ import annotations
from collections.abc import Mapping
import logging import logging
from typing import Any from typing import Any
@ -97,7 +98,7 @@ class CloudflareConfigFlow(ConfigFlow, domain=DOMAIN):
self.zones: list[str] | None = None self.zones: list[str] | None = None
self.records: list[str] | None = None self.records: list[str] | None = None
async def async_step_reauth(self, data: dict[str, Any]) -> FlowResult: async def async_step_reauth(self, data: Mapping[str, Any]) -> FlowResult:
"""Handle initiation of re-authentication with Cloudflare.""" """Handle initiation of re-authentication with Cloudflare."""
self.entry = self.hass.config_entries.async_get_entry(self.context["entry_id"]) self.entry = self.hass.config_entries.async_get_entry(self.context["entry_id"])
return await self.async_step_reauth_confirm() return await self.async_step_reauth_confirm()

View file

@ -3,6 +3,7 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
from collections.abc import Mapping
from pprint import pformat from pprint import pformat
from typing import Any, cast from typing import Any, cast
from urllib.parse import urlparse from urllib.parse import urlparse
@ -204,7 +205,7 @@ class DeconzFlowHandler(ConfigFlow, domain=DOMAIN):
}, },
) )
async def async_step_reauth(self, config: dict[str, Any]) -> FlowResult: async def async_step_reauth(self, config: Mapping[str, Any]) -> FlowResult:
"""Trigger a reauthentication flow.""" """Trigger a reauthentication flow."""
self.context["title_placeholders"] = {CONF_HOST: config[CONF_HOST]} self.context["title_placeholders"] = {CONF_HOST: config[CONF_HOST]}

View file

@ -1,6 +1,7 @@
"""Config flow for the Deluge integration.""" """Config flow for the Deluge integration."""
from __future__ import annotations from __future__ import annotations
from collections.abc import Mapping
import socket import socket
from ssl import SSLError from ssl import SSLError
from typing import Any from typing import Any
@ -75,7 +76,7 @@ class DelugeFlowHandler(ConfigFlow, domain=DOMAIN):
) )
return self.async_show_form(step_id="user", data_schema=schema, errors=errors) return self.async_show_form(step_id="user", data_schema=schema, errors=errors)
async def async_step_reauth(self, config: dict[str, Any]) -> FlowResult: async def async_step_reauth(self, config: Mapping[str, Any]) -> FlowResult:
"""Handle a reauthorization flow request.""" """Handle a reauthorization flow request."""
return await self.async_step_user() return await self.async_step_user()

View file

@ -1,6 +1,7 @@
"""Config flow to configure the devolo home control integration.""" """Config flow to configure the devolo home control integration."""
from __future__ import annotations from __future__ import annotations
from collections.abc import Mapping
from typing import Any from typing import Any
import voluptuous as vol import voluptuous as vol
@ -67,14 +68,14 @@ class DevoloHomeControlFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
step_id="zeroconf_confirm", errors={"base": "invalid_auth"} step_id="zeroconf_confirm", errors={"base": "invalid_auth"}
) )
async def async_step_reauth(self, user_input: dict[str, Any]) -> FlowResult: async def async_step_reauth(self, data: Mapping[str, Any]) -> FlowResult:
"""Handle reauthentication.""" """Handle reauthentication."""
self._reauth_entry = self.hass.config_entries.async_get_entry( self._reauth_entry = self.hass.config_entries.async_get_entry(
self.context["entry_id"] self.context["entry_id"]
) )
self._url = user_input[CONF_MYDEVOLO] self._url = data[CONF_MYDEVOLO]
self.data_schema = { self.data_schema = {
vol.Required(CONF_USERNAME, default=user_input[CONF_USERNAME]): str, vol.Required(CONF_USERNAME, default=data[CONF_USERNAME]): str,
vol.Required(CONF_PASSWORD): str, vol.Required(CONF_PASSWORD): str,
} }
return await self.async_step_reauth_confirm() return await self.async_step_reauth_confirm()

View file

@ -1,6 +1,7 @@
"""Config flow for Efergy integration.""" """Config flow for Efergy integration."""
from __future__ import annotations from __future__ import annotations
from collections.abc import Mapping
from typing import Any from typing import Any
from pyefergy import Efergy, exceptions from pyefergy import Efergy, exceptions
@ -52,7 +53,7 @@ class EfergyFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
errors=errors, errors=errors,
) )
async def async_step_reauth(self, config: dict[str, Any]) -> FlowResult: async def async_step_reauth(self, config: Mapping[str, Any]) -> FlowResult:
"""Handle a reauthorization flow request.""" """Handle a reauthorization flow request."""
return await self.async_step_user() return await self.async_step_user()

View file

@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
from collections import OrderedDict from collections import OrderedDict
from collections.abc import Mapping
from typing import Any from typing import Any
from aioesphomeapi import ( from aioesphomeapi import (
@ -63,7 +64,7 @@ class EsphomeFlowHandler(ConfigFlow, domain=DOMAIN):
"""Handle a flow initialized by the user.""" """Handle a flow initialized by the user."""
return await self._async_step_user_base(user_input=user_input) return await self._async_step_user_base(user_input=user_input)
async def async_step_reauth(self, data: dict[str, Any] | None = None) -> FlowResult: async def async_step_reauth(self, data: Mapping[str, Any]) -> FlowResult:
"""Handle a flow initialized by a reauth event.""" """Handle a flow initialized by a reauth event."""
entry = self.hass.config_entries.async_get_entry(self.context["entry_id"]) entry = self.hass.config_entries.async_get_entry(self.context["entry_id"])
assert entry is not None assert entry is not None