diff --git a/homeassistant/auth/__init__.py b/homeassistant/auth/__init__.py index e0b7b377b1f..952bb3b8352 100644 --- a/homeassistant/auth/__init__.py +++ b/homeassistant/auth/__init__.py @@ -249,13 +249,13 @@ class AuthManager: await module.async_depose_user(user.id) - async def async_get_enabled_mfa(self, user: models.User) -> List[str]: + async def async_get_enabled_mfa(self, user: models.User) -> Dict[str, str]: """List enabled mfa modules for user.""" - module_ids = [] + modules = OrderedDict() # type: Dict[str, str] for module_id, module in self._mfa_modules.items(): if await module.async_is_user_setup(user.id): - module_ids.append(module_id) - return module_ids + modules[module_id] = module.name + return modules async def async_create_refresh_token(self, user: models.User, client_id: Optional[str] = None) \ diff --git a/homeassistant/auth/mfa_modules/totp.py b/homeassistant/auth/mfa_modules/totp.py new file mode 100644 index 00000000000..48531863c1a --- /dev/null +++ b/homeassistant/auth/mfa_modules/totp.py @@ -0,0 +1,212 @@ +"""Time-based One Time Password auth module.""" +import logging +from io import BytesIO +from typing import Any, Dict, Optional, Tuple # noqa: F401 + +import voluptuous as vol + +from homeassistant.auth.models import User +from homeassistant.core import HomeAssistant + +from . import MultiFactorAuthModule, MULTI_FACTOR_AUTH_MODULES, \ + MULTI_FACTOR_AUTH_MODULE_SCHEMA, SetupFlow + +REQUIREMENTS = ['pyotp==2.2.6', 'PyQRCode==1.2.1'] + +CONFIG_SCHEMA = MULTI_FACTOR_AUTH_MODULE_SCHEMA.extend({ +}, extra=vol.PREVENT_EXTRA) + +STORAGE_VERSION = 1 +STORAGE_KEY = 'auth_module.totp' +STORAGE_USERS = 'users' +STORAGE_USER_ID = 'user_id' +STORAGE_OTA_SECRET = 'ota_secret' + +INPUT_FIELD_CODE = 'code' + +DUMMY_SECRET = 'FPPTH34D4E3MI2HG' + +_LOGGER = logging.getLogger(__name__) + + +def _generate_qr_code(data: str) -> str: + """Generate a base64 PNG string represent QR Code image of data.""" + import pyqrcode + + qr_code = pyqrcode.create(data) + + with BytesIO() as buffer: + qr_code.svg(file=buffer, scale=4) + return '{}'.format( + buffer.getvalue().decode("ascii").replace('\n', '') + .replace('' + ' Tuple[str, str, str]: + """Generate a secret, url, and QR code.""" + import pyotp + + ota_secret = pyotp.random_base32() + url = pyotp.totp.TOTP(ota_secret).provisioning_uri( + username, issuer_name="Home Assistant") + image = _generate_qr_code(url) + return ota_secret, url, image + + +@MULTI_FACTOR_AUTH_MODULES.register('totp') +class TotpAuthModule(MultiFactorAuthModule): + """Auth module validate time-based one time password.""" + + DEFAULT_TITLE = 'Time-based One Time Password' + + def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None: + """Initialize the user data store.""" + super().__init__(hass, config) + self._users = None # type: Optional[Dict[str, str]] + self._user_store = hass.helpers.storage.Store( + STORAGE_VERSION, STORAGE_KEY) + + @property + def input_schema(self) -> vol.Schema: + """Validate login flow input data.""" + return vol.Schema({INPUT_FIELD_CODE: str}) + + async def _async_load(self) -> None: + """Load stored data.""" + data = await self._user_store.async_load() + + if data is None: + data = {STORAGE_USERS: {}} + + self._users = data.get(STORAGE_USERS, {}) + + async def _async_save(self) -> None: + """Save data.""" + await self._user_store.async_save({STORAGE_USERS: self._users}) + + def _add_ota_secret(self, user_id: str, + secret: Optional[str] = None) -> str: + """Create a ota_secret for user.""" + import pyotp + + ota_secret = secret or pyotp.random_base32() # type: str + + self._users[user_id] = ota_secret # type: ignore + return ota_secret + + async def async_setup_flow(self, user_id: str) -> SetupFlow: + """Return a data entry flow handler for setup module. + + Mfa module should extend SetupFlow + """ + user = await self.hass.auth.async_get_user(user_id) # type: ignore + return TotpSetupFlow(self, self.input_schema, user) + + async def async_setup_user(self, user_id: str, setup_data: Any) -> str: + """Set up auth module for user.""" + if self._users is None: + await self._async_load() + + result = await self.hass.async_add_executor_job( + self._add_ota_secret, user_id, setup_data.get('secret')) + + await self._async_save() + return result + + async def async_depose_user(self, user_id: str) -> None: + """Depose auth module for user.""" + if self._users is None: + await self._async_load() + + if self._users.pop(user_id, None): # type: ignore + await self._async_save() + + async def async_is_user_setup(self, user_id: str) -> bool: + """Return whether user is setup.""" + if self._users is None: + await self._async_load() + + return user_id in self._users # type: ignore + + async def async_validation( + self, user_id: str, user_input: Dict[str, Any]) -> bool: + """Return True if validation passed.""" + if self._users is None: + await self._async_load() + + # user_input has been validate in caller + return await self.hass.async_add_executor_job( + self._validate_2fa, user_id, user_input[INPUT_FIELD_CODE]) + + def _validate_2fa(self, user_id: str, code: str) -> bool: + """Validate two factor authentication code.""" + import pyotp + + ota_secret = self._users.get(user_id) # type: ignore + if ota_secret is None: + # even we cannot find user, we still do verify + # to make timing the same as if user was found. + pyotp.TOTP(DUMMY_SECRET).verify(code) + return False + + return bool(pyotp.TOTP(ota_secret).verify(code)) + + +class TotpSetupFlow(SetupFlow): + """Handler for the setup flow.""" + + def __init__(self, auth_module: TotpAuthModule, + setup_schema: vol.Schema, + user: User) -> None: + """Initialize the setup flow.""" + super().__init__(auth_module, setup_schema, user.id) + # to fix typing complaint + self._auth_module = auth_module # type: TotpAuthModule + self._user = user + self._ota_secret = None # type: Optional[str] + self._url = None # type Optional[str] + self._image = None # type Optional[str] + + async def async_step_init( + self, user_input: Optional[Dict[str, str]] = None) \ + -> Dict[str, Any]: + """Handle the first step of setup flow. + + Return self.async_show_form(step_id='init') if user_input == None. + Return self.async_create_entry(data={'result': result}) if finish. + """ + import pyotp + + errors = {} # type: Dict[str, str] + + if user_input: + verified = await self.hass.async_add_executor_job( # type: ignore + pyotp.TOTP(self._ota_secret).verify, user_input['code']) + if verified: + result = await self._auth_module.async_setup_user( + self._user_id, {'secret': self._ota_secret}) + return self.async_create_entry( + title=self._auth_module.name, + data={'result': result} + ) + + errors['base'] = 'invalid_code' + + else: + hass = self._auth_module.hass + self._ota_secret, self._url, self._image = \ + await hass.async_add_executor_job( # type: ignore + _generate_secret_and_qr_code, str(self._user.name)) + + return self.async_show_form( + step_id='init', + data_schema=self._setup_schema, + description_placeholders={ + 'code': self._ota_secret, + 'url': self._url, + 'qr_code': self._image + }, + errors=errors + ) diff --git a/homeassistant/auth/providers/__init__.py b/homeassistant/auth/providers/__init__.py index e8ef7cbf3d4..0bcb47d4af9 100644 --- a/homeassistant/auth/providers/__init__.py +++ b/homeassistant/auth/providers/__init__.py @@ -168,7 +168,7 @@ class LoginFlow(data_entry_flow.FlowHandler): self._auth_provider = auth_provider self._auth_module_id = None # type: Optional[str] self._auth_manager = auth_provider.hass.auth # type: ignore - self.available_mfa_modules = [] # type: List + self.available_mfa_modules = {} # type: Dict[str, str] self.created_at = dt_util.utcnow() self.user = None # type: Optional[User] @@ -196,7 +196,7 @@ class LoginFlow(data_entry_flow.FlowHandler): errors['base'] = 'invalid_auth_module' if len(self.available_mfa_modules) == 1: - self._auth_module_id = self.available_mfa_modules[0] + self._auth_module_id = list(self.available_mfa_modules.keys())[0] return await self.async_step_mfa() return self.async_show_form( diff --git a/homeassistant/components/auth/.translations/en.json b/homeassistant/components/auth/.translations/en.json new file mode 100644 index 00000000000..5c1af67b120 --- /dev/null +++ b/homeassistant/components/auth/.translations/en.json @@ -0,0 +1,16 @@ +{ + "mfa_setup": { + "totp": { + "error": { + "invalid_code": "Invalid code, please try again. If you get this error consistently, please make sure the clock on Home Assistant system is accurate." + }, + "step": { + "init": { + "description": "Scan the QR code with your authentication app, such as **Google Authenticator** or **Authy**. If you have problem to scan the QR code, using **`{code}`** to manual setup. \n\n{qr_code}\n\nEnter the six digi code appeared in your app below to verify the setup:", + "title": "Scan this QR code with your app" + } + }, + "title": "TOTP" + } + } +} \ No newline at end of file diff --git a/homeassistant/components/auth/strings.json b/homeassistant/components/auth/strings.json new file mode 100644 index 00000000000..b0083ab577b --- /dev/null +++ b/homeassistant/components/auth/strings.json @@ -0,0 +1,16 @@ +{ + "mfa_setup":{ + "totp": { + "title": "TOTP", + "step": { + "init": { + "title": "Set up two-factor authentication using TOTP", + "description": "To activate two factor authentication using time-based one-time passwords, scan the QR code with your authentication app. If you don't have one, we recommend either [Google Authenticator](https://support.google.com/accounts/answer/1066447) or [Authy](https://authy.com/).\n\n{qr_code}\n\nAfter scanning the code, enter the six digit code from your app to verify the setup. If you have problems scanning the QR code, do a manual setup with code **`{code}`**." + } + }, + "error": { + "invalid_code": "Invalid code, please try again. If you get this error consistently, please make sure the clock of your Home Assistant system is accurate." + } + } + } +} diff --git a/homeassistant/config.py b/homeassistant/config.py index fe8f8ef0f60..a799094c94d 100644 --- a/homeassistant/config.py +++ b/homeassistant/config.py @@ -427,10 +427,14 @@ async def async_process_ha_core_config( if has_trusted_networks: auth_conf.append({'type': 'trusted_networks'}) + mfa_conf = config.get(CONF_AUTH_MFA_MODULES, [ + {'type': 'totp', 'id': 'totp', 'name': 'Authenticator app'} + ]) + setattr(hass, 'auth', await auth.auth_manager_from_config( hass, auth_conf, - config.get(CONF_AUTH_MFA_MODULES, []))) + mfa_conf)) hac = hass.config diff --git a/requirements_all.txt b/requirements_all.txt index 2f27662b2e7..691bbf62246 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -46,6 +46,9 @@ PyMVGLive==1.1.4 # homeassistant.components.arduino PyMata==2.14 +# homeassistant.auth.mfa_modules.totp +PyQRCode==1.2.1 + # homeassistant.components.sensor.rmvtransport PyRMVtransport==0.0.7 @@ -985,6 +988,7 @@ pyopenuv==1.0.1 # homeassistant.components.iota pyota==2.0.5 +# homeassistant.auth.mfa_modules.totp # homeassistant.components.sensor.otp pyotp==2.2.6 diff --git a/requirements_test_all.txt b/requirements_test_all.txt index 69a02d5900d..5fa4af21a62 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -154,6 +154,10 @@ pymonoprice==0.3 # homeassistant.components.binary_sensor.nx584 pynx584==0.4 +# homeassistant.auth.mfa_modules.totp +# homeassistant.components.sensor.otp +pyotp==2.2.6 + # homeassistant.components.qwikswitch pyqwikswitch==0.8 diff --git a/script/gen_requirements_all.py b/script/gen_requirements_all.py index e26393bb800..fe23e638e5b 100755 --- a/script/gen_requirements_all.py +++ b/script/gen_requirements_all.py @@ -78,6 +78,7 @@ TEST_REQUIREMENTS = ( 'pylitejet', 'pymonoprice', 'pynx584', + 'pyotp', 'pyqwikswitch', 'PyRMVtransport', 'python-forecastio', diff --git a/tests/auth/mfa_modules/test_totp.py b/tests/auth/mfa_modules/test_totp.py new file mode 100644 index 00000000000..28e6c949bc4 --- /dev/null +++ b/tests/auth/mfa_modules/test_totp.py @@ -0,0 +1,130 @@ +"""Test the Time-based One Time Password (MFA) auth module.""" +from unittest.mock import patch + +from homeassistant import data_entry_flow +from homeassistant.auth import models as auth_models, auth_manager_from_config +from homeassistant.auth.mfa_modules import auth_mfa_module_from_config +from tests.common import MockUser + +MOCK_CODE = '123456' + + +async def test_validating_mfa(hass): + """Test validating mfa code.""" + totp_auth_module = await auth_mfa_module_from_config(hass, { + 'type': 'totp' + }) + await totp_auth_module.async_setup_user('test-user', {}) + + with patch('pyotp.TOTP.verify', return_value=True): + assert await totp_auth_module.async_validation( + 'test-user', {'code': MOCK_CODE}) + + +async def test_validating_mfa_invalid_code(hass): + """Test validating an invalid mfa code.""" + totp_auth_module = await auth_mfa_module_from_config(hass, { + 'type': 'totp' + }) + await totp_auth_module.async_setup_user('test-user', {}) + + with patch('pyotp.TOTP.verify', return_value=False): + assert await totp_auth_module.async_validation( + 'test-user', {'code': MOCK_CODE}) is False + + +async def test_validating_mfa_invalid_user(hass): + """Test validating an mfa code with invalid user.""" + totp_auth_module = await auth_mfa_module_from_config(hass, { + 'type': 'totp' + }) + await totp_auth_module.async_setup_user('test-user', {}) + + assert await totp_auth_module.async_validation( + 'invalid-user', {'code': MOCK_CODE}) is False + + +async def test_setup_depose_user(hass): + """Test despose user.""" + totp_auth_module = await auth_mfa_module_from_config(hass, { + 'type': 'totp' + }) + result = await totp_auth_module.async_setup_user('test-user', {}) + assert len(totp_auth_module._users) == 1 + result2 = await totp_auth_module.async_setup_user('test-user', {}) + assert len(totp_auth_module._users) == 1 + assert result != result2 + + await totp_auth_module.async_depose_user('test-user') + assert len(totp_auth_module._users) == 0 + + result = await totp_auth_module.async_setup_user( + 'test-user2', {'secret': 'secret-code'}) + assert result == 'secret-code' + assert len(totp_auth_module._users) == 1 + + +async def test_login_flow_validates_mfa(hass): + """Test login flow with mfa enabled.""" + hass.auth = await auth_manager_from_config(hass, [{ + 'type': 'insecure_example', + 'users': [{'username': 'test-user', 'password': 'test-pass'}], + }], [{ + 'type': 'totp', + }]) + user = MockUser( + id='mock-user', + is_owner=False, + is_active=False, + name='Paulus', + ).add_to_auth_manager(hass.auth) + await hass.auth.async_link_user(user, auth_models.Credentials( + id='mock-id', + auth_provider_type='insecure_example', + auth_provider_id=None, + data={'username': 'test-user'}, + is_new=False, + )) + + await hass.auth.async_enable_user_mfa(user, 'totp', {}) + + provider = hass.auth.auth_providers[0] + + result = await hass.auth.login_flow.async_init( + (provider.type, provider.id)) + assert result['type'] == data_entry_flow.RESULT_TYPE_FORM + + result = await hass.auth.login_flow.async_configure(result['flow_id'], { + 'username': 'incorrect-user', + 'password': 'test-pass', + }) + assert result['type'] == data_entry_flow.RESULT_TYPE_FORM + assert result['errors']['base'] == 'invalid_auth' + + result = await hass.auth.login_flow.async_configure(result['flow_id'], { + 'username': 'test-user', + 'password': 'incorrect-pass', + }) + assert result['type'] == data_entry_flow.RESULT_TYPE_FORM + assert result['errors']['base'] == 'invalid_auth' + + result = await hass.auth.login_flow.async_configure(result['flow_id'], { + 'username': 'test-user', + 'password': 'test-pass', + }) + assert result['type'] == data_entry_flow.RESULT_TYPE_FORM + assert result['step_id'] == 'mfa' + assert result['data_schema'].schema.get('code') == str + + with patch('pyotp.TOTP.verify', return_value=False): + result = await hass.auth.login_flow.async_configure( + result['flow_id'], {'code': 'invalid-code'}) + assert result['type'] == data_entry_flow.RESULT_TYPE_FORM + assert result['step_id'] == 'mfa' + assert result['errors']['base'] == 'invalid_auth' + + with patch('pyotp.TOTP.verify', return_value=True): + result = await hass.auth.login_flow.async_configure( + result['flow_id'], {'code': MOCK_CODE}) + assert result['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result['data'].id == 'mock-user' diff --git a/tests/test_config.py b/tests/test_config.py index 76ea576ac28..3cfe67f70b1 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -16,7 +16,7 @@ from homeassistant.const import ( CONF_LATITUDE, CONF_LONGITUDE, CONF_UNIT_SYSTEM, CONF_NAME, CONF_TIME_ZONE, CONF_ELEVATION, CONF_CUSTOMIZE, __version__, CONF_UNIT_SYSTEM_METRIC, CONF_UNIT_SYSTEM_IMPERIAL, CONF_TEMPERATURE_UNIT, - CONF_AUTH_PROVIDERS) + CONF_AUTH_PROVIDERS, CONF_AUTH_MFA_MODULES) from homeassistant.util import location as location_util, dt as dt_util from homeassistant.util.yaml import SECRET_YAML from homeassistant.util.async_ import run_coroutine_threadsafe @@ -805,6 +805,10 @@ async def test_auth_provider_config(hass): CONF_AUTH_PROVIDERS: [ {'type': 'homeassistant'}, {'type': 'legacy_api_password'}, + ], + CONF_AUTH_MFA_MODULES: [ + {'type': 'totp'}, + {'type': 'totp', 'id': 'second'}, ] } if hasattr(hass, 'auth'): @@ -815,6 +819,9 @@ async def test_auth_provider_config(hass): assert hass.auth.auth_providers[0].type == 'homeassistant' assert hass.auth.auth_providers[1].type == 'legacy_api_password' assert hass.auth.active is True + assert len(hass.auth.auth_mfa_modules) == 2 + assert hass.auth.auth_mfa_modules[0].id == 'totp' + assert hass.auth.auth_mfa_modules[1].id == 'second' async def test_auth_provider_config_default(hass): @@ -834,6 +841,8 @@ async def test_auth_provider_config_default(hass): assert len(hass.auth.auth_providers) == 1 assert hass.auth.auth_providers[0].type == 'homeassistant' assert hass.auth.active is True + assert len(hass.auth.auth_mfa_modules) == 1 + assert hass.auth.auth_mfa_modules[0].id == 'totp' async def test_auth_provider_config_default_api_password(hass):