Add 2FA support for Subaru integration setup (#68753)

* Add 2FA support for Subaru integration setup

* Update config flow to abort with 2FA request fail
This commit is contained in:
Garrett 2022-03-30 07:53:03 -04:00 committed by GitHub
parent bb7593351b
commit ab0abdc988
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 238 additions and 20 deletions

View file

@ -19,6 +19,8 @@ from homeassistant.helpers import aiohttp_client, config_validation as cv
from .const import CONF_COUNTRY, CONF_UPDATE_ENABLED, DOMAIN
_LOGGER = logging.getLogger(__name__)
CONF_CONTACT_METHOD = "contact_method"
CONF_VALIDATION_CODE = "validation_code"
PIN_SCHEMA = vol.Schema({vol.Required(CONF_PIN): str})
@ -47,6 +49,9 @@ class SubaruConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
_LOGGER.error("Unable to communicate with Subaru API: %s", ex.message)
return self.async_abort(reason="cannot_connect")
else:
if not self.controller.device_registered:
_LOGGER.debug("2FA validation is required")
return await self.async_step_two_factor()
if self.controller.is_pin_required():
return await self.async_step_pin()
return self.async_create_entry(
@ -103,13 +108,60 @@ class SubaruConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
device_name=device_name,
country=data[CONF_COUNTRY],
)
_LOGGER.debug(
"Setting up first time connection to Subaru API. This may take up to 20 seconds"
)
_LOGGER.debug("Setting up first time connection to Subaru API")
if await self.controller.connect():
_LOGGER.debug("Successfully authenticated and authorized with Subaru API")
_LOGGER.debug("Successfully authenticated with Subaru API")
self.config_data.update(data)
async def async_step_two_factor(self, user_input=None):
"""Select contact method and request 2FA code from Subaru."""
error = None
if user_input:
# self.controller.contact_methods is a dict:
# {"phone":"555-555-5555", "userName":"my@email.com"}
selected_method = next(
k
for k, v in self.controller.contact_methods.items()
if v == user_input[CONF_CONTACT_METHOD]
)
if await self.controller.request_auth_code(selected_method):
return await self.async_step_two_factor_validate()
return self.async_abort(reason="two_factor_request_failed")
data_schema = vol.Schema(
{
vol.Required(CONF_CONTACT_METHOD): vol.In(
list(self.controller.contact_methods.values())
)
}
)
return self.async_show_form(
step_id="two_factor", data_schema=data_schema, errors=error
)
async def async_step_two_factor_validate(self, user_input=None):
"""Validate received 2FA code with Subaru."""
error = None
if user_input:
try:
vol.Match(r"^[0-9]{6}$")(user_input[CONF_VALIDATION_CODE])
if await self.controller.submit_auth_code(
user_input[CONF_VALIDATION_CODE]
):
if self.controller.is_pin_required():
return await self.async_step_pin()
return self.async_create_entry(
title=self.config_data[CONF_USERNAME], data=self.config_data
)
error = {"base": "incorrect_validation_code"}
except vol.Invalid:
error = {"base": "bad_validation_code_format"}
data_schema = vol.Schema({vol.Required(CONF_VALIDATION_CODE): str})
return self.async_show_form(
step_id="two_factor_validate", data_schema=data_schema, errors=error
)
async def async_step_pin(self, user_input=None):
"""Handle second part of config flow, if required."""
error = None

View file

@ -3,7 +3,7 @@
"name": "Subaru",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/subaru",
"requirements": ["subarulink==0.4.2"],
"requirements": ["subarulink==0.5.0"],
"codeowners": ["@G-Two"],
"iot_class": "cloud_polling",
"loggers": ["stdiomask", "subarulink"]

View file

@ -10,6 +10,20 @@
"country": "Select country"
}
},
"two_factor": {
"title": "Subaru Starlink Configuration",
"description": "Two factor authentication required",
"data": {
"contact_method": "Please select a contact method:"
}
},
"two_factor_validate": {
"title": "Subaru Starlink Configuration",
"description": "Please enter validation code received",
"data": {
"validation_code": "Validation code"
}
},
"pin": {
"title": "Subaru Starlink Configuration",
"description": "Please enter your MySubaru PIN\nNOTE: All vehicles in account must have the same PIN",
@ -22,7 +36,10 @@
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"incorrect_pin": "Incorrect PIN",
"bad_pin_format": "PIN should be 4 digits"
"bad_pin_format": "PIN should be 4 digits",
"two_factor_request_failed": "Request for 2FA code failed, please try again",
"bad_validation_code_format": "Validation code should be 6 digits",
"incorrect_validation_code": "Incorrect validation code"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_account%]",

View file

@ -2224,7 +2224,7 @@ streamlabswater==1.0.1
stringcase==1.2.0
# homeassistant.components.subaru
subarulink==0.4.2
subarulink==0.5.0
# homeassistant.components.ecovacs
sucks==0.9.4

View file

@ -1440,7 +1440,7 @@ stookalert==0.1.4
stringcase==1.2.0
# homeassistant.components.subaru
subarulink==0.4.2
subarulink==0.5.0
# homeassistant.components.solarlog
sunwatcher==0.2.1

View file

@ -28,6 +28,10 @@ from .api_responses import TEST_VIN_2_EV, VEHICLE_DATA, VEHICLE_STATUS_EV
from tests.common import MockConfigEntry, async_fire_time_changed
MOCK_API = "homeassistant.components.subaru.SubaruAPI."
MOCK_API_DEVICE_REGISTERED = f"{MOCK_API}device_registered"
MOCK_API_2FA_CONTACTS = f"{MOCK_API}contact_methods"
MOCK_API_2FA_REQUEST = f"{MOCK_API}request_auth_code"
MOCK_API_2FA_VERIFY = f"{MOCK_API}submit_auth_code"
MOCK_API_CONNECT = f"{MOCK_API}connect"
MOCK_API_IS_PIN_REQUIRED = f"{MOCK_API}is_pin_required"
MOCK_API_TEST_PIN = f"{MOCK_API}test_pin"

View file

@ -2,7 +2,7 @@
# pylint: disable=redefined-outer-name
from copy import deepcopy
from unittest import mock
from unittest.mock import patch
from unittest.mock import PropertyMock, patch
import pytest
from subarulink.exceptions import InvalidCredentials, InvalidPIN, SubaruException
@ -14,7 +14,11 @@ from homeassistant.const import CONF_DEVICE_ID, CONF_PIN
from homeassistant.setup import async_setup_component
from .conftest import (
MOCK_API_2FA_CONTACTS,
MOCK_API_2FA_REQUEST,
MOCK_API_2FA_VERIFY,
MOCK_API_CONNECT,
MOCK_API_DEVICE_REGISTERED,
MOCK_API_IS_PIN_REQUIRED,
MOCK_API_TEST_PIN,
MOCK_API_UPDATE_SAVED_PIN,
@ -28,6 +32,10 @@ from .conftest import (
from tests.common import MockConfigEntry
ASYNC_SETUP_ENTRY = "homeassistant.components.subaru.async_setup_entry"
MOCK_2FA_CONTACTS = {
"phone": "123-123-1234",
"userName": "email@addr.com",
}
async def test_user_form_init(user_form):
@ -89,19 +97,22 @@ async def test_user_form_invalid_auth(hass, user_form):
assert result["errors"] == {"base": "invalid_auth"}
async def test_user_form_pin_not_required(hass, user_form):
async def test_user_form_pin_not_required(hass, two_factor_verify_form):
"""Test successful login when no PIN is required."""
with patch(MOCK_API_CONNECT, return_value=True,) as mock_connect, patch(
with patch(
MOCK_API_2FA_VERIFY,
return_value=True,
) as mock_two_factor_verify, patch(
MOCK_API_IS_PIN_REQUIRED,
return_value=False,
) as mock_is_pin_required, patch(
ASYNC_SETUP_ENTRY, return_value=True
) as mock_setup_entry:
result = await hass.config_entries.flow.async_configure(
user_form["flow_id"],
TEST_CREDS,
two_factor_verify_form["flow_id"],
user_input={config_flow.CONF_VALIDATION_CODE: "123456"},
)
assert len(mock_connect.mock_calls) == 1
assert len(mock_two_factor_verify.mock_calls) == 1
assert len(mock_is_pin_required.mock_calls) == 1
assert len(mock_setup_entry.mock_calls) == 1
@ -117,11 +128,118 @@ async def test_user_form_pin_not_required(hass, user_form):
"data": deepcopy(TEST_CONFIG),
"options": {},
}
expected["data"][CONF_PIN] = None
result["data"][CONF_DEVICE_ID] = TEST_DEVICE_ID
assert result == expected
async def test_registered_pin_required(hass, user_form):
"""Test if the device is already registered and PIN required."""
with patch(MOCK_API_CONNECT, return_value=True), patch(
MOCK_API_DEVICE_REGISTERED, new_callable=PropertyMock
) as mock_device_registered, patch(MOCK_API_IS_PIN_REQUIRED, return_value=True):
mock_device_registered.return_value = True
await hass.config_entries.flow.async_configure(
user_form["flow_id"], user_input=TEST_CREDS
)
async def test_registered_no_pin_required(hass, user_form):
"""Test if the device is already registered and PIN not required."""
with patch(MOCK_API_CONNECT, return_value=True), patch(
MOCK_API_DEVICE_REGISTERED, new_callable=PropertyMock
) as mock_device_registered, patch(MOCK_API_IS_PIN_REQUIRED, return_value=False):
mock_device_registered.return_value = True
await hass.config_entries.flow.async_configure(
user_form["flow_id"], user_input=TEST_CREDS
)
async def test_two_factor_request_success(hass, two_factor_start_form):
"""Test two factor contact method selection."""
with patch(
MOCK_API_2FA_REQUEST,
return_value=True,
) as mock_two_factor_request, patch(
MOCK_API_2FA_CONTACTS, new_callable=PropertyMock
) as mock_contacts:
mock_contacts.return_value = MOCK_2FA_CONTACTS
await hass.config_entries.flow.async_configure(
two_factor_start_form["flow_id"],
user_input={config_flow.CONF_CONTACT_METHOD: "email@addr.com"},
)
assert len(mock_two_factor_request.mock_calls) == 1
async def test_two_factor_request_fail(hass, two_factor_start_form):
"""Test two factor auth request failure."""
with patch(
MOCK_API_2FA_REQUEST,
return_value=False,
) as mock_two_factor_request, patch(
MOCK_API_2FA_CONTACTS, new_callable=PropertyMock
) as mock_contacts:
mock_contacts.return_value = MOCK_2FA_CONTACTS
result = await hass.config_entries.flow.async_configure(
two_factor_start_form["flow_id"],
user_input={config_flow.CONF_CONTACT_METHOD: "email@addr.com"},
)
assert len(mock_two_factor_request.mock_calls) == 1
assert result["type"] == "abort"
assert result["reason"] == "two_factor_request_failed"
async def test_two_factor_verify_success(hass, two_factor_verify_form):
"""Test two factor verification."""
with patch(
MOCK_API_2FA_VERIFY,
return_value=True,
) as mock_two_factor_verify, patch(
MOCK_API_IS_PIN_REQUIRED, return_value=True
) as mock_is_in_required:
await hass.config_entries.flow.async_configure(
two_factor_verify_form["flow_id"],
user_input={config_flow.CONF_VALIDATION_CODE: "123456"},
)
assert len(mock_two_factor_verify.mock_calls) == 1
assert len(mock_is_in_required.mock_calls) == 1
async def test_two_factor_verify_bad_format(hass, two_factor_verify_form):
"""Test two factor verification bad format."""
with patch(
MOCK_API_2FA_VERIFY,
return_value=False,
) as mock_two_factor_verify, patch(
MOCK_API_IS_PIN_REQUIRED, return_value=True
) as mock_is_pin_required:
result = await hass.config_entries.flow.async_configure(
two_factor_verify_form["flow_id"],
user_input={config_flow.CONF_VALIDATION_CODE: "1234567"},
)
assert len(mock_two_factor_verify.mock_calls) == 0
assert len(mock_is_pin_required.mock_calls) == 0
assert result["errors"] == {"base": "bad_validation_code_format"}
async def test_two_factor_verify_fail(hass, two_factor_verify_form):
"""Test two factor verification failure."""
with patch(
MOCK_API_2FA_VERIFY,
return_value=False,
) as mock_two_factor_verify, patch(
MOCK_API_IS_PIN_REQUIRED, return_value=True
) as mock_is_pin_required:
result = await hass.config_entries.flow.async_configure(
two_factor_verify_form["flow_id"],
user_input={config_flow.CONF_VALIDATION_CODE: "123456"},
)
assert len(mock_two_factor_verify.mock_calls) == 1
assert len(mock_is_pin_required.mock_calls) == 0
assert result["errors"] == {"base": "incorrect_validation_code"}
async def test_pin_form_init(pin_form):
"""Test the pin entry form for second step of the config flow."""
expected = {
@ -232,17 +350,44 @@ async def user_form(hass):
@pytest.fixture
async def pin_form(hass, user_form):
"""Return second form (PIN input) for Subaru config flow."""
with patch(MOCK_API_CONNECT, return_value=True,), patch(
MOCK_API_IS_PIN_REQUIRED,
return_value=True,
):
async def two_factor_start_form(hass, user_form):
"""Return two factor form for Subaru config flow."""
with patch(MOCK_API_CONNECT, return_value=True), patch(
MOCK_API_2FA_CONTACTS, new_callable=PropertyMock
) as mock_contacts:
mock_contacts.return_value = MOCK_2FA_CONTACTS
return await hass.config_entries.flow.async_configure(
user_form["flow_id"], user_input=TEST_CREDS
)
@pytest.fixture
async def two_factor_verify_form(hass, two_factor_start_form):
"""Return two factor form for Subaru config flow."""
with patch(
MOCK_API_2FA_REQUEST,
return_value=True,
), patch(MOCK_API_2FA_CONTACTS, new_callable=PropertyMock) as mock_contacts:
mock_contacts.return_value = MOCK_2FA_CONTACTS
return await hass.config_entries.flow.async_configure(
two_factor_start_form["flow_id"],
user_input={config_flow.CONF_CONTACT_METHOD: "email@addr.com"},
)
@pytest.fixture
async def pin_form(hass, two_factor_verify_form):
"""Return PIN input form for Subaru config flow."""
with patch(
MOCK_API_2FA_VERIFY,
return_value=True,
), patch(MOCK_API_IS_PIN_REQUIRED, return_value=True):
return await hass.config_entries.flow.async_configure(
two_factor_verify_form["flow_id"],
user_input={config_flow.CONF_VALIDATION_CODE: "123456"},
)
@pytest.fixture
async def options_form(hass):
"""Return options form for Subaru config flow."""