Support blocking trusted network from new ip (#44630)
Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io>
This commit is contained in:
parent
e4a7692610
commit
38d2cacf7a
21 changed files with 381 additions and 131 deletions
|
@ -7,7 +7,12 @@ import pytest
|
|||
import voluptuous as vol
|
||||
|
||||
from homeassistant import auth, data_entry_flow
|
||||
from homeassistant.auth import auth_store, const as auth_const, models as auth_models
|
||||
from homeassistant.auth import (
|
||||
InvalidAuthError,
|
||||
auth_store,
|
||||
const as auth_const,
|
||||
models as auth_models,
|
||||
)
|
||||
from homeassistant.auth.const import MFA_SESSION_EXPIRATION
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.util import dt as dt_util
|
||||
|
@ -162,7 +167,10 @@ async def test_create_new_user(hass):
|
|||
step["flow_id"], {"username": "test-user", "password": "test-pass"}
|
||||
)
|
||||
assert step["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
|
||||
user = step["result"]
|
||||
credential = step["result"]
|
||||
assert credential is not None
|
||||
|
||||
user = await manager.async_get_or_create_user(credential)
|
||||
assert user is not None
|
||||
assert user.is_owner is False
|
||||
assert user.name == "Test Name"
|
||||
|
@ -229,7 +237,8 @@ async def test_login_as_existing_user(mock_hass):
|
|||
)
|
||||
assert step["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
|
||||
|
||||
user = step["result"]
|
||||
credential = step["result"]
|
||||
user = await manager.async_get_user_by_credentials(credential)
|
||||
assert user is not None
|
||||
assert user.id == "mock-user"
|
||||
assert user.is_owner is False
|
||||
|
@ -259,7 +268,8 @@ async def test_linking_user_to_two_auth_providers(hass, hass_storage):
|
|||
step = await manager.login_flow.async_configure(
|
||||
step["flow_id"], {"username": "test-user", "password": "test-pass"}
|
||||
)
|
||||
user = step["result"]
|
||||
credential = step["result"]
|
||||
user = await manager.async_get_or_create_user(credential)
|
||||
assert user is not None
|
||||
|
||||
step = await manager.login_flow.async_init(
|
||||
|
@ -293,13 +303,19 @@ async def test_saving_loading(hass, hass_storage):
|
|||
step = await manager.login_flow.async_configure(
|
||||
step["flow_id"], {"username": "test-user", "password": "test-pass"}
|
||||
)
|
||||
user = step["result"]
|
||||
credential = step["result"]
|
||||
user = await manager.async_get_or_create_user(credential)
|
||||
|
||||
await manager.async_activate_user(user)
|
||||
# the first refresh token will be used to create access token
|
||||
refresh_token = await manager.async_create_refresh_token(user, CLIENT_ID)
|
||||
refresh_token = await manager.async_create_refresh_token(
|
||||
user, CLIENT_ID, credential=credential
|
||||
)
|
||||
manager.async_create_access_token(refresh_token, "192.168.0.1")
|
||||
# the second refresh token will not be used
|
||||
await manager.async_create_refresh_token(user, "dummy-client")
|
||||
await manager.async_create_refresh_token(
|
||||
user, "dummy-client", credential=credential
|
||||
)
|
||||
|
||||
await flush_store(manager._store._store)
|
||||
|
||||
|
@ -452,6 +468,46 @@ async def test_refresh_token_type_long_lived_access_token(hass):
|
|||
assert token.token_type == auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
|
||||
|
||||
|
||||
async def test_refresh_token_provider_validation(mock_hass):
|
||||
"""Test that creating access token from refresh token checks with provider."""
|
||||
manager = await auth.auth_manager_from_config(
|
||||
mock_hass,
|
||||
[
|
||||
{
|
||||
"type": "insecure_example",
|
||||
"users": [{"username": "test-user", "password": "test-pass"}],
|
||||
}
|
||||
],
|
||||
[],
|
||||
)
|
||||
|
||||
credential = auth_models.Credentials(
|
||||
id="mock-credential-id",
|
||||
auth_provider_type="insecure_example",
|
||||
auth_provider_id=None,
|
||||
data={"username": "test-user"},
|
||||
is_new=False,
|
||||
)
|
||||
|
||||
user = MockUser().add_to_auth_manager(manager)
|
||||
user.credentials.append(credential)
|
||||
refresh_token = await manager.async_create_refresh_token(
|
||||
user, CLIENT_ID, credential=credential
|
||||
)
|
||||
ip = "127.0.0.1"
|
||||
|
||||
assert manager.async_create_access_token(refresh_token, ip) is not None
|
||||
|
||||
with patch(
|
||||
"homeassistant.auth.providers.insecure_example.ExampleAuthProvider.async_validate_refresh_token",
|
||||
side_effect=InvalidAuthError("Invalid access"),
|
||||
) as call:
|
||||
with pytest.raises(InvalidAuthError):
|
||||
manager.async_create_access_token(refresh_token, ip)
|
||||
|
||||
call.assert_called_with(refresh_token, ip)
|
||||
|
||||
|
||||
async def test_cannot_deactive_owner(mock_hass):
|
||||
"""Test that we cannot deactivate the owner."""
|
||||
manager = await auth.auth_manager_from_config(mock_hass, [], [])
|
||||
|
@ -626,14 +682,10 @@ async def test_login_with_auth_module(mock_hass):
|
|||
step["flow_id"], {"pin": "test-pin"}
|
||||
)
|
||||
|
||||
# Finally passed, get user
|
||||
# Finally passed, get credential
|
||||
assert step["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
|
||||
user = step["result"]
|
||||
assert user is not None
|
||||
assert user.id == "mock-user"
|
||||
assert user.is_owner is False
|
||||
assert user.is_active is False
|
||||
assert user.name == "Paulus"
|
||||
assert step["result"]
|
||||
assert step["result"].id == "mock-id"
|
||||
|
||||
|
||||
async def test_login_with_multi_auth_module(mock_hass):
|
||||
|
@ -703,14 +755,10 @@ async def test_login_with_multi_auth_module(mock_hass):
|
|||
step["flow_id"], {"pin": "test-pin2"}
|
||||
)
|
||||
|
||||
# Finally passed, get user
|
||||
# Finally passed, get credential
|
||||
assert step["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
|
||||
user = step["result"]
|
||||
assert user is not None
|
||||
assert user.id == "mock-user"
|
||||
assert user.is_owner is False
|
||||
assert user.is_active is False
|
||||
assert user.name == "Paulus"
|
||||
assert step["result"]
|
||||
assert step["result"].id == "mock-id"
|
||||
|
||||
|
||||
async def test_auth_module_expired_session(mock_hass):
|
||||
|
@ -792,7 +840,8 @@ async def test_enable_mfa_for_user(hass, hass_storage):
|
|||
step = await manager.login_flow.async_configure(
|
||||
step["flow_id"], {"username": "test-user", "password": "test-pass"}
|
||||
)
|
||||
user = step["result"]
|
||||
credential = step["result"]
|
||||
user = await manager.async_get_or_create_user(credential)
|
||||
assert user is not None
|
||||
|
||||
# new user don't have mfa enabled
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue