Improve X-Forwarded-* request headers handling (#38696)

Co-authored-by: Paulus Schoutsen <balloob@gmail.com>
Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
Co-authored-by: Franck Nijhof <git@frenck.dev>
Co-authored-by: Pascal Vizeli <pvizeli@syshack.ch>
This commit is contained in:
Franck Nijhof 2020-08-11 22:57:50 +02:00 committed by GitHub
parent 4fa346278c
commit cc4ebc925c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 703 additions and 195 deletions

View file

@ -127,7 +127,6 @@ from homeassistant.auth.models import (
User,
)
from homeassistant.components import websocket_api
from homeassistant.components.http import KEY_REAL_IP
from homeassistant.components.http.auth import async_sign_path
from homeassistant.components.http.ban import log_invalid_auth
from homeassistant.components.http.data_validator import RequestDataValidator
@ -252,14 +251,10 @@ class TokenView(HomeAssistantView):
return await self._async_handle_revoke_token(hass, data)
if grant_type == "authorization_code":
return await self._async_handle_auth_code(
hass, data, str(request[KEY_REAL_IP])
)
return await self._async_handle_auth_code(hass, data, request.remote)
if grant_type == "refresh_token":
return await self._async_handle_refresh_token(
hass, data, str(request[KEY_REAL_IP])
)
return await self._async_handle_refresh_token(hass, data, request.remote)
return self.json(
{"error": "unsupported_grant_type"}, status_code=HTTP_BAD_REQUEST

View file

@ -66,12 +66,13 @@ associate with an credential if "type" set to "link_user" in
"version": 1
}
"""
from ipaddress import ip_address
from aiohttp import web
import voluptuous as vol
import voluptuous_serialize
from homeassistant import data_entry_flow
from homeassistant.components.http import KEY_REAL_IP
from homeassistant.components.http.ban import (
log_invalid_auth,
process_success_login,
@ -183,7 +184,7 @@ class LoginFlowIndexView(HomeAssistantView):
result = await self._flow_mgr.async_init(
handler,
context={
"ip_address": request[KEY_REAL_IP],
"ip_address": ip_address(request.remote),
"credential_only": data.get("type") == "link_user",
},
)
@ -231,7 +232,7 @@ class LoginFlowResourceView(HomeAssistantView):
for flow in self._flow_mgr.async_progress():
if flow["flow_id"] == flow_id and flow["context"][
"ip_address"
] != request.get(KEY_REAL_IP):
] != ip_address(request.remote):
return self.json_message("IP address changed", HTTP_BAD_REQUEST)
result = await self._flow_mgr.async_configure(flow_id, data)

View file

@ -5,7 +5,6 @@ from aiohttp import web
import voluptuous as vol
from homeassistant import util
from homeassistant.components.http import real_ip
from homeassistant.const import EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP
from homeassistant.exceptions import HomeAssistantError
import homeassistant.helpers.config_validation as cv
@ -101,7 +100,6 @@ async def async_setup(hass, yaml_config):
app = web.Application()
app["hass"] = hass
real_ip.setup_real_ip(app, False, [])
# We misunderstood the startup signal. You're not allowed to change
# anything during startup. Temp workaround.
# pylint: disable=protected-access

View file

@ -1,6 +1,7 @@
"""Support for a Hue API to control Home Assistant."""
import asyncio
import hashlib
from ipaddress import ip_address
import logging
import time
@ -34,7 +35,6 @@ from homeassistant.components.fan import (
SUPPORT_SET_SPEED,
)
from homeassistant.components.http import HomeAssistantView
from homeassistant.components.http.const import KEY_REAL_IP
from homeassistant.components.humidifier.const import (
ATTR_HUMIDITY,
SERVICE_SET_HUMIDITY,
@ -131,7 +131,7 @@ class HueUsernameView(HomeAssistantView):
async def post(self, request):
"""Handle a POST request."""
if not is_local(request[KEY_REAL_IP]):
if not is_local(ip_address(request.remote)):
return self.json_message("Only local IPs allowed", HTTP_UNAUTHORIZED)
try:
@ -159,7 +159,7 @@ class HueAllGroupsStateView(HomeAssistantView):
@core.callback
def get(self, request, username):
"""Process a request to make the Brilliant Lightpad work."""
if not is_local(request[KEY_REAL_IP]):
if not is_local(ip_address(request.remote)):
return self.json_message("Only local IPs allowed", HTTP_UNAUTHORIZED)
return self.json({})
@ -179,7 +179,7 @@ class HueGroupView(HomeAssistantView):
@core.callback
def put(self, request, username):
"""Process a request to make the Logitech Pop working."""
if not is_local(request[KEY_REAL_IP]):
if not is_local(ip_address(request.remote)):
return self.json_message("Only local IPs allowed", HTTP_UNAUTHORIZED)
return self.json(
@ -209,7 +209,7 @@ class HueAllLightsStateView(HomeAssistantView):
@core.callback
def get(self, request, username):
"""Process a request to get the list of available lights."""
if not is_local(request[KEY_REAL_IP]):
if not is_local(ip_address(request.remote)):
return self.json_message("Only local IPs allowed", HTTP_UNAUTHORIZED)
return self.json(create_list_of_entities(self.config, request))
@ -229,7 +229,7 @@ class HueFullStateView(HomeAssistantView):
@core.callback
def get(self, request, username):
"""Process a request to get the list of available lights."""
if not is_local(request[KEY_REAL_IP]):
if not is_local(ip_address(request.remote)):
return self.json_message("only local IPs allowed", HTTP_UNAUTHORIZED)
if username != HUE_API_USERNAME:
return self.json(UNAUTHORIZED_USER)
@ -256,7 +256,7 @@ class HueConfigView(HomeAssistantView):
@core.callback
def get(self, request, username):
"""Process a request to get the configuration."""
if not is_local(request[KEY_REAL_IP]):
if not is_local(ip_address(request.remote)):
return self.json_message("only local IPs allowed", HTTP_UNAUTHORIZED)
if username != HUE_API_USERNAME:
return self.json(UNAUTHORIZED_USER)
@ -280,7 +280,7 @@ class HueOneLightStateView(HomeAssistantView):
@core.callback
def get(self, request, username, entity_id):
"""Process a request to get the state of an individual light."""
if not is_local(request[KEY_REAL_IP]):
if not is_local(ip_address(request.remote)):
return self.json_message("Only local IPs allowed", HTTP_UNAUTHORIZED)
hass = request.app["hass"]
@ -321,7 +321,7 @@ class HueOneLightChangeView(HomeAssistantView):
async def put(self, request, username, entity_number):
"""Process a request to set the state of an individual light."""
if not is_local(request[KEY_REAL_IP]):
if not is_local(ip_address(request.remote)):
return self.json_message("Only local IPs allowed", HTTP_UNAUTHORIZED)
config = self.config

View file

@ -13,7 +13,7 @@ import voluptuous as vol
from homeassistant.auth.models import User
from homeassistant.components.http import HomeAssistantView
from homeassistant.components.http.const import KEY_HASS_USER, KEY_REAL_IP
from homeassistant.components.http.const import KEY_HASS_USER
from homeassistant.components.http.data_validator import RequestDataValidator
from homeassistant.const import HTTP_OK
from homeassistant.core import callback
@ -63,8 +63,10 @@ class HassIOBaseAuth(HomeAssistantView):
"""Check if this call is from Supervisor."""
# Check caller IP
hassio_ip = os.environ["HASSIO"].split(":")[0]
if request[KEY_REAL_IP] != ip_address(hassio_ip):
_LOGGER.error("Invalid auth request from %s", request[KEY_REAL_IP])
if ip_address(request.transport.get_extra_info("peername")[0]) != ip_address(
hassio_ip
):
_LOGGER.error("Invalid auth request from %s", request.remote)
raise HTTPUnauthorized()
# Check caller token

View file

@ -26,9 +26,9 @@ from homeassistant.util import ssl as ssl_util
from .auth import setup_auth
from .ban import setup_bans
from .const import KEY_AUTHENTICATED, KEY_HASS, KEY_HASS_USER, KEY_REAL_IP # noqa: F401
from .const import KEY_AUTHENTICATED, KEY_HASS, KEY_HASS_USER # noqa: F401
from .cors import setup_cors
from .real_ip import setup_real_ip
from .forwarded import async_setup_forwarded
from .request_context import setup_request_context
from .static import CACHE_HEADERS, CachingStaticResource
from .view import HomeAssistantView # noqa: F401
@ -296,9 +296,13 @@ class HomeAssistantHTTP:
)
app[KEY_HASS] = hass
# This order matters
# Order matters, forwarded middleware needs to go first.
# Only register middleware if `use_x_forwarded_for` is enabled
# and trusted proxies are provided
if use_x_forwarded_for and trusted_proxies:
async_setup_forwarded(app, trusted_proxies)
setup_request_context(app, current_request)
setup_real_ip(app, use_x_forwarded_for, trusted_proxies)
if is_ban_enabled:
setup_bans(hass, app, login_threshold)

View file

@ -9,7 +9,7 @@ import jwt
from homeassistant.core import callback
from homeassistant.util import dt as dt_util
from .const import KEY_AUTHENTICATED, KEY_HASS_USER, KEY_REAL_IP
from .const import KEY_AUTHENTICATED, KEY_HASS_USER
# mypy: allow-untyped-defs, no-check-untyped-defs
@ -118,7 +118,7 @@ def setup_auth(hass, app):
if authenticated:
_LOGGER.debug(
"Authenticated %s for %s using %s",
request[KEY_REAL_IP],
request.remote,
request.path,
auth_type,
)

View file

@ -16,8 +16,6 @@ from homeassistant.exceptions import HomeAssistantError
import homeassistant.helpers.config_validation as cv
from homeassistant.util.yaml import dump
from .const import KEY_REAL_IP
# mypy: allow-untyped-defs, no-check-untyped-defs
_LOGGER = logging.getLogger(__name__)
@ -61,7 +59,7 @@ async def ban_middleware(request, handler):
return await handler(request)
# Verify if IP is not banned
ip_address_ = request[KEY_REAL_IP]
ip_address_ = ip_address(request.remote)
is_banned = any(
ip_ban.ip_address == ip_address_ for ip_ban in request.app[KEY_BANNED_IPS]
)
@ -95,7 +93,7 @@ async def process_wrong_login(request):
Increase failed login attempts counter for remote IP address.
Add ip ban entry if failed login attempts exceeds threshold.
"""
remote_addr = request[KEY_REAL_IP]
remote_addr = ip_address(request.remote)
msg = f"Login attempt or request with invalid authentication from {remote_addr}"
_LOGGER.warning(msg)
@ -144,7 +142,7 @@ async def process_success_login(request):
No release IP address from banned list function, it can only be done by
manual modify ip bans config file.
"""
remote_addr = request[KEY_REAL_IP]
remote_addr = ip_address(request.remote)
# Check if ban middleware is loaded
if KEY_BANNED_IPS not in request.app or request.app[KEY_LOGIN_THRESHOLD] < 1:

View file

@ -2,4 +2,3 @@
KEY_AUTHENTICATED = "ha_authenticated"
KEY_HASS = "hass"
KEY_HASS_USER = "hass_user"
KEY_REAL_IP = "ha_real_ip"

View file

@ -0,0 +1,174 @@
"""Middleware to handle forwarded data by a reverse proxy."""
from ipaddress import ip_address
import logging
from aiohttp.hdrs import X_FORWARDED_FOR, X_FORWARDED_HOST, X_FORWARDED_PROTO
from aiohttp.web import HTTPBadRequest, middleware
from homeassistant.core import callback
_LOGGER = logging.getLogger(__name__)
# mypy: allow-untyped-defs
@callback
def async_setup_forwarded(app, trusted_proxies):
"""Create forwarded middleware for the app.
Process IP addresses, proto and host information in the forwarded for headers.
`X-Forwarded-For: <client>, <proxy1>, <proxy2>`
e.g., `X-Forwarded-For: 203.0.113.195, 70.41.3.18, 150.172.238.178`
We go through the list from the right side, and skip all entries that are in our
trusted proxies list. The first non-trusted IP is used as the client IP. If all
items in the X-Forwarded-For are trusted, including the most left item (client),
the most left item is used. In the latter case, the client connection originated
from an IP that is also listed as a trusted proxy IP or network.
`X-Forwarded-Proto: <client>, <proxy1>, <proxy2>`
e.g., `X-Forwarded-Proto: https, http, http`
OR `X-Forwarded-Proto: https` (one entry, even with multiple proxies)
The X-Forwarded-Proto is determined based on the corresponding entry of the
X-Forwarded-For header that is used/chosen as the client IP. However,
some proxies, for example, Kubernetes NGINX ingress, only retain one element
in the X-Forwarded-Proto header. In that case, we'll just use what we have.
`X-Forwarded-Host: <host>`
e.g., `X-Forwarded-Host: example.com`
If the previous headers are processed successfully, and the X-Forwarded-Host is
present, it will be used.
Additionally:
- If no X-Forwarded-For header is found, the processing of all headers is skipped.
- Log a warning when untrusted connected peer provides X-Forwarded-For headers.
- If multiple instances of X-Forwarded-For, X-Forwarded-Proto or
X-Forwarded-Host are found, an HTTP 400 status code is thrown.
- If malformed or invalid (IP) data in X-Forwarded-For header is found,
an HTTP 400 status code is thrown.
- The connected client peer on the socket of the incoming connection,
must be trusted for any processing to take place.
- If the number of elements in X-Forwarded-Proto does not equal 1 or
is equal to the number of elements in X-Forwarded-For, an HTTP 400
status code is thrown.
- If an empty X-Forwarded-Host is provided, an HTTP 400 status code is thrown.
- If an empty X-Forwarded-Proto is provided, or an empty element in the list,
an HTTP 400 status code is thrown.
"""
@middleware
async def forwarded_middleware(request, handler):
"""Process forwarded data by a reverse proxy."""
overrides = {}
# Handle X-Forwarded-For
forwarded_for_headers = request.headers.getall(X_FORWARDED_FOR, [])
if not forwarded_for_headers:
# No forwarding headers, continue as normal
return await handler(request)
# Ensure the IP of the connected peer is trusted
connected_ip = ip_address(request.transport.get_extra_info("peername")[0])
if not any(connected_ip in trusted_proxy for trusted_proxy in trusted_proxies):
_LOGGER.warning(
"Received X-Forwarded-For header from untrusted proxy %s, headers not processed",
connected_ip,
)
# Not trusted, continue as normal
return await handler(request)
# Multiple X-Forwarded-For headers
if len(forwarded_for_headers) > 1:
_LOGGER.error(
"Too many headers for X-Forwarded-For: %s", forwarded_for_headers
)
raise HTTPBadRequest
# Process X-Forwarded-For from the right side (by reversing the list)
forwarded_for_split = list(reversed(forwarded_for_headers[0].split(",")))
try:
forwarded_for = [ip_address(addr.strip()) for addr in forwarded_for_split]
except ValueError:
_LOGGER.error(
"Invalid IP address in X-Forwarded-For: %s", forwarded_for_headers[0]
)
raise HTTPBadRequest
# Find the last trusted index in the X-Forwarded-For list
forwarded_for_index = 0
for forwarded_ip in forwarded_for:
if any(forwarded_ip in trusted_proxy for trusted_proxy in trusted_proxies):
forwarded_for_index += 1
continue
overrides["remote"] = str(forwarded_ip)
break
else:
# If all the IP addresses are from trusted networks, take the left-most.
forwarded_for_index = -1
overrides["remote"] = str(forwarded_for[-1])
# Handle X-Forwarded-Proto
forwarded_proto_headers = request.headers.getall(X_FORWARDED_PROTO, [])
if forwarded_proto_headers:
if len(forwarded_proto_headers) > 1:
_LOGGER.error(
"Too many headers for X-Forward-Proto: %s", forwarded_proto_headers
)
raise HTTPBadRequest
forwarded_proto_split = list(
reversed(forwarded_proto_headers[0].split(","))
)
forwarded_proto = [proto.strip() for proto in forwarded_proto_split]
# Catch empty values
if "" in forwarded_proto:
_LOGGER.error(
"Empty item received in X-Forward-Proto header: %s",
forwarded_proto_headers[0],
)
raise HTTPBadRequest
# The X-Forwarded-Proto contains either one element, or the equals number
# of elements as X-Forwarded-For
if len(forwarded_proto) not in (1, len(forwarded_for)):
_LOGGER.error(
"Incorrect number of elements in X-Forward-Proto. Expected 1 or %d, got %d: %s",
len(forwarded_for),
len(forwarded_proto),
forwarded_proto_headers[0],
)
raise HTTPBadRequest
# Ideally this should take the scheme corresponding to the entry
# in X-Forwarded-For that was chosen, but some proxies only retain
# one element. In that case, use what we have.
overrides["scheme"] = forwarded_proto[-1]
if len(forwarded_proto) != 1:
overrides["scheme"] = forwarded_proto[forwarded_for_index]
# Handle X-Forwarded-Host
forwarded_host_headers = request.headers.getall(X_FORWARDED_HOST, [])
if forwarded_host_headers:
# Multiple X-Forwarded-Host headers
if len(forwarded_host_headers) > 1:
_LOGGER.error(
"Too many headers for X-Forwarded-Host: %s", forwarded_host_headers
)
raise HTTPBadRequest
forwarded_host = forwarded_host_headers[0].strip()
if not forwarded_host:
_LOGGER.error("Empty value received in X-Forward-Host header")
raise HTTPBadRequest
overrides["host"] = forwarded_host
# Done, create a new request based on gathered data.
request = request.clone(**overrides)
return await handler(request)
app.middlewares.append(forwarded_middleware)

View file

@ -1,41 +0,0 @@
"""Middleware to fetch real IP."""
from ipaddress import ip_address
from aiohttp.hdrs import X_FORWARDED_FOR
from aiohttp.web import middleware
from homeassistant.core import callback
from .const import KEY_REAL_IP
# mypy: allow-untyped-defs
@callback
def setup_real_ip(app, use_x_forwarded_for, trusted_proxies):
"""Create IP Ban middleware for the app."""
@middleware
async def real_ip_middleware(request, handler):
"""Real IP middleware."""
connected_ip = ip_address(request.transport.get_extra_info("peername")[0])
request[KEY_REAL_IP] = connected_ip
# Only use the XFF header if enabled, present, and from a trusted proxy
try:
if (
use_x_forwarded_for
and X_FORWARDED_FOR in request.headers
and any(
connected_ip in trusted_proxy for trusted_proxy in trusted_proxies
)
):
request[KEY_REAL_IP] = ip_address(
request.headers.get(X_FORWARDED_FOR).split(", ")[-1]
)
except ValueError:
pass
return await handler(request)
app.middlewares.append(real_ip_middleware)

View file

@ -18,7 +18,7 @@ from homeassistant.const import CONTENT_TYPE_JSON, HTTP_OK, HTTP_SERVICE_UNAVAIL
from homeassistant.core import Context, is_callback
from homeassistant.helpers.json import JSONEncoder
from .const import KEY_AUTHENTICATED, KEY_HASS, KEY_REAL_IP
from .const import KEY_AUTHENTICATED, KEY_HASS
_LOGGER = logging.getLogger(__name__)
@ -116,10 +116,7 @@ def request_handler_factory(view: HomeAssistantView, handler: Callable) -> Calla
raise HTTPUnauthorized()
_LOGGER.debug(
"Serving %s to %s (auth: %s)",
request.path,
request.get(KEY_REAL_IP),
authenticated,
"Serving %s to %s (auth: %s)", request.path, request.remote, authenticated,
)
try:

View file

@ -1,11 +1,11 @@
"""Support for Telegram bots using webhooks."""
import datetime as dt
from ipaddress import ip_address
import logging
from telegram.error import TimedOut
from homeassistant.components.http import HomeAssistantView
from homeassistant.components.http.const import KEY_REAL_IP
from homeassistant.const import (
EVENT_HOMEASSISTANT_STOP,
HTTP_BAD_REQUEST,
@ -96,7 +96,7 @@ class BotPushReceiver(HomeAssistantView, BaseTelegramBotEntity):
async def post(self, request):
"""Accept the POST from telegram."""
real_ip = request[KEY_REAL_IP]
real_ip = ip_address(request.remote)
if not any(real_ip in net for net in self.trusted_networks):
_LOGGER.warning("Access denied from %s", real_ip)
return self.json_message("Access denied", HTTP_UNAUTHORIZED)

View file

@ -6,7 +6,6 @@ from aiohttp.web import Request, Response
import voluptuous as vol
from homeassistant.components import websocket_api
from homeassistant.components.http.const import KEY_REAL_IP
from homeassistant.components.http.view import HomeAssistantView
from homeassistant.const import HTTP_OK
from homeassistant.core import callback
@ -80,7 +79,7 @@ async def async_handle_webhook(hass, webhook_id, request):
if isinstance(request, MockRequest):
received_from = request.mock_source
else:
received_from = request[KEY_REAL_IP]
received_from = request.remote
_LOGGER.warning(
"Received message for unregistered webhook %s from %s",

View file

@ -1157,7 +1157,7 @@ async def test_external_ip_blocked(hue_client):
postUrls = ["/api"]
putUrls = ["/api/username/lights/light.ceiling_lights/state"]
with patch(
"homeassistant.components.http.real_ip.ip_address",
"homeassistant.components.emulated_hue.hue_api.ip_address",
return_value=ip_address("45.45.45.45"),
):
for getUrl in getUrls:

View file

@ -1,10 +1,6 @@
"""Tests for the HTTP component."""
from ipaddress import ip_address
from aiohttp import web
from homeassistant.components.http.const import KEY_REAL_IP
# Relic from the past. Kept here so we can run negative tests.
HTTP_HEADER_HA_AUTH = "X-HA-access"
@ -25,7 +21,7 @@ def mock_real_ip(app):
"""Mock Real IP middleware."""
nonlocal ip_to_mock
request[KEY_REAL_IP] = ip_address(ip_to_mock)
request = request.clone(remote=ip_to_mock)
return await handler(request)

View file

@ -9,7 +9,7 @@ import pytest
from homeassistant.auth.providers import trusted_networks
from homeassistant.components.http.auth import async_sign_path, setup_auth
from homeassistant.components.http.const import KEY_AUTHENTICATED
from homeassistant.components.http.real_ip import setup_real_ip
from homeassistant.components.http.forwarded import async_setup_forwarded
from homeassistant.setup import async_setup_component
from . import HTTP_HEADER_HA_AUTH, mock_real_ip
@ -54,7 +54,7 @@ def app(hass):
app = web.Application()
app["hass"] = hass
app.router.add_get("/", mock_handler)
setup_real_ip(app, False, [])
async_setup_forwarded(app, [])
return app

View file

@ -0,0 +1,487 @@
"""Test real forwarded middleware."""
from ipaddress import ip_network
from aiohttp import web
from aiohttp.hdrs import X_FORWARDED_FOR, X_FORWARDED_HOST, X_FORWARDED_PROTO
import pytest
from homeassistant.components.http.forwarded import async_setup_forwarded
async def mock_handler(request):
"""Return the real IP as text."""
return web.Response(text=request.remote)
async def test_x_forwarded_for_without_trusted_proxy(aiohttp_client, caplog):
"""Test that we get the IP from the transport."""
async def handler(request):
url = mock_api_client.make_url("/")
assert request.host == f"{url.host}:{url.port}"
assert request.scheme == "http"
assert not request.secure
assert request.remote == "127.0.0.1"
return web.Response()
app = web.Application()
app.router.add_get("/", handler)
async_setup_forwarded(app, [])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get("/", headers={X_FORWARDED_FOR: "255.255.255.255"})
assert resp.status == 200
assert (
"Received X-Forwarded-For header from untrusted proxy 127.0.0.1, headers not processed"
in caplog.text
)
@pytest.mark.parametrize(
"trusted_proxies,x_forwarded_for,remote",
[
(
["127.0.0.0/24", "1.1.1.1", "10.10.10.0/24"],
"10.10.10.10, 1.1.1.1",
"10.10.10.10",
),
(["127.0.0.0/24", "1.1.1.1"], "123.123.123.123, 2.2.2.2, 1.1.1.1", "2.2.2.2"),
(["127.0.0.0/24", "1.1.1.1"], "123.123.123.123,2.2.2.2,1.1.1.1", "2.2.2.2"),
(["127.0.0.0/24"], "123.123.123.123, 2.2.2.2, 1.1.1.1", "1.1.1.1"),
(["127.0.0.0/24"], "127.0.0.1", "127.0.0.1"),
(["127.0.0.1", "1.1.1.1"], "123.123.123.123, 1.1.1.1", "123.123.123.123"),
(["127.0.0.1", "1.1.1.1"], "123.123.123.123, 2.2.2.2, 1.1.1.1", "2.2.2.2"),
(["127.0.0.1"], "255.255.255.255", "255.255.255.255"),
],
)
async def test_x_forwarded_for_with_trusted_proxy(
trusted_proxies, x_forwarded_for, remote, aiohttp_client
):
"""Test that we get the IP from the forwarded for header."""
async def handler(request):
url = mock_api_client.make_url("/")
assert request.host == f"{url.host}:{url.port}"
assert request.scheme == "http"
assert not request.secure
assert request.remote == remote
return web.Response()
app = web.Application()
app.router.add_get("/", handler)
async_setup_forwarded(
app, [ip_network(trusted_proxy) for trusted_proxy in trusted_proxies]
)
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get("/", headers={X_FORWARDED_FOR: x_forwarded_for})
assert resp.status == 200
async def test_x_forwarded_for_with_untrusted_proxy(aiohttp_client):
"""Test that we get the IP from transport with untrusted proxy."""
async def handler(request):
url = mock_api_client.make_url("/")
assert request.host == f"{url.host}:{url.port}"
assert request.scheme == "http"
assert not request.secure
assert request.remote == "127.0.0.1"
return web.Response()
app = web.Application()
app.router.add_get("/", handler)
async_setup_forwarded(app, [ip_network("1.1.1.1")])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get("/", headers={X_FORWARDED_FOR: "255.255.255.255"})
assert resp.status == 200
async def test_x_forwarded_for_with_spoofed_header(aiohttp_client):
"""Test that we get the IP from the transport with a spoofed header."""
async def handler(request):
url = mock_api_client.make_url("/")
assert request.host == f"{url.host}:{url.port}"
assert request.scheme == "http"
assert not request.secure
assert request.remote == "255.255.255.255"
return web.Response()
app = web.Application()
app.router.add_get("/", handler)
async_setup_forwarded(app, [ip_network("127.0.0.1")])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get(
"/", headers={X_FORWARDED_FOR: "222.222.222.222, 255.255.255.255"}
)
assert resp.status == 200
@pytest.mark.parametrize(
"x_forwarded_for",
[
"This value is invalid",
"1.1.1.1, , 1.2.3.4",
"1.1.1.1,,1.2.3.4",
"1.1.1.1, batman, 1.2.3.4",
"192.168.0.0/24",
"192.168.0.0/24, 1.1.1.1",
",",
"",
],
)
async def test_x_forwarded_for_with_malformed_header(
x_forwarded_for, aiohttp_client, caplog
):
"""Test that we get a HTTP 400 bad request with a malformed header."""
app = web.Application()
app.router.add_get("/", mock_handler)
async_setup_forwarded(app, [ip_network("127.0.0.1")])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get("/", headers={X_FORWARDED_FOR: x_forwarded_for})
assert resp.status == 400
assert "Invalid IP address in X-Forwarded-For" in caplog.text
async def test_x_forwarded_for_with_multiple_headers(aiohttp_client, caplog):
"""Test that we get a HTTP 400 bad request with multiple headers."""
app = web.Application()
app.router.add_get("/", mock_handler)
async_setup_forwarded(app, [ip_network("127.0.0.1")])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get(
"/",
headers=[
(X_FORWARDED_FOR, "222.222.222.222"),
(X_FORWARDED_FOR, "123.123.123.123"),
],
)
assert resp.status == 400
assert "Too many headers for X-Forwarded-For" in caplog.text
async def test_x_forwarded_proto_without_trusted_proxy(aiohttp_client):
"""Test that proto header is ignored when untrusted."""
async def handler(request):
url = mock_api_client.make_url("/")
assert request.host == f"{url.host}:{url.port}"
assert request.scheme == "http"
assert not request.secure
assert request.remote == "127.0.0.1"
return web.Response()
app = web.Application()
app.router.add_get("/", handler)
async_setup_forwarded(app, [])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get(
"/", headers={X_FORWARDED_FOR: "255.255.255.255", X_FORWARDED_PROTO: "https"}
)
assert resp.status == 200
@pytest.mark.parametrize(
"x_forwarded_for,remote,x_forwarded_proto,secure",
[
("10.10.10.10, 127.0.0.1, 127.0.0.2", "10.10.10.10", "https, http, http", True),
("10.10.10.10, 127.0.0.1, 127.0.0.2", "10.10.10.10", "https,http,http", True),
("10.10.10.10, 127.0.0.1, 127.0.0.2", "10.10.10.10", "http", False),
(
"10.10.10.10, 127.0.0.1, 127.0.0.2",
"10.10.10.10",
"http, https, https",
False,
),
("10.10.10.10, 127.0.0.1, 127.0.0.2", "10.10.10.10", "https", True),
(
"255.255.255.255, 10.10.10.10, 127.0.0.1",
"10.10.10.10",
"http, https, http",
True,
),
(
"255.255.255.255, 10.10.10.10, 127.0.0.1",
"10.10.10.10",
"https, http, https",
False,
),
("255.255.255.255, 10.10.10.10, 127.0.0.1", "10.10.10.10", "https", True),
],
)
async def test_x_forwarded_proto_with_trusted_proxy(
x_forwarded_for, remote, x_forwarded_proto, secure, aiohttp_client
):
"""Test that we get the proto header if proxy is trusted."""
async def handler(request):
assert request.remote == remote
assert request.scheme == ("https" if secure else "http")
assert request.secure == secure
return web.Response()
app = web.Application()
app.router.add_get("/", handler)
async_setup_forwarded(app, [ip_network("127.0.0.0/24")])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get(
"/",
headers={
X_FORWARDED_FOR: x_forwarded_for,
X_FORWARDED_PROTO: x_forwarded_proto,
},
)
assert resp.status == 200
async def test_x_forwarded_proto_with_trusted_proxy_multiple_for(aiohttp_client):
"""Test that we get the proto with 1 element in the proto, multiple in the for."""
async def handler(request):
url = mock_api_client.make_url("/")
assert request.host == f"{url.host}:{url.port}"
assert request.scheme == "https"
assert request.secure
assert request.remote == "255.255.255.255"
return web.Response()
app = web.Application()
app.router.add_get("/", handler)
async_setup_forwarded(app, [ip_network("127.0.0.0/24")])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get(
"/",
headers={
X_FORWARDED_FOR: "255.255.255.255, 127.0.0.1, 127.0.0.2",
X_FORWARDED_PROTO: "https",
},
)
assert resp.status == 200
async def test_x_forwarded_proto_not_processed_without_for(aiohttp_client):
"""Test that proto header isn't processed without a for header."""
async def handler(request):
url = mock_api_client.make_url("/")
assert request.host == f"{url.host}:{url.port}"
assert request.scheme == "http"
assert not request.secure
assert request.remote == "127.0.0.1"
return web.Response()
app = web.Application()
app.router.add_get("/", handler)
async_setup_forwarded(app, [ip_network("127.0.0.1")])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get("/", headers={X_FORWARDED_PROTO: "https"})
assert resp.status == 200
async def test_x_forwarded_proto_with_multiple_headers(aiohttp_client, caplog):
"""Test that we get a HTTP 400 bad request with multiple headers."""
app = web.Application()
app.router.add_get("/", mock_handler)
async_setup_forwarded(app, [ip_network("127.0.0.1")])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get(
"/",
headers=[
(X_FORWARDED_FOR, "222.222.222.222"),
(X_FORWARDED_PROTO, "https"),
(X_FORWARDED_PROTO, "http"),
],
)
assert resp.status == 400
assert "Too many headers for X-Forward-Proto" in caplog.text
@pytest.mark.parametrize(
"x_forwarded_proto", ["", ",", "https, , https", "https, https, "],
)
async def test_x_forwarded_proto_empty_element(
x_forwarded_proto, aiohttp_client, caplog
):
"""Test that we get a HTTP 400 bad request with empty proto."""
app = web.Application()
app.router.add_get("/", mock_handler)
async_setup_forwarded(app, [ip_network("127.0.0.1")])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get(
"/", headers={X_FORWARDED_FOR: "1.1.1.1", X_FORWARDED_PROTO: x_forwarded_proto},
)
assert resp.status == 400
assert "Empty item received in X-Forward-Proto header" in caplog.text
@pytest.mark.parametrize(
"x_forwarded_for,x_forwarded_proto,expected,got",
[
("1.1.1.1, 2.2.2.2", "https, https, https", 2, 3),
("1.1.1.1, 2.2.2.2, 3.3.3.3, 4.4.4.4", "https, https, https", 4, 3),
],
)
async def test_x_forwarded_proto_incorrect_number_of_elements(
x_forwarded_for, x_forwarded_proto, expected, got, aiohttp_client, caplog
):
"""Test that we get a HTTP 400 bad request with incorrect number of elements."""
app = web.Application()
app.router.add_get("/", mock_handler)
async_setup_forwarded(app, [ip_network("127.0.0.1")])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get(
"/",
headers={
X_FORWARDED_FOR: x_forwarded_for,
X_FORWARDED_PROTO: x_forwarded_proto,
},
)
assert resp.status == 400
assert (
f"Incorrect number of elements in X-Forward-Proto. Expected 1 or {expected}, got {got}"
in caplog.text
)
async def test_x_forwarded_host_without_trusted_proxy(aiohttp_client):
"""Test that host header is ignored when untrusted."""
async def handler(request):
url = mock_api_client.make_url("/")
assert request.host == f"{url.host}:{url.port}"
assert request.scheme == "http"
assert not request.secure
assert request.remote == "127.0.0.1"
return web.Response()
app = web.Application()
app.router.add_get("/", handler)
async_setup_forwarded(app, [])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get(
"/",
headers={X_FORWARDED_FOR: "255.255.255.255", X_FORWARDED_HOST: "example.com"},
)
assert resp.status == 200
async def test_x_forwarded_host_with_trusted_proxy(aiohttp_client):
"""Test that we get the host header if proxy is trusted."""
async def handler(request):
assert request.host == "example.com"
assert request.scheme == "http"
assert not request.secure
assert request.remote == "255.255.255.255"
return web.Response()
app = web.Application()
app.router.add_get("/", handler)
async_setup_forwarded(app, [ip_network("127.0.0.1")])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get(
"/",
headers={X_FORWARDED_FOR: "255.255.255.255", X_FORWARDED_HOST: "example.com"},
)
assert resp.status == 200
async def test_x_forwarded_host_not_processed_without_for(aiohttp_client):
"""Test that host header isn't processed without a for header."""
async def handler(request):
url = mock_api_client.make_url("/")
assert request.host == f"{url.host}:{url.port}"
assert request.scheme == "http"
assert not request.secure
assert request.remote == "127.0.0.1"
return web.Response()
app = web.Application()
app.router.add_get("/", handler)
async_setup_forwarded(app, [ip_network("127.0.0.1")])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get("/", headers={X_FORWARDED_HOST: "example.com"})
assert resp.status == 200
async def test_x_forwarded_host_with_multiple_headers(aiohttp_client, caplog):
"""Test that we get a HTTP 400 bad request with multiple headers."""
app = web.Application()
app.router.add_get("/", mock_handler)
async_setup_forwarded(app, [ip_network("127.0.0.1")])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get(
"/",
headers=[
(X_FORWARDED_FOR, "222.222.222.222"),
(X_FORWARDED_HOST, "example.com"),
(X_FORWARDED_HOST, "example.spoof"),
],
)
assert resp.status == 400
assert "Too many headers for X-Forwarded-Host" in caplog.text
async def test_x_forwarded_host_with_empty_header(aiohttp_client, caplog):
"""Test that we get a HTTP 400 bad request with empty host value."""
app = web.Application()
app.router.add_get("/", mock_handler)
async_setup_forwarded(app, [ip_network("127.0.0.1")])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get(
"/", headers={X_FORWARDED_FOR: "222.222.222.222", X_FORWARDED_HOST: ""}
)
assert resp.status == 400
assert "Empty value received in X-Forward-Host header" in caplog.text

View file

@ -1,101 +0,0 @@
"""Test real IP middleware."""
from ipaddress import ip_network
from aiohttp import web
from aiohttp.hdrs import X_FORWARDED_FOR
from homeassistant.components.http.const import KEY_REAL_IP
from homeassistant.components.http.real_ip import setup_real_ip
async def mock_handler(request):
"""Return the real IP as text."""
return web.Response(text=str(request[KEY_REAL_IP]))
async def test_ignore_x_forwarded_for(aiohttp_client):
"""Test that we get the IP from the transport."""
app = web.Application()
app.router.add_get("/", mock_handler)
setup_real_ip(app, False, [])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get("/", headers={X_FORWARDED_FOR: "255.255.255.255"})
assert resp.status == 200
text = await resp.text()
assert text != "255.255.255.255"
async def test_use_x_forwarded_for_without_trusted_proxy(aiohttp_client):
"""Test that we get the IP from the transport."""
app = web.Application()
app.router.add_get("/", mock_handler)
setup_real_ip(app, True, [])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get("/", headers={X_FORWARDED_FOR: "255.255.255.255"})
assert resp.status == 200
text = await resp.text()
assert text != "255.255.255.255"
async def test_use_x_forwarded_for_with_trusted_proxy(aiohttp_client):
"""Test that we get the IP from the transport."""
app = web.Application()
app.router.add_get("/", mock_handler)
setup_real_ip(app, True, [ip_network("127.0.0.1")])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get("/", headers={X_FORWARDED_FOR: "255.255.255.255"})
assert resp.status == 200
text = await resp.text()
assert text == "255.255.255.255"
async def test_use_x_forwarded_for_with_untrusted_proxy(aiohttp_client):
"""Test that we get the IP from the transport."""
app = web.Application()
app.router.add_get("/", mock_handler)
setup_real_ip(app, True, [ip_network("1.1.1.1")])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get("/", headers={X_FORWARDED_FOR: "255.255.255.255"})
assert resp.status == 200
text = await resp.text()
assert text != "255.255.255.255"
async def test_use_x_forwarded_for_with_spoofed_header(aiohttp_client):
"""Test that we get the IP from the transport."""
app = web.Application()
app.router.add_get("/", mock_handler)
setup_real_ip(app, True, [ip_network("127.0.0.1")])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get(
"/", headers={X_FORWARDED_FOR: "222.222.222.222, 255.255.255.255"}
)
assert resp.status == 200
text = await resp.text()
assert text == "255.255.255.255"
async def test_use_x_forwarded_for_with_nonsense_header(aiohttp_client):
"""Test that we get the IP from the transport."""
app = web.Application()
app.router.add_get("/", mock_handler)
setup_real_ip(app, True, [ip_network("127.0.0.1")])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get(
"/", headers={X_FORWARDED_FOR: "This value is invalid"}
)
assert resp.status == 200
text = await resp.text()
assert text == "127.0.0.1"