"""Tests for the Home Assistant auth module."""

from datetime import timedelta
import time
from typing import Any
from unittest.mock import patch

from freezegun import freeze_time
import jwt
import pytest
import voluptuous as vol

from homeassistant import auth, data_entry_flow
from homeassistant.auth import (
    EVENT_USER_UPDATED,
    InvalidAuthError,
    auth_store,
    const as auth_const,
    models as auth_models,
)
from homeassistant.auth.const import GROUP_ID_ADMIN, MFA_SESSION_EXPIRATION
from homeassistant.auth.models import Credentials
from homeassistant.core import HomeAssistant, callback
from homeassistant.util import dt as dt_util

from tests.common import (
    CLIENT_ID,
    MockUser,
    async_capture_events,
    async_fire_time_changed,
    ensure_auth_manager_loaded,
    flush_store,
)


@pytest.fixture
def mock_hass(hass: HomeAssistant) -> HomeAssistant:
    """Home Assistant mock with minimum amount of data set to make it work with auth."""
    return hass


async def test_auth_manager_from_config_validates_config(mock_hass) -> None:
    """Test get auth providers."""
    with pytest.raises(vol.Invalid):
        manager = await auth.auth_manager_from_config(
            mock_hass,
            [
                {"name": "Test Name", "type": "insecure_example", "users": []},
                {
                    "name": "Invalid configuration because no users",
                    "type": "insecure_example",
                    "id": "invalid_config",
                },
            ],
            [],
        )

    manager = await auth.auth_manager_from_config(
        mock_hass,
        [
            {"name": "Test Name", "type": "insecure_example", "users": []},
            {
                "name": "Test Name 2",
                "type": "insecure_example",
                "id": "another",
                "users": [],
            },
        ],
        [],
    )

    providers = [
        {"name": provider.name, "id": provider.id, "type": provider.type}
        for provider in manager.auth_providers
    ]

    assert providers == [
        {"name": "Test Name", "type": "insecure_example", "id": None},
        {"name": "Test Name 2", "type": "insecure_example", "id": "another"},
    ]


async def test_auth_manager_from_config_auth_modules(mock_hass) -> None:
    """Test get auth modules."""
    with pytest.raises(vol.Invalid):
        manager = await auth.auth_manager_from_config(
            mock_hass,
            [
                {"name": "Test Name", "type": "insecure_example", "users": []},
                {
                    "name": "Test Name 2",
                    "type": "insecure_example",
                    "id": "another",
                    "users": [],
                },
            ],
            [
                {"name": "Module 1", "type": "insecure_example", "data": []},
                {
                    "name": "Invalid configuration because no data",
                    "type": "insecure_example",
                    "id": "another",
                },
            ],
        )

    manager = await auth.auth_manager_from_config(
        mock_hass,
        [
            {"name": "Test Name", "type": "insecure_example", "users": []},
            {
                "name": "Test Name 2",
                "type": "insecure_example",
                "id": "another",
                "users": [],
            },
        ],
        [
            {"name": "Module 1", "type": "insecure_example", "data": []},
            {
                "name": "Module 2",
                "type": "insecure_example",
                "id": "another",
                "data": [],
            },
        ],
    )
    providers = [
        {"name": provider.name, "type": provider.type, "id": provider.id}
        for provider in manager.auth_providers
    ]
    assert providers == [
        {"name": "Test Name", "type": "insecure_example", "id": None},
        {"name": "Test Name 2", "type": "insecure_example", "id": "another"},
    ]

    modules = [
        {"name": module.name, "type": module.type, "id": module.id}
        for module in manager.auth_mfa_modules
    ]
    assert modules == [
        {"name": "Module 1", "type": "insecure_example", "id": "insecure_example"},
        {"name": "Module 2", "type": "insecure_example", "id": "another"},
    ]


async def test_create_new_user(hass: HomeAssistant) -> None:
    """Test creating new user."""
    events = []

    @callback
    def user_added(event):
        events.append(event)

    hass.bus.async_listen("user_added", user_added)

    manager = await auth.auth_manager_from_config(
        hass,
        [
            {
                "type": "insecure_example",
                "users": [
                    {
                        "username": "test-user",
                        "password": "test-pass",
                        "name": "Test Name",
                    }
                ],
            }
        ],
        [],
    )

    step = await manager.login_flow.async_init(("insecure_example", None))
    assert step["type"] == data_entry_flow.FlowResultType.FORM

    step = await manager.login_flow.async_configure(
        step["flow_id"], {"username": "test-user", "password": "test-pass"}
    )
    assert step["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
    credential = step["result"]
    assert credential is not None

    user = await manager.async_get_or_create_user(credential)
    assert user is not None
    assert user.is_owner is False
    assert user.name == "Test Name"

    await hass.async_block_till_done()
    assert len(events) == 1
    assert events[0].data["user_id"] == user.id


async def test_login_as_existing_user(mock_hass) -> None:
    """Test login as existing user."""
    manager = await auth.auth_manager_from_config(
        mock_hass,
        [
            {
                "type": "insecure_example",
                "users": [
                    {
                        "username": "test-user",
                        "password": "test-pass",
                        "name": "Test Name",
                    }
                ],
            }
        ],
        [],
    )
    mock_hass.auth = manager
    ensure_auth_manager_loaded(manager)

    # Add a fake user that we're not going to log in with
    user = MockUser(
        id="mock-user2", is_owner=False, is_active=False, name="Not user"
    ).add_to_auth_manager(manager)
    user.credentials.append(
        auth_models.Credentials(
            id="mock-id2",
            auth_provider_type="insecure_example",
            auth_provider_id=None,
            data={"username": "other-user"},
            is_new=False,
        )
    )

    # Add fake user with credentials for example auth provider.
    user = MockUser(
        id="mock-user", is_owner=False, is_active=False, name="Paulus"
    ).add_to_auth_manager(manager)
    user.credentials.append(
        auth_models.Credentials(
            id="mock-id",
            auth_provider_type="insecure_example",
            auth_provider_id=None,
            data={"username": "test-user"},
            is_new=False,
        )
    )

    step = await manager.login_flow.async_init(("insecure_example", None))
    assert step["type"] == data_entry_flow.FlowResultType.FORM

    step = await manager.login_flow.async_configure(
        step["flow_id"], {"username": "test-user", "password": "test-pass"}
    )
    assert step["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY

    credential = step["result"]
    user = await manager.async_get_user_by_credentials(credential)
    assert user is not None
    assert user.id == "mock-user"
    assert user.is_owner is False
    assert user.is_active is False
    assert user.name == "Paulus"


async def test_linking_user_to_two_auth_providers(
    hass: HomeAssistant, hass_storage: dict[str, Any]
) -> None:
    """Test linking user to two auth providers."""
    manager = await auth.auth_manager_from_config(
        hass,
        [
            {
                "type": "insecure_example",
                "users": [{"username": "test-user", "password": "test-pass"}],
            },
            {
                "type": "insecure_example",
                "id": "another-provider",
                "users": [{"username": "another-user", "password": "another-password"}],
            },
        ],
        [],
    )

    step = await manager.login_flow.async_init(("insecure_example", None))
    step = await manager.login_flow.async_configure(
        step["flow_id"], {"username": "test-user", "password": "test-pass"}
    )
    credential = step["result"]
    user = await manager.async_get_or_create_user(credential)
    assert user is not None

    step = await manager.login_flow.async_init(
        ("insecure_example", "another-provider"), context={"credential_only": True}
    )
    step = await manager.login_flow.async_configure(
        step["flow_id"], {"username": "another-user", "password": "another-password"}
    )
    new_credential = step["result"]
    await manager.async_link_user(user, new_credential)
    assert len(user.credentials) == 2

    # Linking it again to same user is a no-op
    await manager.async_link_user(user, new_credential)
    assert len(user.credentials) == 2

    # Linking a credential to a user while the credential is already linked to another user should raise
    user_2 = await manager.async_create_user("User 2")
    with pytest.raises(ValueError):
        await manager.async_link_user(user_2, new_credential)
    assert len(user_2.credentials) == 0


async def test_saving_loading(
    hass: HomeAssistant, hass_storage: dict[str, Any]
) -> None:
    """Test storing and saving data.

    Creates one of each type that we store to test we restore correctly.
    """
    manager = await auth.auth_manager_from_config(
        hass,
        [
            {
                "type": "insecure_example",
                "users": [{"username": "test-user", "password": "test-pass"}],
            }
        ],
        [],
    )

    step = await manager.login_flow.async_init(("insecure_example", None))
    step = await manager.login_flow.async_configure(
        step["flow_id"], {"username": "test-user", "password": "test-pass"}
    )
    credential = step["result"]
    user = await manager.async_get_or_create_user(credential)

    await manager.async_activate_user(user)
    # the first refresh token will be used to create access token
    refresh_token = await manager.async_create_refresh_token(
        user, CLIENT_ID, credential=credential
    )
    manager.async_create_access_token(refresh_token, "192.168.0.1")
    # the second refresh token will not be used
    await manager.async_create_refresh_token(
        user, "dummy-client", credential=credential
    )

    await flush_store(manager._store._store)

    store2 = auth_store.AuthStore(hass)
    await store2.async_load()
    users = await store2.async_get_users()
    assert len(users) == 1
    assert users[0].permissions == user.permissions
    assert users[0] == user
    assert len(users[0].refresh_tokens) == 2
    for r_token in users[0].refresh_tokens.values():
        if r_token.client_id == CLIENT_ID:
            # verify the first refresh token
            assert r_token.last_used_at is not None
            assert r_token.last_used_ip == "192.168.0.1"
        elif r_token.client_id == "dummy-client":
            # verify the second refresh token
            assert r_token.last_used_at is None
            assert r_token.last_used_ip is None
        else:
            pytest.fail(f"Unknown client_id: {r_token.client_id}")


async def test_cannot_retrieve_expired_access_token(hass: HomeAssistant) -> None:
    """Test that we cannot retrieve expired access tokens."""
    manager = await auth.auth_manager_from_config(hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(user, CLIENT_ID)
    assert refresh_token.user.id is user.id
    assert refresh_token.client_id == CLIENT_ID

    access_token = manager.async_create_access_token(refresh_token)
    assert manager.async_validate_access_token(access_token) is refresh_token

    # We patch time directly here because we want the access token to be created with
    # an expired time, but we do not want to freeze time so that jwt will compare it
    # to the patched time. If we freeze time for the test it will be frozen for jwt
    # as well and the token will not be expired.
    with patch(
        "homeassistant.auth.time.time",
        return_value=time.time()
        - auth_const.ACCESS_TOKEN_EXPIRATION.total_seconds()
        - 11,
    ):
        access_token = manager.async_create_access_token(refresh_token)

    assert manager.async_validate_access_token(access_token) is None


async def test_generating_system_user(hass: HomeAssistant) -> None:
    """Test that we can add a system user."""
    events = []

    @callback
    def user_added(event):
        events.append(event)

    hass.bus.async_listen("user_added", user_added)

    manager = await auth.auth_manager_from_config(hass, [], [])
    user = await manager.async_create_system_user("Hass.io")
    token = await manager.async_create_refresh_token(user)
    assert user.system_generated
    assert user.groups == []
    assert not user.local_only
    assert token is not None
    assert token.client_id is None
    assert token.token_type == auth.models.TOKEN_TYPE_SYSTEM
    assert token.expire_at is None

    await hass.async_block_till_done()
    assert len(events) == 1
    assert events[0].data["user_id"] == user.id

    # Passing arguments
    user = await manager.async_create_system_user(
        "Hass.io", group_ids=[GROUP_ID_ADMIN], local_only=True
    )
    token = await manager.async_create_refresh_token(user)
    assert user.system_generated
    assert user.is_admin
    assert user.local_only
    assert token is not None
    assert token.client_id is None
    assert token.token_type == auth.models.TOKEN_TYPE_SYSTEM
    assert token.expire_at is None

    await hass.async_block_till_done()
    assert len(events) == 2
    assert events[1].data["user_id"] == user.id


async def test_refresh_token_requires_client_for_user(hass: HomeAssistant) -> None:
    """Test create refresh token for a user with client_id."""
    manager = await auth.auth_manager_from_config(hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    assert user.system_generated is False

    with pytest.raises(ValueError):
        await manager.async_create_refresh_token(user)

    token = await manager.async_create_refresh_token(user, CLIENT_ID)
    assert token is not None
    assert token.client_id == CLIENT_ID
    assert token.token_type == auth_models.TOKEN_TYPE_NORMAL
    # default access token expiration
    assert token.access_token_expiration == auth_const.ACCESS_TOKEN_EXPIRATION


async def test_refresh_token_not_requires_client_for_system_user(
    hass: HomeAssistant,
) -> None:
    """Test create refresh token for a system user w/o client_id."""
    manager = await auth.auth_manager_from_config(hass, [], [])
    user = await manager.async_create_system_user("Hass.io")
    assert user.system_generated is True

    with pytest.raises(ValueError):
        await manager.async_create_refresh_token(user, CLIENT_ID)

    token = await manager.async_create_refresh_token(user)
    assert token is not None
    assert token.client_id is None
    assert token.token_type == auth_models.TOKEN_TYPE_SYSTEM


async def test_refresh_token_with_specific_access_token_expiration(
    hass: HomeAssistant,
) -> None:
    """Test create a refresh token with specific access token expiration."""
    manager = await auth.auth_manager_from_config(hass, [], [])
    user = MockUser().add_to_auth_manager(manager)

    token = await manager.async_create_refresh_token(
        user, CLIENT_ID, access_token_expiration=timedelta(days=100)
    )
    assert token is not None
    assert token.client_id == CLIENT_ID
    assert token.access_token_expiration == timedelta(days=100)
    assert token.token_type == auth.models.TOKEN_TYPE_NORMAL
    assert token.expire_at is not None


async def test_refresh_token_type(hass: HomeAssistant) -> None:
    """Test create a refresh token with token type."""
    manager = await auth.auth_manager_from_config(hass, [], [])
    user = MockUser().add_to_auth_manager(manager)

    with pytest.raises(ValueError):
        await manager.async_create_refresh_token(
            user, CLIENT_ID, token_type=auth_models.TOKEN_TYPE_SYSTEM
        )

    token = await manager.async_create_refresh_token(
        user, CLIENT_ID, token_type=auth_models.TOKEN_TYPE_NORMAL
    )
    assert token is not None
    assert token.client_id == CLIENT_ID
    assert token.token_type == auth_models.TOKEN_TYPE_NORMAL


async def test_refresh_token_type_long_lived_access_token(hass: HomeAssistant) -> None:
    """Test create a refresh token has long-lived access token type."""
    manager = await auth.auth_manager_from_config(hass, [], [])
    user = MockUser().add_to_auth_manager(manager)

    with pytest.raises(ValueError):
        await manager.async_create_refresh_token(
            user, token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
        )

    token = await manager.async_create_refresh_token(
        user,
        client_name="GPS LOGGER",
        client_icon="mdi:home",
        token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
    )
    assert token is not None
    assert token.client_id is None
    assert token.client_name == "GPS LOGGER"
    assert token.client_icon == "mdi:home"
    assert token.token_type == auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
    assert token.expire_at is None


async def test_refresh_token_provider_validation(mock_hass) -> None:
    """Test that creating access token from refresh token checks with provider."""
    manager = await auth.auth_manager_from_config(
        mock_hass,
        [
            {
                "type": "insecure_example",
                "users": [{"username": "test-user", "password": "test-pass"}],
            }
        ],
        [],
    )

    credential = auth_models.Credentials(
        id="mock-credential-id",
        auth_provider_type="insecure_example",
        auth_provider_id=None,
        data={"username": "test-user"},
        is_new=False,
    )

    user = MockUser().add_to_auth_manager(manager)
    user.credentials.append(credential)
    refresh_token = await manager.async_create_refresh_token(
        user, CLIENT_ID, credential=credential
    )
    ip = "127.0.0.1"

    assert manager.async_create_access_token(refresh_token, ip) is not None

    with (
        patch(
            "homeassistant.auth.providers.insecure_example.ExampleAuthProvider.async_validate_refresh_token",
            side_effect=InvalidAuthError("Invalid access"),
        ) as call,
        pytest.raises(InvalidAuthError),
    ):
        manager.async_create_access_token(refresh_token, ip)

    call.assert_called_with(refresh_token, ip)


async def test_cannot_deactive_owner(mock_hass) -> None:
    """Test that we cannot deactivate the owner."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    owner = MockUser(is_owner=True).add_to_auth_manager(manager)

    with pytest.raises(ValueError):
        await manager.async_deactivate_user(owner)


async def test_remove_refresh_token(hass: HomeAssistant) -> None:
    """Test that we can remove a refresh token."""
    manager = await auth.auth_manager_from_config(hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(user, CLIENT_ID)
    access_token = manager.async_create_access_token(refresh_token)

    manager.async_remove_refresh_token(refresh_token)

    assert manager.async_get_refresh_token(refresh_token.id) is None
    assert manager.async_validate_access_token(access_token) is None


async def test_remove_expired_refresh_token(hass: HomeAssistant) -> None:
    """Test that expired refresh tokens are deleted."""
    manager = await auth.auth_manager_from_config(hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    now = dt_util.utcnow()
    with freeze_time(now):
        refresh_token1 = await manager.async_create_refresh_token(user, CLIENT_ID)
        assert (
            refresh_token1.expire_at
            == now.timestamp() + timedelta(days=90).total_seconds()
        )

    with freeze_time(now + timedelta(days=30)):
        async_fire_time_changed(hass, now + timedelta(days=30))
        refresh_token2 = await manager.async_create_refresh_token(user, CLIENT_ID)
        assert (
            refresh_token2.expire_at
            == now.timestamp() + timedelta(days=120).total_seconds()
        )

    with freeze_time(now + timedelta(days=89, hours=23)):
        async_fire_time_changed(hass, now + timedelta(days=89, hours=23))
        await hass.async_block_till_done()
        assert manager.async_get_refresh_token(refresh_token1.id)
        assert manager.async_get_refresh_token(refresh_token2.id)

    with freeze_time(now + timedelta(days=90, seconds=5)):
        async_fire_time_changed(hass, now + timedelta(days=90, seconds=5))
        await hass.async_block_till_done()
        assert manager.async_get_refresh_token(refresh_token1.id) is None
        assert manager.async_get_refresh_token(refresh_token2.id)

    with freeze_time(now + timedelta(days=120, seconds=5)):
        async_fire_time_changed(hass, now + timedelta(days=120, seconds=5))
        await hass.async_block_till_done()
        assert manager.async_get_refresh_token(refresh_token1.id) is None
        assert manager.async_get_refresh_token(refresh_token2.id) is None


async def test_update_expire_at_refresh_token(hass: HomeAssistant) -> None:
    """Test that expire at is updated when refresh token is used."""
    manager = await auth.auth_manager_from_config(hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    now = dt_util.utcnow()
    with freeze_time(now):
        refresh_token = await manager.async_create_refresh_token(user, CLIENT_ID)
        assert (
            refresh_token.expire_at
            == now.timestamp() + timedelta(days=90).total_seconds()
        )

    with freeze_time(now + timedelta(days=30)):
        async_fire_time_changed(hass, now + timedelta(days=30))
        await hass.async_block_till_done()
        assert manager.async_create_access_token(refresh_token)
        await hass.async_block_till_done()
        assert (
            refresh_token.expire_at
            == now.timestamp()
            + timedelta(days=30).total_seconds()
            + timedelta(days=90).total_seconds()
        )


async def test_register_revoke_token_callback(mock_hass) -> None:
    """Test that a registered revoke token callback is called."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(user, CLIENT_ID)

    called = False

    def cb():
        nonlocal called
        called = True

    manager.async_register_revoke_token_callback(refresh_token.id, cb)
    manager.async_remove_refresh_token(refresh_token)
    assert called


async def test_unregister_revoke_token_callback(mock_hass) -> None:
    """Test that a revoke token callback can be unregistered."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(user, CLIENT_ID)

    called = False

    def cb():
        nonlocal called
        called = True

    unregister = manager.async_register_revoke_token_callback(refresh_token.id, cb)
    unregister()

    manager.async_remove_refresh_token(refresh_token)
    assert not called


async def test_create_access_token(mock_hass) -> None:
    """Test normal refresh_token's jwt_key keep same after used."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(user, CLIENT_ID)
    assert refresh_token.token_type == auth_models.TOKEN_TYPE_NORMAL
    jwt_key = refresh_token.jwt_key
    access_token = manager.async_create_access_token(refresh_token)
    assert access_token is not None
    assert refresh_token.jwt_key == jwt_key
    jwt_payload = jwt.decode(access_token, jwt_key, algorithms=["HS256"])
    assert jwt_payload["iss"] == refresh_token.id
    assert (
        jwt_payload["exp"] - jwt_payload["iat"] == timedelta(minutes=30).total_seconds()
    )


async def test_create_long_lived_access_token(mock_hass) -> None:
    """Test refresh_token's jwt_key changed for long-lived access token."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(
        user,
        client_name="GPS Logger",
        token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
        access_token_expiration=timedelta(days=300),
    )
    assert refresh_token.token_type == auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
    access_token = manager.async_create_access_token(refresh_token)
    jwt_payload = jwt.decode(access_token, refresh_token.jwt_key, algorithms=["HS256"])
    assert jwt_payload["iss"] == refresh_token.id
    assert (
        jwt_payload["exp"] - jwt_payload["iat"] == timedelta(days=300).total_seconds()
    )


async def test_one_long_lived_access_token_per_refresh_token(mock_hass) -> None:
    """Test one refresh_token can only have one long-lived access token."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(
        user,
        client_name="GPS Logger",
        token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
        access_token_expiration=timedelta(days=3000),
    )
    assert refresh_token.token_type == auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
    access_token = manager.async_create_access_token(refresh_token)
    jwt_key = refresh_token.jwt_key

    rt = manager.async_validate_access_token(access_token)
    assert rt.id == refresh_token.id

    with pytest.raises(ValueError):
        await manager.async_create_refresh_token(
            user,
            client_name="GPS Logger",
            token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
            access_token_expiration=timedelta(days=3000),
        )

    manager.async_remove_refresh_token(refresh_token)
    assert refresh_token.id not in user.refresh_tokens
    rt = manager.async_validate_access_token(access_token)
    assert rt is None, "Previous issued access token has been invoked"

    refresh_token_2 = await manager.async_create_refresh_token(
        user,
        client_name="GPS Logger",
        token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
        access_token_expiration=timedelta(days=3000),
    )
    assert refresh_token_2.id != refresh_token.id
    assert refresh_token_2.token_type == auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
    access_token_2 = manager.async_create_access_token(refresh_token_2)
    jwt_key_2 = refresh_token_2.jwt_key

    assert access_token != access_token_2
    assert jwt_key != jwt_key_2

    rt = manager.async_validate_access_token(access_token_2)
    jwt_payload = jwt.decode(access_token_2, rt.jwt_key, algorithms=["HS256"])
    assert jwt_payload["iss"] == refresh_token_2.id
    assert (
        jwt_payload["exp"] - jwt_payload["iat"] == timedelta(days=3000).total_seconds()
    )


async def test_login_with_auth_module(mock_hass) -> None:
    """Test login as existing user with auth module."""
    manager = await auth.auth_manager_from_config(
        mock_hass,
        [
            {
                "type": "insecure_example",
                "users": [
                    {
                        "username": "test-user",
                        "password": "test-pass",
                        "name": "Test Name",
                    }
                ],
            }
        ],
        [
            {
                "type": "insecure_example",
                "data": [{"user_id": "mock-user", "pin": "test-pin"}],
            }
        ],
    )
    mock_hass.auth = manager
    ensure_auth_manager_loaded(manager)

    # Add fake user with credentials for example auth provider.
    user = MockUser(
        id="mock-user", is_owner=False, is_active=False, name="Paulus"
    ).add_to_auth_manager(manager)
    user.credentials.append(
        auth_models.Credentials(
            id="mock-id",
            auth_provider_type="insecure_example",
            auth_provider_id=None,
            data={"username": "test-user"},
            is_new=False,
        )
    )

    step = await manager.login_flow.async_init(("insecure_example", None))
    assert step["type"] == data_entry_flow.FlowResultType.FORM

    step = await manager.login_flow.async_configure(
        step["flow_id"], {"username": "test-user", "password": "test-pass"}
    )

    # After auth_provider validated, request auth module input form
    assert step["type"] == data_entry_flow.FlowResultType.FORM
    assert step["step_id"] == "mfa"

    step = await manager.login_flow.async_configure(
        step["flow_id"], {"pin": "invalid-pin"}
    )

    # Invalid code error
    assert step["type"] == data_entry_flow.FlowResultType.FORM
    assert step["step_id"] == "mfa"
    assert step["errors"] == {"base": "invalid_code"}

    step = await manager.login_flow.async_configure(
        step["flow_id"], {"pin": "test-pin"}
    )

    # Finally passed, get credential
    assert step["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
    assert step["result"]
    assert step["result"].id == "mock-id"


async def test_login_with_multi_auth_module(mock_hass) -> None:
    """Test login as existing user with multiple auth modules."""
    manager = await auth.auth_manager_from_config(
        mock_hass,
        [
            {
                "type": "insecure_example",
                "users": [
                    {
                        "username": "test-user",
                        "password": "test-pass",
                        "name": "Test Name",
                    }
                ],
            }
        ],
        [
            {
                "type": "insecure_example",
                "data": [{"user_id": "mock-user", "pin": "test-pin"}],
            },
            {
                "type": "insecure_example",
                "id": "module2",
                "data": [{"user_id": "mock-user", "pin": "test-pin2"}],
            },
        ],
    )
    mock_hass.auth = manager
    ensure_auth_manager_loaded(manager)

    # Add fake user with credentials for example auth provider.
    user = MockUser(
        id="mock-user", is_owner=False, is_active=False, name="Paulus"
    ).add_to_auth_manager(manager)
    user.credentials.append(
        auth_models.Credentials(
            id="mock-id",
            auth_provider_type="insecure_example",
            auth_provider_id=None,
            data={"username": "test-user"},
            is_new=False,
        )
    )

    step = await manager.login_flow.async_init(("insecure_example", None))
    assert step["type"] == data_entry_flow.FlowResultType.FORM

    step = await manager.login_flow.async_configure(
        step["flow_id"], {"username": "test-user", "password": "test-pass"}
    )

    # After auth_provider validated, request select auth module
    assert step["type"] == data_entry_flow.FlowResultType.FORM
    assert step["step_id"] == "select_mfa_module"

    step = await manager.login_flow.async_configure(
        step["flow_id"], {"multi_factor_auth_module": "module2"}
    )

    assert step["type"] == data_entry_flow.FlowResultType.FORM
    assert step["step_id"] == "mfa"

    step = await manager.login_flow.async_configure(
        step["flow_id"], {"pin": "test-pin2"}
    )

    # Finally passed, get credential
    assert step["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
    assert step["result"]
    assert step["result"].id == "mock-id"


async def test_auth_module_expired_session(mock_hass) -> None:
    """Test login as existing user."""
    manager = await auth.auth_manager_from_config(
        mock_hass,
        [
            {
                "type": "insecure_example",
                "users": [
                    {
                        "username": "test-user",
                        "password": "test-pass",
                        "name": "Test Name",
                    }
                ],
            }
        ],
        [
            {
                "type": "insecure_example",
                "data": [{"user_id": "mock-user", "pin": "test-pin"}],
            }
        ],
    )
    mock_hass.auth = manager
    ensure_auth_manager_loaded(manager)

    # Add fake user with credentials for example auth provider.
    user = MockUser(
        id="mock-user", is_owner=False, is_active=False, name="Paulus"
    ).add_to_auth_manager(manager)
    user.credentials.append(
        auth_models.Credentials(
            id="mock-id",
            auth_provider_type="insecure_example",
            auth_provider_id=None,
            data={"username": "test-user"},
            is_new=False,
        )
    )

    step = await manager.login_flow.async_init(("insecure_example", None))
    assert step["type"] == data_entry_flow.FlowResultType.FORM

    step = await manager.login_flow.async_configure(
        step["flow_id"], {"username": "test-user", "password": "test-pass"}
    )

    assert step["type"] == data_entry_flow.FlowResultType.FORM
    assert step["step_id"] == "mfa"

    with freeze_time(dt_util.utcnow() + MFA_SESSION_EXPIRATION):
        step = await manager.login_flow.async_configure(
            step["flow_id"], {"pin": "test-pin"}
        )
        # login flow abort due session timeout
        assert step["type"] == data_entry_flow.FlowResultType.ABORT
        assert step["reason"] == "login_expired"


async def test_enable_mfa_for_user(
    hass: HomeAssistant, hass_storage: dict[str, Any]
) -> None:
    """Test enable mfa module for user."""
    manager = await auth.auth_manager_from_config(
        hass,
        [
            {
                "type": "insecure_example",
                "users": [{"username": "test-user", "password": "test-pass"}],
            }
        ],
        [{"type": "insecure_example", "data": []}],
    )

    step = await manager.login_flow.async_init(("insecure_example", None))
    step = await manager.login_flow.async_configure(
        step["flow_id"], {"username": "test-user", "password": "test-pass"}
    )
    credential = step["result"]
    user = await manager.async_get_or_create_user(credential)
    assert user is not None

    # new user don't have mfa enabled
    modules = await manager.async_get_enabled_mfa(user)
    assert len(modules) == 0

    module = manager.get_auth_mfa_module("insecure_example")
    # mfa module don't have data
    assert bool(module._data) is False

    # test enable mfa for user
    await manager.async_enable_user_mfa(user, "insecure_example", {"pin": "test-pin"})
    assert len(module._data) == 1
    assert module._data[0] == {"user_id": user.id, "pin": "test-pin"}

    # test get enabled mfa
    modules = await manager.async_get_enabled_mfa(user)
    assert len(modules) == 1
    assert "insecure_example" in modules

    # re-enable mfa for user will override
    await manager.async_enable_user_mfa(
        user, "insecure_example", {"pin": "test-pin-new"}
    )
    assert len(module._data) == 1
    assert module._data[0] == {"user_id": user.id, "pin": "test-pin-new"}
    modules = await manager.async_get_enabled_mfa(user)
    assert len(modules) == 1
    assert "insecure_example" in modules

    # system user cannot enable mfa
    system_user = await manager.async_create_system_user("system-user")
    with pytest.raises(ValueError):
        await manager.async_enable_user_mfa(
            system_user, "insecure_example", {"pin": "test-pin"}
        )
    assert len(module._data) == 1
    modules = await manager.async_get_enabled_mfa(system_user)
    assert len(modules) == 0

    # disable mfa for user
    await manager.async_disable_user_mfa(user, "insecure_example")
    assert bool(module._data) is False

    # test get enabled mfa
    modules = await manager.async_get_enabled_mfa(user)
    assert len(modules) == 0

    # disable mfa for user don't enabled just silent fail
    await manager.async_disable_user_mfa(user, "insecure_example")


async def test_async_remove_user(hass: HomeAssistant) -> None:
    """Test removing a user."""
    events = async_capture_events(hass, "user_removed")
    manager = await auth.auth_manager_from_config(
        hass,
        [
            {
                "type": "insecure_example",
                "users": [
                    {
                        "username": "test-user",
                        "password": "test-pass",
                        "name": "Test Name",
                    }
                ],
            }
        ],
        [],
    )
    hass.auth = manager
    ensure_auth_manager_loaded(manager)

    # Add fake user with credentials for example auth provider.
    user = MockUser(
        id="mock-user", is_owner=False, is_active=False, name="Paulus"
    ).add_to_auth_manager(manager)
    user.credentials.append(
        auth_models.Credentials(
            id="mock-id",
            auth_provider_type="insecure_example",
            auth_provider_id=None,
            data={"username": "test-user"},
            is_new=False,
        )
    )
    assert len(user.credentials) == 1

    await hass.auth.async_remove_user(user)

    assert len(await manager.async_get_users()) == 0
    assert len(user.credentials) == 0

    await hass.async_block_till_done()
    assert len(events) == 1
    assert events[0].data["user_id"] == user.id


async def test_async_remove_user_fail_if_remove_credential_fails(
    hass: HomeAssistant, hass_admin_user: MockUser, hass_admin_credential: Credentials
) -> None:
    """Test removing a user."""
    await hass.auth.async_link_user(hass_admin_user, hass_admin_credential)

    with (
        patch.object(hass.auth, "async_remove_credentials", side_effect=ValueError),
        pytest.raises(ValueError),
    ):
        await hass.auth.async_remove_user(hass_admin_user)


async def test_new_users(mock_hass) -> None:
    """Test newly created users."""
    manager = await auth.auth_manager_from_config(
        mock_hass,
        [
            {
                "type": "insecure_example",
                "users": [
                    {
                        "username": "test-user",
                        "password": "test-pass",
                        "name": "Test Name",
                    },
                    {
                        "username": "test-user-2",
                        "password": "test-pass",
                        "name": "Test Name",
                    },
                    {
                        "username": "test-user-3",
                        "password": "test-pass",
                        "name": "Test Name",
                    },
                ],
            }
        ],
        [],
    )
    ensure_auth_manager_loaded(manager)

    user = await manager.async_create_user("Hello")
    # first user in the system is owner and admin
    assert user.is_owner
    assert user.is_admin
    assert not user.local_only
    assert user.groups == []

    user = await manager.async_create_user("Hello 2")
    assert not user.is_admin
    assert user.groups == []

    user = await manager.async_create_user(
        "Hello 3", group_ids=["system-admin"], local_only=True
    )
    assert user.is_admin
    assert user.groups[0].id == "system-admin"
    assert user.local_only

    user_cred = await manager.async_get_or_create_user(
        auth_models.Credentials(
            id="mock-id",
            auth_provider_type="insecure_example",
            auth_provider_id=None,
            data={"username": "test-user"},
            is_new=True,
        )
    )
    assert user_cred.is_admin


async def test_rename_does_not_change_refresh_token(mock_hass) -> None:
    """Test that we can rename without changing refresh token."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    await manager.async_create_refresh_token(user, CLIENT_ID)

    assert len(list(user.refresh_tokens.values())) == 1
    token_before = list(user.refresh_tokens.values())[0]

    await manager.async_update_user(user, name="new name")
    assert user.name == "new name"

    assert len(list(user.refresh_tokens.values())) == 1
    token_after = list(user.refresh_tokens.values())[0]

    assert token_before == token_after


async def test_event_user_updated_fires(hass: HomeAssistant) -> None:
    """Test the user updated event fires."""
    manager = await auth.auth_manager_from_config(hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    await manager.async_create_refresh_token(user, CLIENT_ID)

    assert len(list(user.refresh_tokens.values())) == 1

    events = async_capture_events(hass, EVENT_USER_UPDATED)

    await manager.async_update_user(user, name="new name")
    assert user.name == "new name"

    await hass.async_block_till_done()
    assert len(events) == 1


async def test_access_token_with_invalid_signature(mock_hass) -> None:
    """Test rejecting access tokens with an invalid signature."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(
        user,
        client_name="Good Client",
        token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
        access_token_expiration=timedelta(days=3000),
    )
    assert refresh_token.token_type == auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
    access_token = manager.async_create_access_token(refresh_token)

    rt = manager.async_validate_access_token(access_token)
    assert rt.id == refresh_token.id

    # Now we corrupt the signature
    header, payload, signature = access_token.split(".")
    invalid_signature = "a" * len(signature)
    invalid_token = f"{header}.{payload}.{invalid_signature}"

    assert access_token != invalid_token

    result = manager.async_validate_access_token(invalid_token)
    assert result is None


async def test_access_token_with_null_signature(mock_hass) -> None:
    """Test rejecting access tokens with a null signature."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(
        user,
        client_name="Good Client",
        token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
        access_token_expiration=timedelta(days=3000),
    )
    assert refresh_token.token_type == auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
    access_token = manager.async_create_access_token(refresh_token)

    rt = manager.async_validate_access_token(access_token)
    assert rt.id == refresh_token.id

    # Now we make the signature all nulls
    header, payload, signature = access_token.split(".")
    invalid_signature = "\0" * len(signature)
    invalid_token = f"{header}.{payload}.{invalid_signature}"

    assert access_token != invalid_token

    result = manager.async_validate_access_token(invalid_token)
    assert result is None


async def test_access_token_with_empty_signature(mock_hass) -> None:
    """Test rejecting access tokens with an empty signature."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(
        user,
        client_name="Good Client",
        token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
        access_token_expiration=timedelta(days=3000),
    )
    assert refresh_token.token_type == auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
    access_token = manager.async_create_access_token(refresh_token)

    rt = manager.async_validate_access_token(access_token)
    assert rt.id == refresh_token.id

    # Now we make the signature all nulls
    header, payload, _ = access_token.split(".")
    invalid_token = f"{header}.{payload}."

    assert access_token != invalid_token

    result = manager.async_validate_access_token(invalid_token)
    assert result is None


async def test_access_token_with_empty_key(mock_hass) -> None:
    """Test rejecting access tokens with an empty key."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(
        user,
        client_name="Good Client",
        token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
        access_token_expiration=timedelta(days=3000),
    )
    assert refresh_token.token_type == auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN

    access_token = manager.async_create_access_token(refresh_token)

    manager.async_remove_refresh_token(refresh_token)
    # Now remove the token from the keyring
    # so we will get an empty key

    assert manager.async_validate_access_token(access_token) is None


async def test_reject_access_token_with_impossible_large_size(mock_hass) -> None:
    """Test rejecting access tokens with impossible sizes."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    assert manager.async_validate_access_token("a" * 10000) is None


async def test_reject_token_with_invalid_json_payload(mock_hass) -> None:
    """Test rejecting access tokens with invalid json payload."""
    jws = jwt.PyJWS()
    token_with_invalid_json = jws.encode(
        b"invalid", b"invalid", "HS256", {"alg": "HS256", "typ": "JWT"}
    )
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    assert manager.async_validate_access_token(token_with_invalid_json) is None


async def test_reject_token_with_not_dict_json_payload(mock_hass) -> None:
    """Test rejecting access tokens with not a dict json payload."""
    jws = jwt.PyJWS()
    token_not_a_dict_json = jws.encode(
        b'["invalid"]', b"invalid", "HS256", {"alg": "HS256", "typ": "JWT"}
    )
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    assert manager.async_validate_access_token(token_not_a_dict_json) is None


async def test_access_token_that_expires_soon(mock_hass) -> None:
    """Test access token from refresh token that expires very soon."""
    now = dt_util.utcnow()
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(
        user,
        client_name="Token that expires very soon",
        token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
        access_token_expiration=timedelta(seconds=1),
    )
    assert refresh_token.token_type == auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
    access_token = manager.async_create_access_token(refresh_token)

    rt = manager.async_validate_access_token(access_token)
    assert rt.id == refresh_token.id

    with freeze_time(now + timedelta(minutes=1)):
        assert manager.async_validate_access_token(access_token) is None


async def test_access_token_from_the_future(mock_hass) -> None:
    """Test we reject an access token from the future."""
    now = dt_util.utcnow()
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    with freeze_time(now + timedelta(days=365)):
        refresh_token = await manager.async_create_refresh_token(
            user,
            client_name="Token that expires very soon",
            token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
            access_token_expiration=timedelta(days=10),
        )
        assert (
            refresh_token.token_type == auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
        )
        access_token = manager.async_create_access_token(refresh_token)

    assert manager.async_validate_access_token(access_token) is None

    with freeze_time(now + timedelta(days=365)):
        rt = manager.async_validate_access_token(access_token)
        assert rt.id == refresh_token.id