Use IndieAuth for client ID (#15369)

* Use IndieAuth for client ID

* Lint

* Lint & Fix tests

* Allow local IP addresses

* Update comment
This commit is contained in:
Paulus Schoutsen 2018-07-09 18:24:46 +02:00 committed by GitHub
parent f7d7d825b0
commit 0d4841cbea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 329 additions and 347 deletions

View file

@ -115,7 +115,8 @@ from homeassistant.helpers.data_entry_flow import (
from homeassistant.components.http.view import HomeAssistantView
from homeassistant.components.http.data_validator import RequestDataValidator
from .client import verify_client
from . import indieauth
DOMAIN = 'auth'
DEPENDENCIES = ['http']
@ -143,8 +144,7 @@ class AuthProvidersView(HomeAssistantView):
name = 'api:auth:providers'
requires_auth = False
@verify_client
async def get(self, request, client):
async def get(self, request):
"""Get available auth providers."""
return self.json([{
'name': provider.name,
@ -164,16 +164,16 @@ class LoginFlowIndexView(FlowManagerIndexView):
"""Do not allow index of flows in progress."""
return aiohttp.web.Response(status=405)
# pylint: disable=arguments-differ
@verify_client
@RequestDataValidator(vol.Schema({
vol.Required('client_id'): str,
vol.Required('handler'): vol.Any(str, list),
vol.Required('redirect_uri'): str,
}))
async def post(self, request, client, data):
async def post(self, request, data):
"""Create a new login flow."""
if data['redirect_uri'] not in client.redirect_uris:
return self.json_message('invalid redirect uri', )
if not indieauth.verify_redirect_uri(data['client_id'],
data['redirect_uri']):
return self.json_message('invalid client id or redirect uri', 400)
# pylint: disable=no-value-for-parameter
return await super().post(request)
@ -191,16 +191,20 @@ class LoginFlowResourceView(FlowManagerResourceView):
super().__init__(flow_mgr)
self._store_credentials = store_credentials
# pylint: disable=arguments-differ
async def get(self, request):
async def get(self, request, flow_id):
"""Do not allow getting status of a flow in progress."""
return self.json_message('Invalid flow specified', 404)
# pylint: disable=arguments-differ
@verify_client
@RequestDataValidator(vol.Schema(dict), allow_empty=True)
async def post(self, request, client, flow_id, data):
@RequestDataValidator(vol.Schema({
'client_id': str
}, extra=vol.ALLOW_EXTRA))
async def post(self, request, flow_id, data):
"""Handle progressing a login flow request."""
client_id = data.pop('client_id')
if not indieauth.verify_client_id(client_id):
return self.json_message('Invalid client id', 400)
try:
result = await self._flow_mgr.async_configure(flow_id, data)
except data_entry_flow.UnknownFlow:
@ -212,7 +216,7 @@ class LoginFlowResourceView(FlowManagerResourceView):
return self.json(self._prepare_result_json(result))
result.pop('data')
result['result'] = self._store_credentials(client.id, result['result'])
result['result'] = self._store_credentials(client_id, result['result'])
return self.json(result)
@ -228,24 +232,31 @@ class GrantTokenView(HomeAssistantView):
"""Initialize the grant token view."""
self._retrieve_credentials = retrieve_credentials
@verify_client
async def post(self, request, client):
async def post(self, request):
"""Grant a token."""
hass = request.app['hass']
data = await request.post()
client_id = data.get('client_id')
if client_id is None or not indieauth.verify_client_id(client_id):
return self.json({
'error': 'invalid_request',
}, status_code=400)
grant_type = data.get('grant_type')
if grant_type == 'authorization_code':
return await self._async_handle_auth_code(hass, client, data)
return await self._async_handle_auth_code(hass, client_id, data)
elif grant_type == 'refresh_token':
return await self._async_handle_refresh_token(hass, client, data)
return await self._async_handle_refresh_token(
hass, client_id, data)
return self.json({
'error': 'unsupported_grant_type',
}, status_code=400)
async def _async_handle_auth_code(self, hass, client, data):
async def _async_handle_auth_code(self, hass, client_id, data):
"""Handle authorization code request."""
code = data.get('code')
@ -254,7 +265,7 @@ class GrantTokenView(HomeAssistantView):
'error': 'invalid_request',
}, status_code=400)
credentials = self._retrieve_credentials(client.id, code)
credentials = self._retrieve_credentials(client_id, code)
if credentials is None:
return self.json({
@ -263,7 +274,7 @@ class GrantTokenView(HomeAssistantView):
user = await hass.auth.async_get_or_create_user(credentials)
refresh_token = await hass.auth.async_create_refresh_token(user,
client)
client_id)
access_token = hass.auth.async_create_access_token(refresh_token)
return self.json({
@ -274,7 +285,7 @@ class GrantTokenView(HomeAssistantView):
int(refresh_token.access_token_expiration.total_seconds()),
})
async def _async_handle_refresh_token(self, hass, client, data):
async def _async_handle_refresh_token(self, hass, client_id, data):
"""Handle authorization code request."""
token = data.get('refresh_token')
@ -285,7 +296,7 @@ class GrantTokenView(HomeAssistantView):
refresh_token = await hass.auth.async_get_refresh_token(token)
if refresh_token is None or refresh_token.client_id != client.id:
if refresh_token is None or refresh_token.client_id != client_id:
return self.json({
'error': 'invalid_grant',
}, status_code=400)

View file

@ -1,79 +0,0 @@
"""Helpers to resolve client ID/secret."""
import base64
from functools import wraps
import hmac
import aiohttp.hdrs
def verify_client(method):
"""Decorator to verify client id/secret on requests."""
@wraps(method)
async def wrapper(view, request, *args, **kwargs):
"""Verify client id/secret before doing request."""
client = await _verify_client(request)
if client is None:
return view.json({
'error': 'invalid_client',
}, status_code=401)
return await method(
view, request, *args, **kwargs, client=client)
return wrapper
async def _verify_client(request):
"""Method to verify the client id/secret in consistent time.
By using a consistent time for looking up client id and comparing the
secret, we prevent attacks by malicious actors trying different client ids
and are able to derive from the time it takes to process the request if
they guessed the client id correctly.
"""
if aiohttp.hdrs.AUTHORIZATION not in request.headers:
return None
auth_type, auth_value = \
request.headers.get(aiohttp.hdrs.AUTHORIZATION).split(' ', 1)
if auth_type != 'Basic':
return None
decoded = base64.b64decode(auth_value).decode('utf-8')
try:
client_id, client_secret = decoded.split(':', 1)
except ValueError:
# If no ':' in decoded
client_id, client_secret = decoded, None
return await async_secure_get_client(
request.app['hass'], client_id, client_secret)
async def async_secure_get_client(hass, client_id, client_secret):
"""Get a client id/secret in consistent time."""
client = await hass.auth.async_get_client(client_id)
if client is None:
if client_secret is not None:
# Still do a compare so we run same time as if a client was found.
hmac.compare_digest(client_secret.encode('utf-8'),
client_secret.encode('utf-8'))
return None
if client.secret is None:
return client
elif client_secret is None:
# Still do a compare so we run same time as if a secret was passed.
hmac.compare_digest(client.secret.encode('utf-8'),
client.secret.encode('utf-8'))
return None
elif hmac.compare_digest(client_secret.encode('utf-8'),
client.secret.encode('utf-8')):
return client
return None

View file

@ -0,0 +1,130 @@
"""Helpers to resolve client ID/secret."""
from ipaddress import ip_address, ip_network
from urllib.parse import urlparse
# IP addresses of loopback interfaces
ALLOWED_IPS = (
ip_address('127.0.0.1'),
ip_address('::1'),
)
# RFC1918 - Address allocation for Private Internets
ALLOWED_NETWORKS = (
ip_network('10.0.0.0/8'),
ip_network('172.16.0.0/12'),
ip_network('192.168.0.0/16'),
)
def verify_redirect_uri(client_id, redirect_uri):
"""Verify that the client and redirect uri match."""
try:
client_id_parts = _parse_client_id(client_id)
except ValueError:
return False
redirect_parts = _parse_url(redirect_uri)
# IndieAuth 4.2.2 allows for redirect_uri to be on different domain
# but needs to be specified in link tag when fetching `client_id`.
# This is not implemented.
# Verify redirect url and client url have same scheme and domain.
return (
client_id_parts.scheme == redirect_parts.scheme and
client_id_parts.netloc == redirect_parts.netloc
)
def verify_client_id(client_id):
"""Verify that the client id is valid."""
try:
_parse_client_id(client_id)
return True
except ValueError:
return False
def _parse_url(url):
"""Parse a url in parts and canonicalize according to IndieAuth."""
parts = urlparse(url)
# Canonicalize a url according to IndieAuth 3.2.
# SHOULD convert the hostname to lowercase
parts = parts._replace(netloc=parts.netloc.lower())
# If a URL with no path component is ever encountered,
# it MUST be treated as if it had the path /.
if parts.path == '':
parts = parts._replace(path='/')
return parts
def _parse_client_id(client_id):
"""Test if client id is a valid URL according to IndieAuth section 3.2.
https://indieauth.spec.indieweb.org/#client-identifier
"""
parts = _parse_url(client_id)
# Client identifier URLs
# MUST have either an https or http scheme
if parts.scheme not in ('http', 'https'):
raise ValueError()
# MUST contain a path component
# Handled by url canonicalization.
# MUST NOT contain single-dot or double-dot path segments
if any(segment in ('.', '..') for segment in parts.path.split('/')):
raise ValueError(
'Client ID cannot contain single-dot or double-dot path segments')
# MUST NOT contain a fragment component
if parts.fragment != '':
raise ValueError('Client ID cannot contain a fragment')
# MUST NOT contain a username or password component
if parts.username is not None:
raise ValueError('Client ID cannot contain username')
if parts.password is not None:
raise ValueError('Client ID cannot contain password')
# MAY contain a port
try:
# parts raises ValueError when port cannot be parsed as int
parts.port
except ValueError:
raise ValueError('Client ID contains invalid port')
# Additionally, hostnames
# MUST be domain names or a loopback interface and
# MUST NOT be IPv4 or IPv6 addresses except for IPv4 127.0.0.1
# or IPv6 [::1]
# We are not goint to follow the spec here. We are going to allow
# any internal network IP to be used inside a client id.
address = None
try:
netloc = parts.netloc
# Strip the [, ] from ipv6 addresses before parsing
if netloc[0] == '[' and netloc[-1] == ']':
netloc = netloc[1:-1]
address = ip_address(netloc)
except ValueError:
# Not an ip address
pass
if (address is None or
address in ALLOWED_IPS or
any(address in network for network in ALLOWED_NETWORKS)):
return parts
raise ValueError('Hostname should be a domain name or local IP address')