Add websocket commands for refresh tokens (#16559)
* Add websocket commands for refresh tokens * Comment
This commit is contained in:
parent
4e3faf6108
commit
0db13a99aa
3 changed files with 140 additions and 44 deletions
|
@ -2,18 +2,15 @@
|
|||
from datetime import timedelta
|
||||
from unittest.mock import patch
|
||||
|
||||
from homeassistant import const
|
||||
from homeassistant.auth import auth_manager_from_config
|
||||
from homeassistant.auth.models import Credentials
|
||||
from homeassistant.components.auth import RESULT_TYPE_USER
|
||||
from homeassistant.setup import async_setup_component
|
||||
from homeassistant.util.dt import utcnow
|
||||
from homeassistant.components import auth
|
||||
|
||||
from . import async_setup_auth
|
||||
from tests.common import CLIENT_ID, CLIENT_REDIRECT_URI, MockUser
|
||||
|
||||
from tests.common import CLIENT_ID, CLIENT_REDIRECT_URI, MockUser, \
|
||||
ensure_auth_manager_loaded
|
||||
from . import async_setup_auth
|
||||
|
||||
|
||||
async def test_login_new_user_and_trying_refresh_token(hass, aiohttp_client):
|
||||
|
@ -272,28 +269,12 @@ async def test_revoking_refresh_token(hass, aiohttp_client):
|
|||
assert resp.status == 400
|
||||
|
||||
|
||||
async def test_ws_long_lived_access_token(hass, hass_ws_client):
|
||||
async def test_ws_long_lived_access_token(hass, hass_ws_client,
|
||||
hass_access_token):
|
||||
"""Test generate long-lived access token."""
|
||||
hass.auth = await auth_manager_from_config(
|
||||
hass, provider_configs=[{
|
||||
'type': 'insecure_example',
|
||||
'users': [{
|
||||
'username': 'test-user',
|
||||
'password': 'test-pass',
|
||||
'name': 'Test Name',
|
||||
}]
|
||||
}], module_configs=[])
|
||||
ensure_auth_manager_loaded(hass.auth)
|
||||
assert await async_setup_component(hass, 'auth', {'http': {}})
|
||||
assert await async_setup_component(hass, 'api', {'http': {}})
|
||||
|
||||
user = MockUser(id='mock-user').add_to_hass(hass)
|
||||
cred = await hass.auth.auth_providers[0].async_get_or_create_credentials(
|
||||
{'username': 'test-user'})
|
||||
await hass.auth.async_link_user(user, cred)
|
||||
|
||||
ws_client = await hass_ws_client(hass, hass.auth.async_create_access_token(
|
||||
await hass.auth.async_create_refresh_token(user, CLIENT_ID)))
|
||||
ws_client = await hass_ws_client(hass, hass_access_token)
|
||||
|
||||
# verify create long-lived access token
|
||||
await ws_client.send_json({
|
||||
|
@ -315,12 +296,51 @@ async def test_ws_long_lived_access_token(hass, hass_ws_client):
|
|||
assert refresh_token.client_name == 'GPS Logger'
|
||||
assert refresh_token.client_icon is None
|
||||
|
||||
# verify long-lived access token can be used as bearer token
|
||||
api_client = ws_client.client
|
||||
resp = await api_client.get(const.URL_API)
|
||||
assert resp.status == 401
|
||||
|
||||
resp = await api_client.get(const.URL_API, headers={
|
||||
'Authorization': 'Bearer {}'.format(long_lived_access_token)
|
||||
async def test_ws_refresh_tokens(hass, hass_ws_client, hass_access_token):
|
||||
"""Test fetching refresh token metadata."""
|
||||
assert await async_setup_component(hass, 'auth', {'http': {}})
|
||||
|
||||
ws_client = await hass_ws_client(hass, hass_access_token)
|
||||
|
||||
await ws_client.send_json({
|
||||
'id': 5,
|
||||
'type': auth.WS_TYPE_REFRESH_TOKENS,
|
||||
})
|
||||
assert resp.status == 200
|
||||
|
||||
result = await ws_client.receive_json()
|
||||
assert result['success'], result
|
||||
assert len(result['result']) == 1
|
||||
token = result['result'][0]
|
||||
refresh_token = await hass.auth.async_validate_access_token(
|
||||
hass_access_token)
|
||||
assert token['id'] == refresh_token.id
|
||||
assert token['type'] == refresh_token.token_type
|
||||
assert token['client_id'] == refresh_token.client_id
|
||||
assert token['client_name'] == refresh_token.client_name
|
||||
assert token['client_icon'] == refresh_token.client_icon
|
||||
assert token['created_at'] == refresh_token.created_at.isoformat()
|
||||
|
||||
|
||||
async def test_ws_delete_refresh_token(hass, hass_ws_client,
|
||||
hass_access_token):
|
||||
"""Test deleting a refresh token."""
|
||||
assert await async_setup_component(hass, 'auth', {'http': {}})
|
||||
|
||||
refresh_token = await hass.auth.async_validate_access_token(
|
||||
hass_access_token)
|
||||
|
||||
ws_client = await hass_ws_client(hass, hass_access_token)
|
||||
|
||||
# verify create long-lived access token
|
||||
await ws_client.send_json({
|
||||
'id': 5,
|
||||
'type': auth.WS_TYPE_DELETE_REFRESH_TOKEN,
|
||||
'refresh_token_id': refresh_token.id
|
||||
})
|
||||
|
||||
result = await ws_client.receive_json()
|
||||
assert result['success'], result
|
||||
refresh_token = await hass.auth.async_validate_access_token(
|
||||
hass_access_token)
|
||||
assert refresh_token is None
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue