Use http.HTTPStatus in components/[gh]* (#58246)
This commit is contained in:
parent
583ae3c953
commit
b52c5c82b1
38 changed files with 272 additions and 247 deletions
|
@ -1,5 +1,6 @@
|
|||
"""The tests for the Home Assistant HTTP component."""
|
||||
from datetime import timedelta
|
||||
from http import HTTPStatus
|
||||
from ipaddress import ip_network
|
||||
from unittest.mock import patch
|
||||
|
||||
|
@ -94,10 +95,10 @@ async def test_cant_access_with_password_in_header(
|
|||
client = await aiohttp_client(app)
|
||||
|
||||
req = await client.get("/", headers={HTTP_HEADER_HA_AUTH: API_PASSWORD})
|
||||
assert req.status == 401
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
req = await client.get("/", headers={HTTP_HEADER_HA_AUTH: "wrong-pass"})
|
||||
assert req.status == 401
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
|
||||
async def test_cant_access_with_password_in_query(
|
||||
|
@ -108,13 +109,13 @@ async def test_cant_access_with_password_in_query(
|
|||
client = await aiohttp_client(app)
|
||||
|
||||
resp = await client.get("/", params={"api_password": API_PASSWORD})
|
||||
assert resp.status == 401
|
||||
assert resp.status == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
resp = await client.get("/")
|
||||
assert resp.status == 401
|
||||
assert resp.status == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
resp = await client.get("/", params={"api_password": "wrong-password"})
|
||||
assert resp.status == 401
|
||||
assert resp.status == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
|
||||
async def test_basic_auth_does_not_work(app, aiohttp_client, hass, legacy_auth):
|
||||
|
@ -123,16 +124,16 @@ async def test_basic_auth_does_not_work(app, aiohttp_client, hass, legacy_auth):
|
|||
client = await aiohttp_client(app)
|
||||
|
||||
req = await client.get("/", auth=BasicAuth("homeassistant", API_PASSWORD))
|
||||
assert req.status == 401
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
req = await client.get("/", auth=BasicAuth("wrong_username", API_PASSWORD))
|
||||
assert req.status == 401
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
req = await client.get("/", auth=BasicAuth("homeassistant", "wrong password"))
|
||||
assert req.status == 401
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
req = await client.get("/", headers={"authorization": "NotBasic abcdefg"})
|
||||
assert req.status == 401
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
|
||||
async def test_cannot_access_with_trusted_ip(
|
||||
|
@ -147,12 +148,16 @@ async def test_cannot_access_with_trusted_ip(
|
|||
for remote_addr in UNTRUSTED_ADDRESSES:
|
||||
set_mock_ip(remote_addr)
|
||||
resp = await client.get("/")
|
||||
assert resp.status == 401, f"{remote_addr} shouldn't be trusted"
|
||||
assert (
|
||||
resp.status == HTTPStatus.UNAUTHORIZED
|
||||
), f"{remote_addr} shouldn't be trusted"
|
||||
|
||||
for remote_addr in TRUSTED_ADDRESSES:
|
||||
set_mock_ip(remote_addr)
|
||||
resp = await client.get("/")
|
||||
assert resp.status == 401, f"{remote_addr} shouldn't be trusted"
|
||||
assert (
|
||||
resp.status == HTTPStatus.UNAUTHORIZED
|
||||
), f"{remote_addr} shouldn't be trusted"
|
||||
|
||||
|
||||
async def test_auth_active_access_with_access_token_in_header(
|
||||
|
@ -165,27 +170,27 @@ async def test_auth_active_access_with_access_token_in_header(
|
|||
refresh_token = await hass.auth.async_validate_access_token(hass_access_token)
|
||||
|
||||
req = await client.get("/", headers={"Authorization": f"Bearer {token}"})
|
||||
assert req.status == 200
|
||||
assert req.status == HTTPStatus.OK
|
||||
assert await req.json() == {"user_id": refresh_token.user.id}
|
||||
|
||||
req = await client.get("/", headers={"AUTHORIZATION": f"Bearer {token}"})
|
||||
assert req.status == 200
|
||||
assert req.status == HTTPStatus.OK
|
||||
assert await req.json() == {"user_id": refresh_token.user.id}
|
||||
|
||||
req = await client.get("/", headers={"authorization": f"Bearer {token}"})
|
||||
assert req.status == 200
|
||||
assert req.status == HTTPStatus.OK
|
||||
assert await req.json() == {"user_id": refresh_token.user.id}
|
||||
|
||||
req = await client.get("/", headers={"Authorization": token})
|
||||
assert req.status == 401
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
req = await client.get("/", headers={"Authorization": f"BEARER {token}"})
|
||||
assert req.status == 401
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
refresh_token = await hass.auth.async_validate_access_token(hass_access_token)
|
||||
refresh_token.user.is_active = False
|
||||
req = await client.get("/", headers={"Authorization": f"Bearer {token}"})
|
||||
assert req.status == 401
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
|
||||
async def test_auth_active_access_with_trusted_ip(
|
||||
|
@ -200,12 +205,16 @@ async def test_auth_active_access_with_trusted_ip(
|
|||
for remote_addr in UNTRUSTED_ADDRESSES:
|
||||
set_mock_ip(remote_addr)
|
||||
resp = await client.get("/")
|
||||
assert resp.status == 401, f"{remote_addr} shouldn't be trusted"
|
||||
assert (
|
||||
resp.status == HTTPStatus.UNAUTHORIZED
|
||||
), f"{remote_addr} shouldn't be trusted"
|
||||
|
||||
for remote_addr in TRUSTED_ADDRESSES:
|
||||
set_mock_ip(remote_addr)
|
||||
resp = await client.get("/")
|
||||
assert resp.status == 401, f"{remote_addr} shouldn't be trusted"
|
||||
assert (
|
||||
resp.status == HTTPStatus.UNAUTHORIZED
|
||||
), f"{remote_addr} shouldn't be trusted"
|
||||
|
||||
|
||||
async def test_auth_legacy_support_api_password_cannot_access(
|
||||
|
@ -216,13 +225,13 @@ async def test_auth_legacy_support_api_password_cannot_access(
|
|||
client = await aiohttp_client(app)
|
||||
|
||||
req = await client.get("/", headers={HTTP_HEADER_HA_AUTH: API_PASSWORD})
|
||||
assert req.status == 401
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
resp = await client.get("/", params={"api_password": API_PASSWORD})
|
||||
assert resp.status == 401
|
||||
assert resp.status == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
req = await client.get("/", auth=BasicAuth("homeassistant", API_PASSWORD))
|
||||
assert req.status == 401
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
|
||||
async def test_auth_access_signed_path(hass, app, aiohttp_client, hass_access_token):
|
||||
|
@ -237,17 +246,17 @@ async def test_auth_access_signed_path(hass, app, aiohttp_client, hass_access_to
|
|||
signed_path = async_sign_path(hass, refresh_token.id, "/", timedelta(seconds=5))
|
||||
|
||||
req = await client.get(signed_path)
|
||||
assert req.status == 200
|
||||
assert req.status == HTTPStatus.OK
|
||||
data = await req.json()
|
||||
assert data["user_id"] == refresh_token.user.id
|
||||
|
||||
# Use signature on other path
|
||||
req = await client.get("/another_path?{}".format(signed_path.split("?")[1]))
|
||||
assert req.status == 401
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
# We only allow GET
|
||||
req = await client.post(signed_path)
|
||||
assert req.status == 401
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
# Never valid as expired in the past.
|
||||
expired_signed_path = async_sign_path(
|
||||
|
@ -255,9 +264,9 @@ async def test_auth_access_signed_path(hass, app, aiohttp_client, hass_access_to
|
|||
)
|
||||
|
||||
req = await client.get(expired_signed_path)
|
||||
assert req.status == 401
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
# refresh token gone should also invalidate signature
|
||||
await hass.auth.async_remove_refresh_token(refresh_token)
|
||||
req = await client.get(signed_path)
|
||||
assert req.status == 401
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue