Add support for iCloud 2FA (#45818)

* Add support for iCloud 2FA

* Updated dependency for iCloud

* Updated dependency and logic fix

* Added logic for handling incorrect 2FA code

* Bug fix on failing test

* Added myself to codeowners

* Added check for 2FA on setup

* Updated error message
This commit is contained in:
Niccolo Zapponi 2021-02-03 18:18:31 +00:00 committed by GitHub
parent 4b208746e5
commit a775b79d4b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 164 additions and 30 deletions

View file

@ -211,7 +211,7 @@ homeassistant/components/hydrawise/* @ptcryan
homeassistant/components/hyperion/* @dermotduffy
homeassistant/components/iammeter/* @lewei50
homeassistant/components/iaqualink/* @flz
homeassistant/components/icloud/* @Quentame
homeassistant/components/icloud/* @Quentame @nzapponi
homeassistant/components/ign_sismologia/* @exxamalte
homeassistant/components/image/* @home-assistant/core
homeassistant/components/incomfort/* @zxdavb

View file

@ -113,6 +113,12 @@ class IcloudAccount:
self._icloud_dir.path,
with_family=self._with_family,
)
if not self.api.is_trusted_session or self.api.requires_2fa:
# Session is no longer trusted
# Trigger a new log in to ensure the user enters the 2FA code again.
raise PyiCloudFailedLoginException
except PyiCloudFailedLoginException:
self.api = None
# Login failed which means credentials need to be updated.
@ -125,16 +131,7 @@ class IcloudAccount:
self._config_entry.data[CONF_USERNAME],
)
self.hass.add_job(
self.hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_REAUTH},
data={
**self._config_entry.data,
"unique_id": self._config_entry.unique_id,
},
)
)
self._require_reauth()
return
try:
@ -165,6 +162,10 @@ class IcloudAccount:
if self.api is None:
return
if not self.api.is_trusted_session or self.api.requires_2fa:
self._require_reauth()
return
api_devices = {}
try:
api_devices = self.api.devices
@ -228,6 +229,19 @@ class IcloudAccount:
utcnow() + timedelta(minutes=self._fetch_interval),
)
def _require_reauth(self):
"""Require the user to log in again."""
self.hass.add_job(
self.hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_REAUTH},
data={
**self._config_entry.data,
"unique_id": self._config_entry.unique_id,
},
)
)
def _determine_interval(self) -> int:
"""Calculate new interval between two API fetch (in minutes)."""
intervals = {"default": self._max_interval}

View file

@ -125,6 +125,9 @@ class IcloudFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
errors = {CONF_PASSWORD: "invalid_auth"}
return self._show_setup_form(user_input, errors, step_id)
if self.api.requires_2fa:
return await self.async_step_verification_code()
if self.api.requires_2sa:
return await self.async_step_trusted_device()
@ -243,22 +246,29 @@ class IcloudFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
errors=errors or {},
)
async def async_step_verification_code(self, user_input=None):
async def async_step_verification_code(self, user_input=None, errors=None):
"""Ask the verification code to the user."""
errors = {}
if errors is None:
errors = {}
if user_input is None:
return await self._show_verification_code_form(user_input)
return await self._show_verification_code_form(user_input, errors)
self._verification_code = user_input[CONF_VERIFICATION_CODE]
try:
if not await self.hass.async_add_executor_job(
self.api.validate_verification_code,
self._trusted_device,
self._verification_code,
):
raise PyiCloudException("The code you entered is not valid.")
if self.api.requires_2fa:
if not await self.hass.async_add_executor_job(
self.api.validate_2fa_code, self._verification_code
):
raise PyiCloudException("The code you entered is not valid.")
else:
if not await self.hass.async_add_executor_job(
self.api.validate_verification_code,
self._trusted_device,
self._verification_code,
):
raise PyiCloudException("The code you entered is not valid.")
except PyiCloudException as error:
# Reset to the initial 2FA state to allow the user to retry
_LOGGER.error("Failed to verify verification code: %s", error)
@ -266,7 +276,27 @@ class IcloudFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
self._verification_code = None
errors["base"] = "validate_verification_code"
return await self.async_step_trusted_device(None, errors)
if self.api.requires_2fa:
try:
self.api = await self.hass.async_add_executor_job(
PyiCloudService,
self._username,
self._password,
self.hass.helpers.storage.Store(
STORAGE_VERSION, STORAGE_KEY
).path,
True,
None,
self._with_family,
)
return await self.async_step_verification_code(None, errors)
except PyiCloudFailedLoginException as error:
_LOGGER.error("Error logging into iCloud service: %s", error)
self.api = None
errors = {CONF_PASSWORD: "invalid_auth"}
return self._show_setup_form(user_input, errors, "user")
else:
return await self.async_step_trusted_device(None, errors)
return await self.async_step_user(
{
@ -278,11 +308,11 @@ class IcloudFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
}
)
async def _show_verification_code_form(self, user_input=None):
async def _show_verification_code_form(self, user_input=None, errors=None):
"""Show the verification_code form to the user."""
return self.async_show_form(
step_id=CONF_VERIFICATION_CODE,
data_schema=vol.Schema({vol.Required(CONF_VERIFICATION_CODE): str}),
errors=None,
errors=errors or {},
)

View file

@ -12,7 +12,7 @@ DEFAULT_GPS_ACCURACY_THRESHOLD = 500 # meters
# to store the cookie
STORAGE_KEY = DOMAIN
STORAGE_VERSION = 1
STORAGE_VERSION = 2
PLATFORMS = ["device_tracker", "sensor"]

View file

@ -3,6 +3,6 @@
"name": "Apple iCloud",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/icloud",
"requirements": ["pyicloud==0.9.7"],
"codeowners": ["@Quentame"]
"requirements": ["pyicloud==0.10.2"],
"codeowners": ["@Quentame", "@nzapponi"]
}

View file

@ -35,7 +35,7 @@
"error": {
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"send_verification_code": "Failed to send verification code",
"validate_verification_code": "Failed to verify your verification code, choose a trust device and start the verification again"
"validate_verification_code": "Failed to verify your verification code, try again"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_account%]",

View file

@ -8,7 +8,7 @@
"error": {
"invalid_auth": "Invalid authentication",
"send_verification_code": "Failed to send verification code",
"validate_verification_code": "Failed to verify your verification code, choose a trust device and start the verification again"
"validate_verification_code": "Failed to verify your verification code, try again"
},
"step": {
"reauth": {

View file

@ -1443,7 +1443,7 @@ pyhomematic==0.1.71
pyhomeworks==0.0.6
# homeassistant.components.icloud
pyicloud==0.9.7
pyicloud==0.10.2
# homeassistant.components.insteon
pyinsteon==1.0.9

View file

@ -745,7 +745,7 @@ pyheos==0.7.2
pyhomematic==0.1.71
# homeassistant.components.icloud
pyicloud==0.9.7
pyicloud==0.10.2
# homeassistant.components.insteon
pyinsteon==1.0.9

View file

@ -51,6 +51,7 @@ def mock_controller_service():
with patch(
"homeassistant.components.icloud.config_flow.PyiCloudService"
) as service_mock:
service_mock.return_value.requires_2fa = False
service_mock.return_value.requires_2sa = True
service_mock.return_value.trusted_devices = TRUSTED_DEVICES
service_mock.return_value.send_verification_code = Mock(return_value=True)
@ -58,15 +59,31 @@ def mock_controller_service():
yield service_mock
@pytest.fixture(name="service_2fa")
def mock_controller_2fa_service():
"""Mock a successful 2fa service."""
with patch(
"homeassistant.components.icloud.config_flow.PyiCloudService"
) as service_mock:
service_mock.return_value.requires_2fa = True
service_mock.return_value.requires_2sa = True
service_mock.return_value.validate_2fa_code = Mock(return_value=True)
service_mock.return_value.is_trusted_session = False
yield service_mock
@pytest.fixture(name="service_authenticated")
def mock_controller_service_authenticated():
"""Mock a successful service while already authenticate."""
with patch(
"homeassistant.components.icloud.config_flow.PyiCloudService"
) as service_mock:
service_mock.return_value.requires_2fa = False
service_mock.return_value.requires_2sa = False
service_mock.return_value.is_trusted_session = True
service_mock.return_value.trusted_devices = TRUSTED_DEVICES
service_mock.return_value.send_verification_code = Mock(return_value=True)
service_mock.return_value.validate_2fa_code = Mock(return_value=True)
service_mock.return_value.validate_verification_code = Mock(return_value=True)
yield service_mock
@ -77,6 +94,7 @@ def mock_controller_service_authenticated_no_device():
with patch(
"homeassistant.components.icloud.config_flow.PyiCloudService"
) as service_mock:
service_mock.return_value.requires_2fa = False
service_mock.return_value.requires_2sa = False
service_mock.return_value.trusted_devices = TRUSTED_DEVICES
service_mock.return_value.send_verification_code = Mock(return_value=True)
@ -85,24 +103,53 @@ def mock_controller_service_authenticated_no_device():
yield service_mock
@pytest.fixture(name="service_authenticated_not_trusted")
def mock_controller_service_authenticated_not_trusted():
"""Mock a successful service while already authenticated, but the session is not trusted."""
with patch(
"homeassistant.components.icloud.config_flow.PyiCloudService"
) as service_mock:
service_mock.return_value.requires_2fa = False
service_mock.return_value.requires_2sa = False
service_mock.return_value.is_trusted_session = False
service_mock.return_value.trusted_devices = TRUSTED_DEVICES
service_mock.return_value.send_verification_code = Mock(return_value=True)
service_mock.return_value.validate_2fa_code = Mock(return_value=True)
service_mock.return_value.validate_verification_code = Mock(return_value=True)
yield service_mock
@pytest.fixture(name="service_send_verification_code_failed")
def mock_controller_service_send_verification_code_failed():
"""Mock a failed service during sending verification code step."""
with patch(
"homeassistant.components.icloud.config_flow.PyiCloudService"
) as service_mock:
service_mock.return_value.requires_2fa = False
service_mock.return_value.requires_2sa = True
service_mock.return_value.trusted_devices = TRUSTED_DEVICES
service_mock.return_value.send_verification_code = Mock(return_value=False)
yield service_mock
@pytest.fixture(name="service_validate_2fa_code_failed")
def mock_controller_service_validate_2fa_code_failed():
"""Mock a failed service during validation of 2FA verification code step."""
with patch(
"homeassistant.components.icloud.config_flow.PyiCloudService"
) as service_mock:
service_mock.return_value.requires_2fa = True
service_mock.return_value.validate_2fa_code = Mock(return_value=False)
yield service_mock
@pytest.fixture(name="service_validate_verification_code_failed")
def mock_controller_service_validate_verification_code_failed():
"""Mock a failed service during validation of verification code step."""
with patch(
"homeassistant.components.icloud.config_flow.PyiCloudService"
) as service_mock:
service_mock.return_value.requires_2fa = False
service_mock.return_value.requires_2sa = True
service_mock.return_value.trusted_devices = TRUSTED_DEVICES
service_mock.return_value.send_verification_code = Mock(return_value=True)
@ -409,6 +456,49 @@ async def test_validate_verification_code_failed(
assert result["errors"] == {"base": "validate_verification_code"}
async def test_2fa_code_success(hass: HomeAssistantType, service_2fa: MagicMock):
"""Test 2fa step success."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
data={CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD},
)
service_2fa.return_value.requires_2fa = False
service_2fa.return_value.requires_2sa = False
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_VERIFICATION_CODE: "0"}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["result"].unique_id == USERNAME
assert result["title"] == USERNAME
assert result["data"][CONF_USERNAME] == USERNAME
assert result["data"][CONF_PASSWORD] == PASSWORD
assert result["data"][CONF_WITH_FAMILY] == DEFAULT_WITH_FAMILY
assert result["data"][CONF_MAX_INTERVAL] == DEFAULT_MAX_INTERVAL
assert result["data"][CONF_GPS_ACCURACY_THRESHOLD] == DEFAULT_GPS_ACCURACY_THRESHOLD
async def test_validate_2fa_code_failed(
hass: HomeAssistantType, service_validate_2fa_code_failed: MagicMock
):
"""Test when we have errors during validate_verification_code."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
data={CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD},
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_VERIFICATION_CODE: "0"}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == CONF_VERIFICATION_CODE
assert result["errors"] == {"base": "validate_verification_code"}
async def test_password_update(
hass: HomeAssistantType, service_authenticated: MagicMock
):