Re-work SimpliSafe authentication to only need username/password (#70160)

This commit is contained in:
Aaron Bach 2022-04-27 02:16:28 -06:00 committed by GitHub
parent b9ce236054
commit 031149dfdd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 474 additions and 212 deletions

View file

@ -51,6 +51,7 @@ from homeassistant.const import (
ATTR_DEVICE_ID, ATTR_DEVICE_ID,
CONF_CODE, CONF_CODE,
CONF_TOKEN, CONF_TOKEN,
CONF_USERNAME,
EVENT_HOMEASSISTANT_STOP, EVENT_HOMEASSISTANT_STOP,
Platform, Platform,
) )
@ -90,7 +91,6 @@ from .const import (
ATTR_EXIT_DELAY_HOME, ATTR_EXIT_DELAY_HOME,
ATTR_LIGHT, ATTR_LIGHT,
ATTR_VOICE_PROMPT_VOLUME, ATTR_VOICE_PROMPT_VOLUME,
CONF_USER_ID,
DOMAIN, DOMAIN,
LOGGER, LOGGER,
) )
@ -280,11 +280,13 @@ def _async_standardize_config_entry(hass: HomeAssistant, entry: ConfigEntry) ->
raise ConfigEntryAuthFailed( raise ConfigEntryAuthFailed(
"New SimpliSafe OAuth standard requires re-authentication" "New SimpliSafe OAuth standard requires re-authentication"
) )
if CONF_USERNAME not in entry.data:
raise ConfigEntryAuthFailed("Need to re-auth with username/password")
entry_updates = {} entry_updates = {}
if not entry.unique_id: if not entry.unique_id:
# If the config entry doesn't already have a unique ID, set one: # If the config entry doesn't already have a unique ID, set one:
entry_updates["unique_id"] = entry.data[CONF_USER_ID] entry_updates["unique_id"] = entry.data[CONF_USERNAME]
if CONF_CODE in entry.data: if CONF_CODE in entry.data:
# If an alarm code was provided as part of configuration.yaml, pop it out of # If an alarm code was provided as part of configuration.yaml, pop it out of
# the config entry's data and move it to options: # the config entry's data and move it to options:
@ -598,7 +600,7 @@ class SimpliSafe:
self.coordinator = DataUpdateCoordinator( self.coordinator = DataUpdateCoordinator(
self._hass, self._hass,
LOGGER, LOGGER,
name=self.entry.data[CONF_USER_ID], name=self.entry.title,
update_interval=DEFAULT_SCAN_INTERVAL, update_interval=DEFAULT_SCAN_INTERVAL,
update_method=self.async_update, update_method=self.async_update,
) )

View file

@ -1,56 +1,47 @@
"""Config flow to configure the SimpliSafe component.""" """Config flow to configure the SimpliSafe component."""
from __future__ import annotations from __future__ import annotations
from typing import Any, NamedTuple import asyncio
from typing import Any
import async_timeout
from simplipy import API from simplipy import API
from simplipy.errors import InvalidCredentialsError, SimplipyError from simplipy.api import AuthStates
from simplipy.util.auth import ( from simplipy.errors import InvalidCredentialsError, SimplipyError, Verify2FAPending
get_auth0_code_challenge,
get_auth0_code_verifier,
get_auth_url,
)
import voluptuous as vol import voluptuous as vol
from homeassistant import config_entries from homeassistant import config_entries
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_CODE, CONF_TOKEN, CONF_URL, CONF_USERNAME from homeassistant.const import CONF_CODE, CONF_PASSWORD, CONF_TOKEN, CONF_USERNAME
from homeassistant.core import callback from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers import aiohttp_client, config_validation as cv from homeassistant.helpers import aiohttp_client, config_validation as cv
from .const import CONF_USER_ID, DOMAIN, LOGGER from .const import DOMAIN, LOGGER
CONF_AUTH_CODE = "auth_code" DEFAULT_EMAIL_2FA_SLEEP = 3
CONF_DOCS_URL = "docs_url" DEFAULT_EMAIL_2FA_TIMEOUT = 300
AUTH_DOCS_URL = ( STEP_REAUTH_SCHEMA = vol.Schema(
"http://home-assistant.io/integrations/simplisafe#getting-an-authorization-code" {
vol.Required(CONF_PASSWORD): cv.string,
}
)
STEP_SMS_2FA_SCHEMA = vol.Schema(
{
vol.Required(CONF_CODE): cv.string,
}
) )
STEP_USER_SCHEMA = vol.Schema( STEP_USER_SCHEMA = vol.Schema(
{ {
vol.Required(CONF_AUTH_CODE): cv.string, vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
} }
) )
class SimpliSafeOAuthValues(NamedTuple):
"""Define a named tuple to handle SimpliSafe OAuth strings."""
code_verifier: str
auth_url: str
@callback
def async_get_simplisafe_oauth_values() -> SimpliSafeOAuthValues:
"""Get a SimpliSafe OAuth code verifier and auth URL."""
code_verifier = get_auth0_code_verifier()
code_challenge = get_auth0_code_challenge(code_verifier)
auth_url = get_auth_url(code_challenge)
return SimpliSafeOAuthValues(code_verifier, auth_url)
class SimpliSafeFlowHandler(config_entries.ConfigFlow, domain=DOMAIN): class SimpliSafeFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a SimpliSafe config flow.""" """Handle a SimpliSafe config flow."""
@ -58,10 +49,98 @@ class SimpliSafeFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
def __init__(self) -> None: def __init__(self) -> None:
"""Initialize the config flow.""" """Initialize the config flow."""
self._oauth_values: SimpliSafeOAuthValues | None = None self._password: str | None = None
self._reauth: bool = False self._reauth: bool = False
self._simplisafe: API | None = None
self._username: str | None = None self._username: str | None = None
async def _async_authenticate(
self, error_step_id: str, error_schema: vol.Schema
) -> FlowResult:
"""Attempt to authenticate to the SimpliSafe API."""
assert self._password
assert self._username
errors = {}
session = aiohttp_client.async_get_clientsession(self.hass)
try:
self._simplisafe = await API.async_from_credentials(
self._username, self._password, session=session
)
except InvalidCredentialsError:
errors = {"base": "invalid_auth"}
except SimplipyError as err:
LOGGER.error("Unknown error while logging into SimpliSafe: %s", err)
errors = {"base": "unknown"}
if errors:
return self.async_show_form(
step_id=error_step_id,
data_schema=error_schema,
errors=errors,
description_placeholders={CONF_USERNAME: self._username},
)
assert self._simplisafe
if self._simplisafe.auth_state == AuthStates.PENDING_2FA_SMS:
return await self.async_step_sms_2fa()
try:
async with async_timeout.timeout(DEFAULT_EMAIL_2FA_TIMEOUT):
while True:
try:
await self._simplisafe.async_verify_2fa_email()
except Verify2FAPending:
LOGGER.info("Email-based 2FA pending; trying again")
await asyncio.sleep(DEFAULT_EMAIL_2FA_SLEEP)
else:
break
except asyncio.TimeoutError:
return self.async_show_form(
step_id="user",
data_schema=STEP_USER_SCHEMA,
errors={"base": "2fa_timed_out"},
)
return await self._async_finish_setup()
async def _async_finish_setup(self) -> FlowResult:
"""Complete setup with an authenticated API object."""
assert self._simplisafe
assert self._username
data = {
CONF_USERNAME: self._username,
CONF_TOKEN: self._simplisafe.refresh_token,
}
user_id = str(self._simplisafe.user_id)
if self._reauth:
# "Old" config entries utilized the user's email address (username) as the
# unique ID, whereas "new" config entries utilize the SimpliSafe user ID
# only one can exist at a time, but the presence of either one is a
# candidate for re-auth:
if existing_entries := [
entry
for entry in self.hass.config_entries.async_entries()
if entry.unique_id in (self._username, user_id)
]:
existing_entry = existing_entries[0]
self.hass.config_entries.async_update_entry(
existing_entry, unique_id=user_id, title=self._username, data=data
)
self.hass.async_create_task(
self.hass.config_entries.async_reload(existing_entry.entry_id)
)
return self.async_abort(reason="reauth_successful")
await self.async_set_unique_id(user_id)
self._abort_if_unique_id_configured()
return self.async_create_entry(title=self._username, data=data)
@staticmethod @staticmethod
@callback @callback
def async_get_options_flow( def async_get_options_flow(
@ -70,77 +149,65 @@ class SimpliSafeFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Define the config flow to handle options.""" """Define the config flow to handle options."""
return SimpliSafeOptionsFlowHandler(config_entry) return SimpliSafeOptionsFlowHandler(config_entry)
def _async_show_form(self, *, errors: dict[str, Any] | None = None) -> FlowResult:
"""Show the form."""
self._oauth_values = async_get_simplisafe_oauth_values()
return self.async_show_form(
step_id="user",
data_schema=STEP_USER_SCHEMA,
errors=errors or {},
description_placeholders={
CONF_URL: self._oauth_values.auth_url,
CONF_DOCS_URL: AUTH_DOCS_URL,
},
)
async def async_step_reauth(self, config: dict[str, Any]) -> FlowResult: async def async_step_reauth(self, config: dict[str, Any]) -> FlowResult:
"""Handle configuration by re-auth.""" """Handle configuration by re-auth."""
self._username = config.get(CONF_USERNAME)
self._reauth = True self._reauth = True
return await self.async_step_user()
if CONF_USERNAME not in config:
# Old versions of the config flow may not have the username by this point;
# in that case, we reauth them by making them go through the user flow:
return await self.async_step_user()
self._username = config[CONF_USERNAME]
return await self.async_step_reauth_confirm()
async def async_step_reauth_confirm(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle re-auth completion."""
if not user_input:
return self.async_show_form(
step_id="reauth_confirm",
data_schema=STEP_REAUTH_SCHEMA,
description_placeholders={CONF_USERNAME: self._username},
)
self._password = user_input[CONF_PASSWORD]
return await self._async_authenticate("reauth_confirm", STEP_REAUTH_SCHEMA)
async def async_step_sms_2fa(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle SMS-based two-factor authentication."""
if not user_input:
return self.async_show_form(
step_id="sms_2fa",
data_schema=STEP_SMS_2FA_SCHEMA,
)
assert self._simplisafe
try:
await self._simplisafe.async_verify_2fa_sms(user_input[CONF_CODE])
except InvalidCredentialsError:
return self.async_show_form(
step_id="sms_2fa",
data_schema=STEP_SMS_2FA_SCHEMA,
errors={CONF_CODE: "invalid_auth"},
)
return await self._async_finish_setup()
async def async_step_user( async def async_step_user(
self, user_input: dict[str, Any] | None = None self, user_input: dict[str, Any] | None = None
) -> FlowResult: ) -> FlowResult:
"""Handle the start of the config flow.""" """Handle the start of the config flow."""
if user_input is None: if user_input is None:
return self._async_show_form() return self.async_show_form(step_id="user", data_schema=STEP_USER_SCHEMA)
assert self._oauth_values self._username = user_input[CONF_USERNAME]
self._password = user_input[CONF_PASSWORD]
errors = {} return await self._async_authenticate("user", STEP_USER_SCHEMA)
session = aiohttp_client.async_get_clientsession(self.hass)
try:
simplisafe = await API.async_from_auth(
user_input[CONF_AUTH_CODE],
self._oauth_values.code_verifier,
session=session,
)
except InvalidCredentialsError:
errors = {"base": "invalid_auth"}
except SimplipyError as err:
LOGGER.error("Unknown error while logging into SimpliSafe: %s", err)
errors = {"base": "unknown"}
if errors:
return self._async_show_form(errors=errors)
data = {CONF_USER_ID: simplisafe.user_id, CONF_TOKEN: simplisafe.refresh_token}
unique_id = str(simplisafe.user_id)
if self._reauth:
# "Old" config entries utilized the user's email address (username) as the
# unique ID, whereas "new" config entries utilize the SimpliSafe user ID
# either one is a candidate for re-auth:
existing_entry = await self.async_set_unique_id(self._username or unique_id)
if not existing_entry:
# If we don't have an entry that matches this user ID, the user logged
# in with different credentials:
return self.async_abort(reason="wrong_account")
self.hass.config_entries.async_update_entry(
existing_entry, unique_id=unique_id, data=data
)
self.hass.async_create_task(
self.hass.config_entries.async_reload(existing_entry.entry_id)
)
return self.async_abort(reason="reauth_successful")
await self.async_set_unique_id(unique_id)
self._abort_if_unique_id_configured()
return self.async_create_entry(title=unique_id, data=data)
class SimpliSafeOptionsFlowHandler(config_entries.OptionsFlow): class SimpliSafeOptionsFlowHandler(config_entries.OptionsFlow):

View file

@ -14,5 +14,3 @@ ATTR_EXIT_DELAY_AWAY = "exit_delay_away"
ATTR_EXIT_DELAY_HOME = "exit_delay_home" ATTR_EXIT_DELAY_HOME = "exit_delay_home"
ATTR_LIGHT = "light" ATTR_LIGHT = "light"
ATTR_VOICE_PROMPT_VOLUME = "voice_prompt_volume" ATTR_VOICE_PROMPT_VOLUME = "voice_prompt_volume"
CONF_USER_ID = "user_id"

View file

@ -3,7 +3,7 @@
"name": "SimpliSafe", "name": "SimpliSafe",
"config_flow": true, "config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/simplisafe", "documentation": "https://www.home-assistant.io/integrations/simplisafe",
"requirements": ["simplisafe-python==2022.03.3"], "requirements": ["simplisafe-python==2022.04.1"],
"codeowners": ["@bachya"], "codeowners": ["@bachya"],
"iot_class": "cloud_polling", "iot_class": "cloud_polling",
"dhcp": [ "dhcp": [

View file

@ -1,23 +1,35 @@
{ {
"config": { "config": {
"step": { "step": {
"user": { "reauth_confirm": {
"description": "SimpliSafe authenticates with Home Assistant via the SimpliSafe web app. Due to technical limitations, there is a manual step at the end of this process; please ensure that you read the [documentation]({docs_url}) before starting.\n\n1. Click [here]({url}) to open the SimpliSafe web app and input your credentials.\n\n2. When the login process is complete, return here and input the authorization code below.", "title": "[%key:common::config_flow::title::reauth%]",
"description": "Please re-enter the password for {username}.",
"data": { "data": {
"auth_code": "Authorization Code" "password": "[%key:common::config_flow::data::password%]"
}
},
"sms_2fa": {
"description": "Input the two-factor authentication code sent to you via SMS.",
"data": {
"code": "Code"
}
},
"user": {
"description": "Input your username and password.",
"data": {
"username": "[%key:common::config_flow::data::username%]",
"password": "[%key:common::config_flow::data::password%]"
} }
} }
}, },
"error": { "error": {
"identifier_exists": "Account already registered", "2fa_timed_out": "Timed out while waiting for two-factor authentication",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]", "invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"still_awaiting_mfa": "Still awaiting MFA email click",
"unknown": "[%key:common::config_flow::error::unknown%]" "unknown": "[%key:common::config_flow::error::unknown%]"
}, },
"abort": { "abort": {
"already_configured": "This SimpliSafe account is already in use.", "already_configured": "This SimpliSafe account is already in use.",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]", "reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]"
"wrong_account": "The user credentials provided do not match this SimpliSafe account."
} }
}, },
"options": { "options": {

View file

@ -2,43 +2,33 @@
"config": { "config": {
"abort": { "abort": {
"already_configured": "This SimpliSafe account is already in use.", "already_configured": "This SimpliSafe account is already in use.",
"reauth_successful": "Re-authentication was successful", "reauth_successful": "Re-authentication was successful"
"wrong_account": "The user credentials provided do not match this SimpliSafe account."
}, },
"error": { "error": {
"identifier_exists": "Account already registered", "2fa_timed_out": "Timed out while waiting for two-factor authentication",
"invalid_auth": "Invalid authentication", "invalid_auth": "Invalid authentication",
"still_awaiting_mfa": "Still awaiting MFA email click",
"unknown": "Unexpected error" "unknown": "Unexpected error"
}, },
"step": { "step": {
"input_auth_code": {
"data": {
"auth_code": "Authorization Code"
},
"description": "Input the authorization code from the SimpliSafe web app URL:",
"title": "Finish Authorization"
},
"mfa": {
"description": "Check your email for a link from SimpliSafe. After verifying the link, return here to complete the installation of the integration.",
"title": "SimpliSafe Multi-Factor Authentication"
},
"reauth_confirm": { "reauth_confirm": {
"data": { "data": {
"password": "Password" "password": "Password"
}, },
"description": "Your access has expired or been revoked. Enter your password to re-link your account.", "description": "Please re-enter the password for {username}.",
"title": "Reauthenticate Integration" "title": "Reauthenticate Integration"
}, },
"sms_2fa": {
"data": {
"code": "Code"
},
"description": "Input the two-factor authentication code sent to you via SMS."
},
"user": { "user": {
"data": { "data": {
"auth_code": "Authorization Code",
"code": "Code (used in Home Assistant UI)",
"password": "Password", "password": "Password",
"username": "Email" "username": "Username"
}, },
"description": "SimpliSafe authenticates with Home Assistant via the SimpliSafe web app. Due to technical limitations, there is a manual step at the end of this process; please ensure that you read the [documentation]({docs_url}) before starting.\n\n1. Click [here]({url}) to open the SimpliSafe web app and input your credentials.\n\n2. When the login process is complete, return here and input the authorization code below.", "description": "Input your username and password."
"title": "Fill in your information."
} }
} }
}, },

View file

@ -2150,7 +2150,7 @@ simplehound==0.3
simplepush==1.1.4 simplepush==1.1.4
# homeassistant.components.simplisafe # homeassistant.components.simplisafe
simplisafe-python==2022.03.3 simplisafe-python==2022.04.1
# homeassistant.components.sisyphus # homeassistant.components.sisyphus
sisyphus-control==3.1.2 sisyphus-control==3.1.2

View file

@ -1401,7 +1401,7 @@ sharkiq==0.0.1
simplehound==0.3 simplehound==0.3
# homeassistant.components.simplisafe # homeassistant.components.simplisafe
simplisafe-python==2022.03.3 simplisafe-python==2022.04.1
# homeassistant.components.slack # homeassistant.components.slack
slackclient==2.5.0 slackclient==2.5.0

View file

@ -0,0 +1,4 @@
"""Define common SimpliSafe test constants/etc."""
REFRESH_TOKEN = "token123"
USERNAME = "user@email.com"
USER_ID = "12345"

View file

@ -3,25 +3,36 @@ import json
from unittest.mock import AsyncMock, Mock, patch from unittest.mock import AsyncMock, Mock, patch
import pytest import pytest
from simplipy.api import AuthStates
from simplipy.system.v3 import SystemV3 from simplipy.system.v3 import SystemV3
from homeassistant.components.simplisafe.config_flow import CONF_AUTH_CODE from homeassistant.components.simplisafe.const import DOMAIN
from homeassistant.components.simplisafe.const import CONF_USER_ID, DOMAIN from homeassistant.const import CONF_CODE, CONF_PASSWORD, CONF_TOKEN, CONF_USERNAME
from homeassistant.const import CONF_TOKEN
from homeassistant.setup import async_setup_component from homeassistant.setup import async_setup_component
from .common import REFRESH_TOKEN, USER_ID, USERNAME
from tests.common import MockConfigEntry, load_fixture from tests.common import MockConfigEntry, load_fixture
REFRESH_TOKEN = "token123" CODE = "12345"
PASSWORD = "password"
SYSTEM_ID = "system_123" SYSTEM_ID = "system_123"
USER_ID = "12345"
@pytest.fixture(name="api_auth_state")
def api_auth_state_fixture():
"""Define a SimpliSafe API auth state."""
return AuthStates.PENDING_2FA_SMS
@pytest.fixture(name="api") @pytest.fixture(name="api")
def api_fixture(data_subscription, system_v3, websocket): def api_fixture(api_auth_state, data_subscription, system_v3, websocket):
"""Define a fixture for a simplisafe-python API object.""" """Define a simplisafe-python API object."""
return Mock( return Mock(
async_get_systems=AsyncMock(return_value={SYSTEM_ID: system_v3}), async_get_systems=AsyncMock(return_value={SYSTEM_ID: system_v3}),
async_verify_2fa_email=AsyncMock(),
async_verify_2fa_sms=AsyncMock(),
auth_state=api_auth_state,
refresh_token=REFRESH_TOKEN, refresh_token=REFRESH_TOKEN,
subscription_data=data_subscription, subscription_data=data_subscription,
user_id=USER_ID, user_id=USER_ID,
@ -30,27 +41,28 @@ def api_fixture(data_subscription, system_v3, websocket):
@pytest.fixture(name="config_entry") @pytest.fixture(name="config_entry")
def config_entry_fixture(hass, config): def config_entry_fixture(hass, config, unique_id):
"""Define a config entry fixture.""" """Define a config entry."""
entry = MockConfigEntry(domain=DOMAIN, unique_id=USER_ID, data=config) entry = MockConfigEntry(domain=DOMAIN, unique_id=unique_id, data=config)
entry.add_to_hass(hass) entry.add_to_hass(hass)
return entry return entry
@pytest.fixture(name="config") @pytest.fixture(name="config")
def config_fixture(hass): def config_fixture():
"""Define a config entry data fixture.""" """Define config entry data config."""
return { return {
CONF_USER_ID: USER_ID,
CONF_TOKEN: REFRESH_TOKEN, CONF_TOKEN: REFRESH_TOKEN,
CONF_USERNAME: USERNAME,
} }
@pytest.fixture(name="config_code") @pytest.fixture(name="credentials_config")
def config_code_fixture(hass): def credentials_config_fixture():
"""Define a authorization code.""" """Define a username/password config."""
return { return {
CONF_AUTH_CODE: "code123", CONF_USERNAME: USERNAME,
CONF_PASSWORD: PASSWORD,
} }
@ -78,14 +90,23 @@ def data_subscription_fixture():
return json.loads(load_fixture("subscription_data.json", "simplisafe")) return json.loads(load_fixture("subscription_data.json", "simplisafe"))
@pytest.fixture(name="reauth_config")
def reauth_config_fixture():
"""Define a reauth config."""
return {
CONF_PASSWORD: PASSWORD,
}
@pytest.fixture(name="setup_simplisafe") @pytest.fixture(name="setup_simplisafe")
async def setup_simplisafe_fixture(hass, api, config): async def setup_simplisafe_fixture(hass, api, config):
"""Define a fixture to set up SimpliSafe.""" """Define a fixture to set up SimpliSafe."""
with patch( with patch(
"homeassistant.components.simplisafe.config_flow.API.async_from_auth", "homeassistant.components.simplisafe.config_flow.API.async_from_credentials",
return_value=api, return_value=api,
), patch( ), patch(
"homeassistant.components.simplisafe.API.async_from_auth", return_value=api "homeassistant.components.simplisafe.API.async_from_credentials",
return_value=api,
), patch( ), patch(
"homeassistant.components.simplisafe.API.async_from_refresh_token", "homeassistant.components.simplisafe.API.async_from_refresh_token",
return_value=api, return_value=api,
@ -99,9 +120,17 @@ async def setup_simplisafe_fixture(hass, api, config):
yield yield
@pytest.fixture(name="sms_config")
def sms_config_fixture():
"""Define a SMS-based two-factor authentication config."""
return {
CONF_CODE: CODE,
}
@pytest.fixture(name="system_v3") @pytest.fixture(name="system_v3")
def system_v3_fixture(data_latest_event, data_sensor, data_settings, data_subscription): def system_v3_fixture(data_latest_event, data_sensor, data_settings, data_subscription):
"""Define a fixture for a simplisafe-python V3 System object.""" """Define a simplisafe-python V3 System object."""
system = SystemV3(Mock(subscription_data=data_subscription), SYSTEM_ID) system = SystemV3(Mock(subscription_data=data_subscription), SYSTEM_ID)
system.async_get_latest_event = AsyncMock(return_value=data_latest_event) system.async_get_latest_event = AsyncMock(return_value=data_latest_event)
system.sensor_data = data_sensor system.sensor_data = data_sensor
@ -110,9 +139,15 @@ def system_v3_fixture(data_latest_event, data_sensor, data_settings, data_subscr
return system return system
@pytest.fixture(name="unique_id")
def unique_id_fixture():
"""Define a unique ID."""
return USER_ID
@pytest.fixture(name="websocket") @pytest.fixture(name="websocket")
def websocket_fixture(): def websocket_fixture():
"""Define a fixture for a simplisafe-python websocket object.""" """Define a simplisafe-python websocket object."""
return Mock( return Mock(
async_connect=AsyncMock(), async_connect=AsyncMock(),
async_disconnect=AsyncMock(), async_disconnect=AsyncMock(),

View file

@ -2,15 +2,22 @@
from unittest.mock import patch from unittest.mock import patch
import pytest import pytest
from simplipy.errors import InvalidCredentialsError, SimplipyError from simplipy.api import AuthStates
from simplipy.errors import InvalidCredentialsError, SimplipyError, Verify2FAPending
from homeassistant import data_entry_flow from homeassistant import data_entry_flow
from homeassistant.components.simplisafe import DOMAIN from homeassistant.components.simplisafe import DOMAIN
from homeassistant.config_entries import SOURCE_REAUTH, SOURCE_USER from homeassistant.config_entries import SOURCE_REAUTH, SOURCE_USER
from homeassistant.const import CONF_CODE from homeassistant.const import CONF_CODE, CONF_TOKEN, CONF_USERNAME
from .common import REFRESH_TOKEN, USER_ID, USERNAME
CONF_USER_ID = "user_id"
async def test_duplicate_error(hass, config_entry, config_code, setup_simplisafe): async def test_duplicate_error(
hass, config_entry, credentials_config, setup_simplisafe, sms_config
):
"""Test that errors are shown when duplicates are added.""" """Test that errors are shown when duplicates are added."""
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER} DOMAIN, context={"source": SOURCE_USER}
@ -19,35 +26,18 @@ async def test_duplicate_error(hass, config_entry, config_code, setup_simplisafe
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=config_code result["flow_id"], user_input=credentials_config
)
assert result["step_id"] == "sms_2fa"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=sms_config
) )
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured" assert result["reason"] == "already_configured"
@pytest.mark.parametrize(
"exc,error_string",
[(InvalidCredentialsError, "invalid_auth"), (SimplipyError, "unknown")],
)
async def test_errors(hass, config_code, exc, error_string):
"""Test that exceptions show the appropriate error."""
with patch(
"homeassistant.components.simplisafe.API.async_from_auth",
side_effect=exc,
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["step_id"] == "user"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=config_code
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {"base": error_string}
async def test_options_flow(hass, config_entry): async def test_options_flow(hass, config_entry):
"""Test config flow options.""" """Test config flow options."""
with patch( with patch(
@ -65,82 +55,246 @@ async def test_options_flow(hass, config_entry):
assert config_entry.options == {CONF_CODE: "4321"} assert config_entry.options == {CONF_CODE: "4321"}
async def test_step_reauth_old_format( @pytest.mark.parametrize("unique_id", [USERNAME, USER_ID])
hass, config, config_code, config_entry, setup_simplisafe async def test_step_reauth(
hass, config, config_entry, reauth_config, setup_simplisafe, sms_config
): ):
"""Test the re-auth step with "old" config entries (those with user IDs).""" """Test the re-auth step (testing both username and user ID as unique ID)."""
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_REAUTH}, data=config DOMAIN, context={"source": SOURCE_REAUTH}, data=config
) )
assert result["step_id"] == "user" assert result["step_id"] == "reauth_confirm"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=config_code result["flow_id"], user_input=reauth_config
)
assert result["step_id"] == "sms_2fa"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=sms_config
) )
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "reauth_successful" assert result["reason"] == "reauth_successful"
assert len(hass.config_entries.async_entries()) == 1 assert len(hass.config_entries.async_entries()) == 1
[config_entry] = hass.config_entries.async_entries(DOMAIN) [config_entry] = hass.config_entries.async_entries(DOMAIN)
assert config_entry.unique_id == USER_ID
assert config_entry.data == config assert config_entry.data == config
async def test_step_reauth_new_format( @pytest.mark.parametrize(
hass, config, config_code, config_entry, setup_simplisafe "exc,error_string",
[(InvalidCredentialsError, "invalid_auth"), (SimplipyError, "unknown")],
)
async def test_step_reauth_errors(hass, config, error_string, exc, reauth_config):
"""Test that errors during the reauth step are handled."""
with patch(
"homeassistant.components.simplisafe.API.async_from_credentials",
side_effect=exc,
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_REAUTH}, data=config
)
assert result["step_id"] == "reauth_confirm"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=reauth_config
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {"base": error_string}
@pytest.mark.parametrize(
"config,unique_id",
[
(
{
CONF_TOKEN: REFRESH_TOKEN,
CONF_USER_ID: USER_ID,
},
USERNAME,
),
(
{
CONF_TOKEN: REFRESH_TOKEN,
CONF_USER_ID: USER_ID,
},
USER_ID,
),
],
)
async def test_step_reauth_from_scratch(
hass, config, config_entry, credentials_config, setup_simplisafe, sms_config
): ):
"""Test the re-auth step with "new" config entries (those with user IDs).""" """Test the re-auth step when a complete redo is needed."""
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_REAUTH}, data=config DOMAIN, context={"source": SOURCE_REAUTH}, data=config
) )
assert result["step_id"] == "user" assert result["step_id"] == "user"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=config_code result["flow_id"], user_input=credentials_config
)
assert result["step_id"] == "sms_2fa"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=sms_config
) )
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "reauth_successful" assert result["reason"] == "reauth_successful"
assert len(hass.config_entries.async_entries()) == 1 assert len(hass.config_entries.async_entries()) == 1
[config_entry] = hass.config_entries.async_entries(DOMAIN) [config_entry] = hass.config_entries.async_entries(DOMAIN)
assert config_entry.data == config assert config_entry.unique_id == USER_ID
assert config_entry.data == {
CONF_TOKEN: REFRESH_TOKEN,
CONF_USERNAME: USERNAME,
}
async def test_step_reauth_wrong_account( @pytest.mark.parametrize(
hass, api, config, config_code, config_entry, setup_simplisafe "exc,error_string",
[(InvalidCredentialsError, "invalid_auth"), (SimplipyError, "unknown")],
)
async def test_step_user_errors(hass, credentials_config, error_string, exc):
"""Test that errors during the user step are handled."""
with patch(
"homeassistant.components.simplisafe.API.async_from_credentials",
side_effect=exc,
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["step_id"] == "user"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=credentials_config
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {"base": error_string}
@pytest.mark.parametrize("api_auth_state", [AuthStates.PENDING_2FA_EMAIL])
async def test_step_user_email_2fa(
api, hass, config, credentials_config, setup_simplisafe
): ):
"""Test the re-auth step returning a different account from this one.""" """Test the user step with email-based 2FA."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_REAUTH}, data=config
)
assert result["step_id"] == "user"
# Simulate the next auth call returning a different user ID than the one we've
# identified as this entry's unique ID:
api.user_id = "67890"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=config_code
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "wrong_account"
assert len(hass.config_entries.async_entries()) == 1
[config_entry] = hass.config_entries.async_entries(DOMAIN)
assert config_entry.unique_id == "12345"
async def test_step_user(hass, config, config_code, setup_simplisafe):
"""Test the user step."""
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER} DOMAIN, context={"source": SOURCE_USER}
) )
assert result["step_id"] == "user" assert result["step_id"] == "user"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
# Patch API.async_verify_2fa_email to first return pending, then return all done:
api.async_verify_2fa_email.side_effect = [Verify2FAPending, None]
# Patch the amount of time slept between calls so to not slow down this test:
with patch(
"homeassistant.components.simplisafe.config_flow.DEFAULT_EMAIL_2FA_SLEEP", 0
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=credentials_config
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert len(hass.config_entries.async_entries()) == 1
[config_entry] = hass.config_entries.async_entries(DOMAIN)
assert config_entry.unique_id == USER_ID
assert config_entry.data == config
@pytest.mark.parametrize("api_auth_state", [AuthStates.PENDING_2FA_EMAIL])
async def test_step_user_email_2fa_timeout(
api, hass, config, credentials_config, setup_simplisafe
):
"""Test a timeout during the user step with email-based 2FA."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["step_id"] == "user"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
# Patch API.async_verify_2fa_email to return pending:
api.async_verify_2fa_email.side_effect = Verify2FAPending
# Patch the amount of time slept between calls and the timeout duration so to not
# slow down this test:
with patch(
"homeassistant.components.simplisafe.config_flow.DEFAULT_EMAIL_2FA_SLEEP", 0
), patch(
"homeassistant.components.simplisafe.config_flow.DEFAULT_EMAIL_2FA_TIMEOUT", 0
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=credentials_config
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {"base": "2fa_timed_out"}
async def test_step_user_sms_2fa(
hass, config, credentials_config, setup_simplisafe, sms_config
):
"""Test the user step with SMS-based 2FA."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["step_id"] == "user"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=config_code result["flow_id"], user_input=credentials_config
)
assert result["step_id"] == "sms_2fa"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=sms_config
) )
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert len(hass.config_entries.async_entries()) == 1 assert len(hass.config_entries.async_entries()) == 1
[config_entry] = hass.config_entries.async_entries(DOMAIN) [config_entry] = hass.config_entries.async_entries(DOMAIN)
assert config_entry.unique_id == USER_ID
assert config_entry.data == config assert config_entry.data == config
@pytest.mark.parametrize(
"exc,error_string", [(InvalidCredentialsError, "invalid_auth")]
)
async def test_step_user_sms_2fa_errors(
api,
hass,
config,
credentials_config,
error_string,
exc,
setup_simplisafe,
sms_config,
):
"""Test that errors during the SMS-based 2FA step are handled."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["step_id"] == "user"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=credentials_config
)
assert result["step_id"] == "sms_2fa"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
# Simulate entering the incorrect SMS code:
api.async_verify_2fa_sms.side_effect = InvalidCredentialsError
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=sms_config
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {"code": error_string}