Deprecate http.api_password (#21884)
* Deprecated http.api_password * Deprecated ApiConfig.api_password GitHub Drafted PR would trigger CI after changed it to normal PR. I have to commit a comment change to trigger it * Trigger CI * Adjust if- elif chain in auth middleware
This commit is contained in:
parent
7ec7e51f70
commit
fe1840f901
27 changed files with 304 additions and 324 deletions
|
@ -1,6 +1,5 @@
|
|||
"""Authentication for HTTP component."""
|
||||
import base64
|
||||
import hmac
|
||||
import logging
|
||||
|
||||
from aiohttp import hdrs
|
||||
|
@ -13,7 +12,11 @@ from homeassistant.const import HTTP_HEADER_HA_AUTH
|
|||
from homeassistant.core import callback
|
||||
from homeassistant.util import dt as dt_util
|
||||
|
||||
from .const import KEY_AUTHENTICATED, KEY_REAL_IP
|
||||
from .const import (
|
||||
KEY_AUTHENTICATED,
|
||||
KEY_HASS_USER,
|
||||
KEY_REAL_IP,
|
||||
)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
@ -40,10 +43,125 @@ def async_sign_path(hass, refresh_token_id, path, expiration):
|
|||
|
||||
|
||||
@callback
|
||||
def setup_auth(app, trusted_networks, api_password):
|
||||
def setup_auth(hass, app):
|
||||
"""Create auth middleware for the app."""
|
||||
old_auth_warning = set()
|
||||
|
||||
support_legacy = hass.auth.support_legacy
|
||||
if support_legacy:
|
||||
_LOGGER.warning("legacy_api_password support has been enabled.")
|
||||
|
||||
trusted_networks = []
|
||||
for prv in hass.auth.auth_providers:
|
||||
if prv.type == 'trusted_networks':
|
||||
trusted_networks += prv.trusted_networks
|
||||
|
||||
async def async_validate_auth_header(request):
|
||||
"""
|
||||
Test authorization header against access token.
|
||||
|
||||
Basic auth_type is legacy code, should be removed with api_password.
|
||||
"""
|
||||
try:
|
||||
auth_type, auth_val = \
|
||||
request.headers.get(hdrs.AUTHORIZATION).split(' ', 1)
|
||||
except ValueError:
|
||||
# If no space in authorization header
|
||||
return False
|
||||
|
||||
if auth_type == 'Bearer':
|
||||
refresh_token = await hass.auth.async_validate_access_token(
|
||||
auth_val)
|
||||
if refresh_token is None:
|
||||
return False
|
||||
|
||||
request[KEY_HASS_USER] = refresh_token.user
|
||||
return True
|
||||
|
||||
if auth_type == 'Basic' and support_legacy:
|
||||
decoded = base64.b64decode(auth_val).decode('utf-8')
|
||||
try:
|
||||
username, password = decoded.split(':', 1)
|
||||
except ValueError:
|
||||
# If no ':' in decoded
|
||||
return False
|
||||
|
||||
if username != 'homeassistant':
|
||||
return False
|
||||
|
||||
user = await legacy_api_password.async_validate_password(
|
||||
hass, password)
|
||||
if user is None:
|
||||
return False
|
||||
|
||||
request[KEY_HASS_USER] = user
|
||||
_LOGGER.info(
|
||||
'Basic auth with api_password is going to deprecate,'
|
||||
' please use a bearer token to access %s from %s',
|
||||
request.path, request[KEY_REAL_IP])
|
||||
old_auth_warning.add(request.path)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
async def async_validate_signed_request(request):
|
||||
"""Validate a signed request."""
|
||||
secret = hass.data.get(DATA_SIGN_SECRET)
|
||||
|
||||
if secret is None:
|
||||
return False
|
||||
|
||||
signature = request.query.get(SIGN_QUERY_PARAM)
|
||||
|
||||
if signature is None:
|
||||
return False
|
||||
|
||||
try:
|
||||
claims = jwt.decode(
|
||||
signature,
|
||||
secret,
|
||||
algorithms=['HS256'],
|
||||
options={'verify_iss': False}
|
||||
)
|
||||
except jwt.InvalidTokenError:
|
||||
return False
|
||||
|
||||
if claims['path'] != request.path:
|
||||
return False
|
||||
|
||||
refresh_token = await hass.auth.async_get_refresh_token(claims['iss'])
|
||||
|
||||
if refresh_token is None:
|
||||
return False
|
||||
|
||||
request[KEY_HASS_USER] = refresh_token.user
|
||||
return True
|
||||
|
||||
async def async_validate_trusted_networks(request):
|
||||
"""Test if request is from a trusted ip."""
|
||||
ip_addr = request[KEY_REAL_IP]
|
||||
|
||||
if not any(ip_addr in trusted_network
|
||||
for trusted_network in trusted_networks):
|
||||
return False
|
||||
|
||||
user = await hass.auth.async_get_owner()
|
||||
if user is None:
|
||||
return False
|
||||
|
||||
request[KEY_HASS_USER] = user
|
||||
return True
|
||||
|
||||
async def async_validate_legacy_api_password(request, password):
|
||||
"""Validate api_password."""
|
||||
user = await legacy_api_password.async_validate_password(
|
||||
hass, password)
|
||||
if user is None:
|
||||
return False
|
||||
|
||||
request[KEY_HASS_USER] = user
|
||||
return True
|
||||
|
||||
@middleware
|
||||
async def auth_middleware(request, handler):
|
||||
"""Authenticate as middleware."""
|
||||
|
@ -53,13 +171,14 @@ def setup_auth(app, trusted_networks, api_password):
|
|||
DATA_API_PASSWORD in request.query):
|
||||
if request.path not in old_auth_warning:
|
||||
_LOGGER.log(
|
||||
logging.INFO if api_password else logging.WARNING,
|
||||
'You need to use a bearer token to access %s from %s',
|
||||
logging.INFO if support_legacy else logging.WARNING,
|
||||
'api_password is going to deprecate. You need to use a'
|
||||
' bearer token to access %s from %s',
|
||||
request.path, request[KEY_REAL_IP])
|
||||
old_auth_warning.add(request.path)
|
||||
|
||||
if (hdrs.AUTHORIZATION in request.headers and
|
||||
await async_validate_auth_header(request, api_password)):
|
||||
await async_validate_auth_header(request)):
|
||||
# it included both use_auth and api_password Basic auth
|
||||
authenticated = True
|
||||
|
||||
|
@ -69,133 +188,21 @@ def setup_auth(app, trusted_networks, api_password):
|
|||
await async_validate_signed_request(request)):
|
||||
authenticated = True
|
||||
|
||||
elif (api_password and HTTP_HEADER_HA_AUTH in request.headers and
|
||||
hmac.compare_digest(
|
||||
api_password.encode('utf-8'),
|
||||
request.headers[HTTP_HEADER_HA_AUTH].encode('utf-8'))):
|
||||
# A valid auth header has been set
|
||||
elif (trusted_networks and
|
||||
await async_validate_trusted_networks(request)):
|
||||
authenticated = True
|
||||
request['hass_user'] = await legacy_api_password.async_get_user(
|
||||
app['hass'])
|
||||
|
||||
elif (api_password and DATA_API_PASSWORD in request.query and
|
||||
hmac.compare_digest(
|
||||
api_password.encode('utf-8'),
|
||||
request.query[DATA_API_PASSWORD].encode('utf-8'))):
|
||||
elif (support_legacy and HTTP_HEADER_HA_AUTH in request.headers and
|
||||
await async_validate_legacy_api_password(
|
||||
request, request.headers[HTTP_HEADER_HA_AUTH])):
|
||||
authenticated = True
|
||||
request['hass_user'] = await legacy_api_password.async_get_user(
|
||||
app['hass'])
|
||||
|
||||
elif _is_trusted_ip(request, trusted_networks):
|
||||
users = await app['hass'].auth.async_get_users()
|
||||
for user in users:
|
||||
if user.is_owner:
|
||||
request['hass_user'] = user
|
||||
authenticated = True
|
||||
break
|
||||
elif (support_legacy and DATA_API_PASSWORD in request.query and
|
||||
await async_validate_legacy_api_password(
|
||||
request, request.query[DATA_API_PASSWORD])):
|
||||
authenticated = True
|
||||
|
||||
request[KEY_AUTHENTICATED] = authenticated
|
||||
return await handler(request)
|
||||
|
||||
app.middlewares.append(auth_middleware)
|
||||
|
||||
|
||||
def _is_trusted_ip(request, trusted_networks):
|
||||
"""Test if request is from a trusted ip."""
|
||||
ip_addr = request[KEY_REAL_IP]
|
||||
|
||||
return any(
|
||||
ip_addr in trusted_network for trusted_network
|
||||
in trusted_networks)
|
||||
|
||||
|
||||
def validate_password(request, api_password):
|
||||
"""Test if password is valid."""
|
||||
return hmac.compare_digest(
|
||||
api_password.encode('utf-8'),
|
||||
request.app['hass'].http.api_password.encode('utf-8'))
|
||||
|
||||
|
||||
async def async_validate_auth_header(request, api_password=None):
|
||||
"""
|
||||
Test authorization header against access token.
|
||||
|
||||
Basic auth_type is legacy code, should be removed with api_password.
|
||||
"""
|
||||
if hdrs.AUTHORIZATION not in request.headers:
|
||||
return False
|
||||
|
||||
try:
|
||||
auth_type, auth_val = \
|
||||
request.headers.get(hdrs.AUTHORIZATION).split(' ', 1)
|
||||
except ValueError:
|
||||
# If no space in authorization header
|
||||
return False
|
||||
|
||||
hass = request.app['hass']
|
||||
|
||||
if auth_type == 'Bearer':
|
||||
refresh_token = await hass.auth.async_validate_access_token(auth_val)
|
||||
if refresh_token is None:
|
||||
return False
|
||||
|
||||
request['hass_refresh_token'] = refresh_token
|
||||
request['hass_user'] = refresh_token.user
|
||||
return True
|
||||
|
||||
if auth_type == 'Basic' and api_password is not None:
|
||||
decoded = base64.b64decode(auth_val).decode('utf-8')
|
||||
try:
|
||||
username, password = decoded.split(':', 1)
|
||||
except ValueError:
|
||||
# If no ':' in decoded
|
||||
return False
|
||||
|
||||
if username != 'homeassistant':
|
||||
return False
|
||||
|
||||
if not hmac.compare_digest(api_password.encode('utf-8'),
|
||||
password.encode('utf-8')):
|
||||
return False
|
||||
|
||||
request['hass_user'] = await legacy_api_password.async_get_user(hass)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
async def async_validate_signed_request(request):
|
||||
"""Validate a signed request."""
|
||||
hass = request.app['hass']
|
||||
secret = hass.data.get(DATA_SIGN_SECRET)
|
||||
|
||||
if secret is None:
|
||||
return False
|
||||
|
||||
signature = request.query.get(SIGN_QUERY_PARAM)
|
||||
|
||||
if signature is None:
|
||||
return False
|
||||
|
||||
try:
|
||||
claims = jwt.decode(
|
||||
signature,
|
||||
secret,
|
||||
algorithms=['HS256'],
|
||||
options={'verify_iss': False}
|
||||
)
|
||||
except jwt.InvalidTokenError:
|
||||
return False
|
||||
|
||||
if claims['path'] != request.path:
|
||||
return False
|
||||
|
||||
refresh_token = await hass.auth.async_get_refresh_token(claims['iss'])
|
||||
|
||||
if refresh_token is None:
|
||||
return False
|
||||
|
||||
request['hass_refresh_token'] = refresh_token
|
||||
request['hass_user'] = refresh_token.user
|
||||
|
||||
return True
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue