Convert UpCloud to config flow, improve error handling (#37941)

This commit is contained in:
Ville Skyttä 2020-10-16 00:26:01 +03:00 committed by GitHub
parent d790de1346
commit a9f342ef74
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 479 additions and 99 deletions

View file

@ -1,10 +1,17 @@
"""Support for UpCloud.""" """Support for UpCloud."""
import dataclasses
from datetime import timedelta from datetime import timedelta
import logging import logging
from typing import Dict, List
import requests.exceptions
import upcloud_api import upcloud_api
import voluptuous as vol import voluptuous as vol
from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR_DOMAIN
from homeassistant.components.switch import DOMAIN as SWITCH_DOMAIN
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import ( from homeassistant.const import (
CONF_PASSWORD, CONF_PASSWORD,
CONF_SCAN_INTERVAL, CONF_SCAN_INTERVAL,
@ -13,12 +20,20 @@ from homeassistant.const import (
STATE_ON, STATE_ON,
STATE_PROBLEM, STATE_PROBLEM,
) )
from homeassistant.core import callback from homeassistant.core import CALLBACK_TYPE
from homeassistant.exceptions import ConfigEntryNotReady
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect, dispatcher_send from homeassistant.helpers.dispatcher import (
from homeassistant.helpers.entity import Entity async_dispatcher_connect,
from homeassistant.helpers.event import track_time_interval async_dispatcher_send,
from homeassistant.util import dt )
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
)
from .const import CONFIG_ENTRY_UPDATE_SIGNAL_TEMPLATE, DEFAULT_SCAN_INTERVAL, DOMAIN
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -33,14 +48,11 @@ ATTR_ZONE = "zone"
CONF_SERVERS = "servers" CONF_SERVERS = "servers"
DATA_UPCLOUD = "data_upcloud" DATA_UPCLOUD = "data_upcloud"
DOMAIN = "upcloud"
DEFAULT_COMPONENT_NAME = "UpCloud {}" DEFAULT_COMPONENT_NAME = "UpCloud {}"
DEFAULT_COMPONENT_DEVICE_CLASS = "power" DEFAULT_COMPONENT_DEVICE_CLASS = "power"
UPCLOUD_PLATFORMS = ["binary_sensor", "switch"] CONFIG_ENTRY_DOMAINS = {BINARY_SENSOR_DOMAIN, SWITCH_DOMAIN}
SCAN_INTERVAL = timedelta(seconds=60)
SIGNAL_UPDATE_UPCLOUD = "upcloud_update" SIGNAL_UPDATE_UPCLOUD = "upcloud_update"
@ -52,7 +64,9 @@ CONFIG_SCHEMA = vol.Schema(
{ {
vol.Required(CONF_USERNAME): cv.string, vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string, vol.Required(CONF_PASSWORD): cv.string,
vol.Optional(CONF_SCAN_INTERVAL, default=SCAN_INTERVAL): cv.time_period, vol.Optional(
CONF_SCAN_INTERVAL, default=DEFAULT_SCAN_INTERVAL
): cv.time_period,
} }
) )
}, },
@ -60,57 +74,191 @@ CONFIG_SCHEMA = vol.Schema(
) )
def setup(hass, config): class UpCloudDataUpdateCoordinator(
"""Set up the UpCloud component.""" DataUpdateCoordinator[Dict[str, upcloud_api.Server]]
conf = config[DOMAIN] ):
username = conf.get(CONF_USERNAME) """UpCloud data update coordinator."""
password = conf.get(CONF_PASSWORD)
scan_interval = conf.get(CONF_SCAN_INTERVAL)
manager = upcloud_api.CloudManager(username, password) def __init__(
self,
hass: HomeAssistantType,
*,
cloud_manager: upcloud_api.CloudManager,
update_interval: timedelta,
username: str,
) -> None:
"""Initialize coordinator."""
super().__init__(
hass, _LOGGER, name=f"{username}@UpCloud", update_interval=update_interval
)
self.cloud_manager = cloud_manager
self.unsub_handlers: List[CALLBACK_TYPE] = []
try: async def async_update_config(self, config_entry: ConfigEntry) -> None:
manager.authenticate() """Handle config update."""
hass.data[DATA_UPCLOUD] = UpCloud(manager) self.update_interval = timedelta(
except upcloud_api.UpCloudAPIError: seconds=config_entry.options[CONF_SCAN_INTERVAL]
_LOGGER.error("Authentication failed") )
return False
def upcloud_update(event_time): async def _async_update_data(self) -> Dict[str, upcloud_api.Server]:
"""Call UpCloud to update information.""" return {
_LOGGER.debug("Updating UpCloud component") x.uuid: x
hass.data[DATA_UPCLOUD].update() for x in await self.hass.async_add_executor_job(
dispatcher_send(hass, SIGNAL_UPDATE_UPCLOUD) self.cloud_manager.get_servers
)
}
# Call the UpCloud API to refresh data
upcloud_update(dt.utcnow()) @dataclasses.dataclass
track_time_interval(hass, upcloud_update, scan_interval) class UpCloudHassData:
"""Home Assistant UpCloud runtime data."""
coordinators: Dict[str, UpCloudDataUpdateCoordinator] = dataclasses.field(
default_factory=dict
)
scan_interval_migrations: Dict[str, int] = dataclasses.field(default_factory=dict)
async def async_setup(hass: HomeAssistantType, config) -> bool:
"""Set up UpCloud component."""
domain_config = config.get(DOMAIN)
if not domain_config:
return True
_LOGGER.warning(
"Loading upcloud via top level config is deprecated and no longer "
"necessary as of 0.117. Please remove it from your YAML configuration."
)
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={
CONF_USERNAME: domain_config[CONF_USERNAME],
CONF_PASSWORD: domain_config[CONF_PASSWORD],
},
)
)
if domain_config[CONF_SCAN_INTERVAL]:
hass.data[DATA_UPCLOUD] = UpCloudHassData()
hass.data[DATA_UPCLOUD].scan_interval_migrations[
domain_config[CONF_USERNAME]
] = domain_config[CONF_SCAN_INTERVAL]
return True return True
class UpCloud: def _config_entry_update_signal_name(config_entry: ConfigEntry) -> str:
"""Handle all communication with the UpCloud API.""" """Get signal name for updates to a config entry."""
return CONFIG_ENTRY_UPDATE_SIGNAL_TEMPLATE.format(config_entry.unique_id)
def __init__(self, manager):
"""Initialize the UpCloud connection."""
self.data = {}
self.manager = manager
def update(self):
"""Update data from UpCloud API."""
self.data = {server.uuid: server for server in self.manager.get_servers()}
class UpCloudServerEntity(Entity): async def _async_signal_options_update(
hass: HomeAssistantType, config_entry: ConfigEntry
) -> None:
"""Signal config entry options update."""
async_dispatcher_send(
hass, _config_entry_update_signal_name(config_entry), config_entry
)
async def async_setup_entry(hass: HomeAssistantType, config_entry: ConfigEntry) -> bool:
"""Set up the UpCloud config entry."""
manager = upcloud_api.CloudManager(
config_entry.data[CONF_USERNAME], config_entry.data[CONF_PASSWORD]
)
try:
await hass.async_add_executor_job(manager.authenticate)
except upcloud_api.UpCloudAPIError:
_LOGGER.error("Authentication failed", exc_info=True)
return False
except requests.exceptions.RequestException as err:
_LOGGER.error("Failed to connect", exc_info=True)
raise ConfigEntryNotReady from err
upcloud_data = hass.data.setdefault(DATA_UPCLOUD, UpCloudHassData())
# Handle pre config entry (0.117) scan interval migration to options
migrated_scan_interval = upcloud_data.scan_interval_migrations.pop(
config_entry.data[CONF_USERNAME], None
)
if migrated_scan_interval and (
not config_entry.options.get(CONF_SCAN_INTERVAL)
or config_entry.options[CONF_SCAN_INTERVAL] == DEFAULT_SCAN_INTERVAL.seconds
):
update_interval = migrated_scan_interval
hass.config_entries.async_update_entry(
config_entry,
options={CONF_SCAN_INTERVAL: update_interval.seconds},
)
elif config_entry.options.get(CONF_SCAN_INTERVAL):
update_interval = timedelta(seconds=config_entry.options[CONF_SCAN_INTERVAL])
else:
update_interval = DEFAULT_SCAN_INTERVAL
coordinator = UpCloudDataUpdateCoordinator(
hass,
update_interval=update_interval,
cloud_manager=manager,
username=config_entry.data[CONF_USERNAME],
)
# Call the UpCloud API to refresh data
await coordinator.async_request_refresh()
if not coordinator.last_update_success:
raise ConfigEntryNotReady
# Listen to config entry updates
coordinator.unsub_handlers.append(
config_entry.add_update_listener(_async_signal_options_update)
)
coordinator.unsub_handlers.append(
async_dispatcher_connect(
hass,
_config_entry_update_signal_name(config_entry),
coordinator.async_update_config,
)
)
upcloud_data.coordinators[config_entry.data[CONF_USERNAME]] = coordinator
# Forward entry setup
for domain in CONFIG_ENTRY_DOMAINS:
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(config_entry, domain)
)
return True
async def async_unload_entry(hass, config_entry):
"""Unload the config entry."""
for domain in CONFIG_ENTRY_DOMAINS:
await hass.config_entries.async_forward_entry_unload(config_entry, domain)
coordinator: UpCloudDataUpdateCoordinator = hass.data[
DATA_UPCLOUD
].coordinators.pop(config_entry.data[CONF_USERNAME])
while coordinator.unsub_handlers:
coordinator.unsub_handlers.pop()()
return True
class UpCloudServerEntity(CoordinatorEntity):
"""Entity class for UpCloud servers.""" """Entity class for UpCloud servers."""
def __init__(self, upcloud, uuid): def __init__(self, coordinator, uuid):
"""Initialize the UpCloud server entity.""" """Initialize the UpCloud server entity."""
self._upcloud = upcloud super().__init__(coordinator)
self.uuid = uuid self.uuid = uuid
self.data = None
self._unsub_handlers = [] @property
def _server(self) -> upcloud_api.Server:
return self.coordinator.data[self.uuid]
@property @property
def unique_id(self) -> str: def unique_id(self) -> str:
@ -121,29 +269,10 @@ class UpCloudServerEntity(Entity):
def name(self): def name(self):
"""Return the name of the component.""" """Return the name of the component."""
try: try:
return DEFAULT_COMPONENT_NAME.format(self.data.title) return DEFAULT_COMPONENT_NAME.format(self._server.title)
except (AttributeError, KeyError, TypeError): except (AttributeError, KeyError, TypeError):
return DEFAULT_COMPONENT_NAME.format(self.uuid) return DEFAULT_COMPONENT_NAME.format(self.uuid)
async def async_added_to_hass(self):
"""Register callbacks."""
self._unsub_handlers.append(
async_dispatcher_connect(
self.hass, SIGNAL_UPDATE_UPCLOUD, self._update_callback
)
)
async def async_will_remove_from_hass(self) -> None:
"""Invoke unsubscription handlers."""
for unsub in self._unsub_handlers:
unsub()
self._unsub_handlers.clear()
@callback
def _update_callback(self):
"""Call update method."""
self.async_schedule_update_ha_state(True)
@property @property
def icon(self): def icon(self):
"""Return the icon of this server.""" """Return the icon of this server."""
@ -153,7 +282,7 @@ class UpCloudServerEntity(Entity):
def state(self): def state(self):
"""Return state of the server.""" """Return state of the server."""
try: try:
return STATE_MAP.get(self.data.state) return STATE_MAP.get(self._server.state, self._server.state)
except AttributeError: except AttributeError:
return None return None
@ -171,18 +300,13 @@ class UpCloudServerEntity(Entity):
def device_state_attributes(self): def device_state_attributes(self):
"""Return the state attributes of the UpCloud server.""" """Return the state attributes of the UpCloud server."""
return { return {
x: getattr(self.data, x, None) x: getattr(self._server, x, None)
for x in ( for x in (
ATTR_UUID, ATTR_UUID,
ATTR_TITLE, ATTR_TITLE,
ATTR_HOSTNAME, ATTR_HOSTNAME,
ATTR_ZONE, ATTR_ZONE,
ATTR_STATE,
ATTR_CORE_NUMBER, ATTR_CORE_NUMBER,
ATTR_MEMORY_AMOUNT, ATTR_MEMORY_AMOUNT,
) )
} }
def update(self):
"""Update data of the UpCloud server."""
self.data = self._upcloud.data.get(self.uuid)

View file

@ -1,29 +1,23 @@
"""Support for monitoring the state of UpCloud servers.""" """Support for monitoring the state of UpCloud servers."""
import logging
import voluptuous as vol import voluptuous as vol
from homeassistant.components.binary_sensor import PLATFORM_SCHEMA, BinarySensorEntity from homeassistant.components.binary_sensor import PLATFORM_SCHEMA, BinarySensorEntity
from homeassistant.const import CONF_USERNAME
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from . import CONF_SERVERS, DATA_UPCLOUD, UpCloudServerEntity from . import CONF_SERVERS, DATA_UPCLOUD, UpCloudServerEntity
_LOGGER = logging.getLogger(__name__)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{vol.Required(CONF_SERVERS): vol.All(cv.ensure_list, [cv.string])} {vol.Required(CONF_SERVERS): vol.All(cv.ensure_list, [cv.string])}
) )
def setup_platform(hass, config, add_entities, discovery_info=None): async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up the UpCloud server binary sensor.""" """Set up the UpCloud server binary sensor."""
upcloud = hass.data[DATA_UPCLOUD] coordinator = hass.data[DATA_UPCLOUD].coordinators[config_entry.data[CONF_USERNAME]]
entities = [UpCloudBinarySensor(coordinator, uuid) for uuid in coordinator.data]
servers = config.get(CONF_SERVERS) async_add_entities(entities, True)
devices = [UpCloudBinarySensor(upcloud, uuid) for uuid in servers]
add_entities(devices, True)
class UpCloudBinarySensor(UpCloudServerEntity, BinarySensorEntity): class UpCloudBinarySensor(UpCloudServerEntity, BinarySensorEntity):

View file

@ -0,0 +1,112 @@
"""Config flow for UpCloud."""
import logging
import requests.exceptions
import upcloud_api
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import CONF_PASSWORD, CONF_SCAN_INTERVAL, CONF_USERNAME
from homeassistant.core import callback
# pylint: disable=unused-import # for DOMAIN https://github.com/PyCQA/pylint/issues/3202
from .const import DEFAULT_SCAN_INTERVAL, DOMAIN
_LOGGER = logging.getLogger(__name__)
class UpCloudConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""UpCloud config flow."""
VERSION = 1
CONNECTION_CLASS = config_entries.CONN_CLASS_CLOUD_POLL
username: str
password: str
async def async_step_user(self, user_input=None):
"""Handle user initiated flow."""
if user_input is None:
return self._async_show_form(step_id="user")
await self.async_set_unique_id(user_input[CONF_USERNAME])
manager = upcloud_api.CloudManager(
user_input[CONF_USERNAME], user_input[CONF_PASSWORD]
)
errors = {}
try:
await self.hass.async_add_executor_job(manager.authenticate)
except upcloud_api.UpCloudAPIError:
errors["base"] = "invalid_auth"
_LOGGER.debug("invalid_auth", exc_info=True)
except requests.exceptions.RequestException:
errors["base"] = "cannot_connect"
_LOGGER.debug("cannot_connect", exc_info=True)
if errors:
return self._async_show_form(
step_id="user", user_input=user_input, errors=errors
)
return self.async_create_entry(title=user_input[CONF_USERNAME], data=user_input)
async def async_step_import(self, user_input=None):
"""Handle import initiated flow."""
await self.async_set_unique_id(user_input[CONF_USERNAME])
self._abort_if_unique_id_configured()
return await self.async_step_user(user_input=user_input)
@callback
def _async_show_form(self, step_id, user_input=None, errors=None):
"""Show our form."""
if user_input is None:
user_input = {}
return self.async_show_form(
step_id=step_id,
data_schema=vol.Schema(
{
vol.Required(
CONF_USERNAME, default=user_input.get(CONF_USERNAME, "")
): str,
vol.Required(
CONF_PASSWORD, default=user_input.get(CONF_PASSWORD, "")
): str,
}
),
errors=errors or {},
)
@staticmethod
@callback
def async_get_options_flow(config_entry):
"""Get options flow."""
return UpCloudOptionsFlow(config_entry)
class UpCloudOptionsFlow(config_entries.OptionsFlow):
"""UpCloud options flow."""
def __init__(self, config_entry: config_entries.ConfigEntry):
"""Initialize options flow."""
self.config_entry = config_entry
async def async_step_init(self, user_input=None):
"""Handle options flow."""
if user_input is not None:
return self.async_create_entry(title="", data=user_input)
data_schema = vol.Schema(
{
vol.Optional(
CONF_SCAN_INTERVAL,
default=self.config_entry.options.get(CONF_SCAN_INTERVAL)
or DEFAULT_SCAN_INTERVAL.seconds,
): vol.All(vol.Coerce(int), vol.Range(min=30)),
}
)
return self.async_show_form(step_id="init", data_schema=data_schema)

View file

@ -0,0 +1,7 @@
"""UpCloud constants."""
from datetime import timedelta
DOMAIN = "upcloud"
DEFAULT_SCAN_INTERVAL = timedelta(seconds=60)
CONFIG_ENTRY_UPDATE_SIGNAL_TEMPLATE = f"{DOMAIN}_config_entry_update:" "{}"

View file

@ -1,6 +1,7 @@
{ {
"domain": "upcloud", "domain": "upcloud",
"name": "UpCloud", "name": "UpCloud",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/upcloud", "documentation": "https://www.home-assistant.io/integrations/upcloud",
"requirements": ["upcloud-api==0.4.5"], "requirements": ["upcloud-api==0.4.5"],
"codeowners": ["@scop"] "codeowners": ["@scop"]

View file

@ -0,0 +1,25 @@
{
"config": {
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]"
},
"step": {
"user": {
"data": {
"username": "[%key:common::config_flow::data::username%]",
"password": "[%key:common::config_flow::data::password%]"
}
}
}
},
"options": {
"step": {
"init": {
"data": {
"scan_interval": "Update interval in seconds, minimum 30"
}
}
}
}
}

View file

@ -1,31 +1,24 @@
"""Support for interacting with UpCloud servers.""" """Support for interacting with UpCloud servers."""
import logging
import voluptuous as vol import voluptuous as vol
from homeassistant.components.switch import PLATFORM_SCHEMA, SwitchEntity from homeassistant.components.switch import PLATFORM_SCHEMA, SwitchEntity
from homeassistant.const import STATE_OFF from homeassistant.const import CONF_USERNAME, STATE_OFF
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import dispatcher_send from homeassistant.helpers.dispatcher import dispatcher_send
from . import CONF_SERVERS, DATA_UPCLOUD, SIGNAL_UPDATE_UPCLOUD, UpCloudServerEntity from . import CONF_SERVERS, DATA_UPCLOUD, SIGNAL_UPDATE_UPCLOUD, UpCloudServerEntity
_LOGGER = logging.getLogger(__name__)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{vol.Required(CONF_SERVERS): vol.All(cv.ensure_list, [cv.string])} {vol.Required(CONF_SERVERS): vol.All(cv.ensure_list, [cv.string])}
) )
def setup_platform(hass, config, add_entities, discovery_info=None): async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up the UpCloud server switch.""" """Set up the UpCloud server switch."""
upcloud = hass.data[DATA_UPCLOUD] coordinator = hass.data[DATA_UPCLOUD].coordinators[config_entry.data[CONF_USERNAME]]
entities = [UpCloudSwitch(coordinator, uuid) for uuid in coordinator.data]
servers = config.get(CONF_SERVERS) async_add_entities(entities, True)
devices = [UpCloudSwitch(upcloud, uuid) for uuid in servers]
add_entities(devices, True)
class UpCloudSwitch(UpCloudServerEntity, SwitchEntity): class UpCloudSwitch(UpCloudServerEntity, SwitchEntity):
@ -34,10 +27,10 @@ class UpCloudSwitch(UpCloudServerEntity, SwitchEntity):
def turn_on(self, **kwargs): def turn_on(self, **kwargs):
"""Start the server.""" """Start the server."""
if self.state == STATE_OFF: if self.state == STATE_OFF:
self.data.start() self._server.start()
dispatcher_send(self.hass, SIGNAL_UPDATE_UPCLOUD) dispatcher_send(self.hass, SIGNAL_UPDATE_UPCLOUD)
def turn_off(self, **kwargs): def turn_off(self, **kwargs):
"""Stop the server.""" """Stop the server."""
if self.is_on: if self.is_on:
self.data.stop() self._server.stop()

View file

@ -0,0 +1,25 @@
{
"config": {
"error": {
"cannot_connect": "Failed to connect",
"invalid_auth": "Invalid authentication"
},
"step": {
"user": {
"data": {
"password": "Password",
"username": "Username"
}
}
}
},
"options": {
"step": {
"init": {
"data": {
"scan_interval": "Update interval in seconds, minimum 30"
}
}
}
}
}

View file

@ -205,6 +205,7 @@ FLOWS = [
"twilio", "twilio",
"unifi", "unifi",
"upb", "upb",
"upcloud",
"upnp", "upnp",
"velbus", "velbus",
"vera", "vera",

View file

@ -1046,6 +1046,9 @@ twilio==6.32.0
# homeassistant.components.upb # homeassistant.components.upb
upb_lib==0.4.11 upb_lib==0.4.11
# homeassistant.components.upcloud
upcloud-api==0.4.5
# homeassistant.components.huawei_lte # homeassistant.components.huawei_lte
# homeassistant.components.syncthru # homeassistant.components.syncthru
url-normalize==1.4.1 url-normalize==1.4.1

View file

@ -0,0 +1 @@
"""Tests for the UpCloud integration."""

View file

@ -0,0 +1,94 @@
"""Tests for the UpCloud config flow."""
import requests.exceptions
from requests_mock import ANY
from upcloud_api import UpCloudAPIError
from homeassistant import config_entries, data_entry_flow
from homeassistant.components.upcloud.const import DOMAIN
from homeassistant.const import CONF_PASSWORD, CONF_SCAN_INTERVAL, CONF_USERNAME
from tests.common import MockConfigEntry
FIXTURE_USER_INPUT = {
CONF_USERNAME: "user",
CONF_PASSWORD: "pass",
}
FIXTURE_USER_INPUT_OPTIONS = {
CONF_SCAN_INTERVAL: "120",
}
async def test_show_set_form(hass):
"""Test that the setup form is served."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}, data=None
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "user"
async def test_connection_error(hass, requests_mock):
"""Test we show user form on connection error."""
requests_mock.request(ANY, ANY, exc=requests.exceptions.ConnectionError())
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}, data=FIXTURE_USER_INPUT
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "user"
assert result["errors"] == {"base": "cannot_connect"}
async def test_login_error(hass, requests_mock):
"""Test we show user form with appropriate error on response failure."""
requests_mock.request(
ANY,
ANY,
exc=UpCloudAPIError(
error_code="AUTHENTICATION_FAILED",
error_message="Authentication failed using the given username and password.",
),
)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}, data=FIXTURE_USER_INPUT
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "user"
assert result["errors"] == {"base": "invalid_auth"}
async def test_success(hass, requests_mock):
"""Test successful flow provides entry creation data."""
requests_mock.request(ANY, ANY, text='{"account":{"username":"user"}}')
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}, data=FIXTURE_USER_INPUT
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["data"][CONF_USERNAME] == FIXTURE_USER_INPUT[CONF_USERNAME]
assert result["data"][CONF_PASSWORD] == FIXTURE_USER_INPUT[CONF_PASSWORD]
async def test_options(hass):
"""Test options produce expected data."""
config_entry = MockConfigEntry(
domain=DOMAIN, data=FIXTURE_USER_INPUT, options=FIXTURE_USER_INPUT_OPTIONS
)
config_entry.add_to_hass(hass)
result = await hass.config_entries.options.async_init(config_entry.entry_id)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input=FIXTURE_USER_INPUT_OPTIONS,
)
assert result["data"][CONF_SCAN_INTERVAL] == int(
FIXTURE_USER_INPUT_OPTIONS[CONF_SCAN_INTERVAL]
)