By default to use access_token if hass.auth.active (#15212)
* Force to use access_token if hass.auth.active * Not allow Basic auth with api_password if hass.auth.active * Block websocket api_password auth when hass.auth.active * Add legacy_api_password auth provider * lint * lint
This commit is contained in:
parent
3da4642194
commit
f874efb224
8 changed files with 468 additions and 83 deletions
|
@ -17,37 +17,44 @@ _LOGGER = logging.getLogger(__name__)
|
|||
|
||||
|
||||
@callback
|
||||
def setup_auth(app, trusted_networks, api_password):
|
||||
def setup_auth(app, trusted_networks, use_auth,
|
||||
support_legacy=False, api_password=None):
|
||||
"""Create auth middleware for the app."""
|
||||
@middleware
|
||||
async def auth_middleware(request, handler):
|
||||
"""Authenticate as middleware."""
|
||||
# If no password set, just always set authenticated=True
|
||||
if api_password is None:
|
||||
request[KEY_AUTHENTICATED] = True
|
||||
return await handler(request)
|
||||
|
||||
# Check authentication
|
||||
authenticated = False
|
||||
|
||||
if (HTTP_HEADER_HA_AUTH in request.headers and
|
||||
hmac.compare_digest(
|
||||
api_password.encode('utf-8'),
|
||||
request.headers[HTTP_HEADER_HA_AUTH].encode('utf-8'))):
|
||||
if use_auth and (HTTP_HEADER_HA_AUTH in request.headers or
|
||||
DATA_API_PASSWORD in request.query):
|
||||
_LOGGER.warning('Please use access_token instead api_password.')
|
||||
|
||||
legacy_auth = (not use_auth or support_legacy) and api_password
|
||||
if (hdrs.AUTHORIZATION in request.headers and
|
||||
await async_validate_auth_header(
|
||||
request, api_password if legacy_auth else None)):
|
||||
# it included both use_auth and api_password Basic auth
|
||||
authenticated = True
|
||||
|
||||
elif (legacy_auth and HTTP_HEADER_HA_AUTH in request.headers and
|
||||
hmac.compare_digest(
|
||||
api_password.encode('utf-8'),
|
||||
request.headers[HTTP_HEADER_HA_AUTH].encode('utf-8'))):
|
||||
# A valid auth header has been set
|
||||
authenticated = True
|
||||
|
||||
elif (DATA_API_PASSWORD in request.query and
|
||||
elif (legacy_auth and DATA_API_PASSWORD in request.query and
|
||||
hmac.compare_digest(
|
||||
api_password.encode('utf-8'),
|
||||
request.query[DATA_API_PASSWORD].encode('utf-8'))):
|
||||
authenticated = True
|
||||
|
||||
elif (hdrs.AUTHORIZATION in request.headers and
|
||||
await async_validate_auth_header(api_password, request)):
|
||||
elif _is_trusted_ip(request, trusted_networks):
|
||||
authenticated = True
|
||||
|
||||
elif _is_trusted_ip(request, trusted_networks):
|
||||
elif not use_auth and api_password is None:
|
||||
# If neither password nor auth_providers set,
|
||||
# just always set authenticated=True
|
||||
authenticated = True
|
||||
|
||||
request[KEY_AUTHENTICATED] = authenticated
|
||||
|
@ -76,8 +83,12 @@ def validate_password(request, api_password):
|
|||
request.app['hass'].http.api_password.encode('utf-8'))
|
||||
|
||||
|
||||
async def async_validate_auth_header(api_password, request):
|
||||
"""Test an authorization header if valid password."""
|
||||
async def async_validate_auth_header(request, api_password=None):
|
||||
"""
|
||||
Test authorization header against access token.
|
||||
|
||||
Basic auth_type is legacy code, should be removed with api_password.
|
||||
"""
|
||||
if hdrs.AUTHORIZATION not in request.headers:
|
||||
return False
|
||||
|
||||
|
@ -88,7 +99,16 @@ async def async_validate_auth_header(api_password, request):
|
|||
# If no space in authorization header
|
||||
return False
|
||||
|
||||
if auth_type == 'Basic':
|
||||
if auth_type == 'Bearer':
|
||||
hass = request.app['hass']
|
||||
access_token = hass.auth.async_get_access_token(auth_val)
|
||||
if access_token is None:
|
||||
return False
|
||||
|
||||
request['hass_user'] = access_token.refresh_token.user
|
||||
return True
|
||||
|
||||
elif auth_type == 'Basic' and api_password is not None:
|
||||
decoded = base64.b64decode(auth_val).decode('utf-8')
|
||||
try:
|
||||
username, password = decoded.split(':', 1)
|
||||
|
@ -102,13 +122,5 @@ async def async_validate_auth_header(api_password, request):
|
|||
return hmac.compare_digest(api_password.encode('utf-8'),
|
||||
password.encode('utf-8'))
|
||||
|
||||
if auth_type != 'Bearer':
|
||||
else:
|
||||
return False
|
||||
|
||||
hass = request.app['hass']
|
||||
access_token = hass.auth.async_get_access_token(auth_val)
|
||||
if access_token is None:
|
||||
return False
|
||||
|
||||
request['hass_user'] = access_token.refresh_token.user
|
||||
return True
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue