Use HTTPStatus instead of HTTP_ consts and magic values in components/a* (#57988)

This commit is contained in:
Ville Skyttä 2021-10-22 15:21:34 +03:00 committed by GitHub
parent 55ffc85a0a
commit a598d9f353
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 145 additions and 147 deletions

View file

@ -1,4 +1,6 @@
"""Config flow for the Abode Security System component."""
from http import HTTPStatus
from abodepy import Abode
from abodepy.exceptions import AbodeAuthenticationException, AbodeException
from abodepy.helpers.errors import MFA_CODE_REQUIRED
@ -6,7 +8,7 @@ from requests.exceptions import ConnectTimeout, HTTPError
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME, HTTP_BAD_REQUEST
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
from .const import DEFAULT_CACHEDB, DOMAIN, LOGGER
@ -51,7 +53,7 @@ class AbodeFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
LOGGER.error("Unable to connect to Abode: %s", ex)
if ex.errcode == HTTP_BAD_REQUEST:
if ex.errcode == HTTPStatus.BAD_REQUEST:
errors = {"base": "invalid_auth"}
else:

View file

@ -1,6 +1,7 @@
"""Support for non-delivered packages recorded in AfterShip."""
from __future__ import annotations
from http import HTTPStatus
import logging
from typing import Any, Final
@ -11,7 +12,7 @@ from homeassistant.components.sensor import (
PLATFORM_SCHEMA as BASE_PLATFORM_SCHEMA,
SensorEntity,
)
from homeassistant.const import CONF_API_KEY, CONF_NAME, HTTP_OK
from homeassistant.const import CONF_API_KEY, CONF_NAME
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
@ -64,7 +65,7 @@ async def async_setup_platform(
await aftership.get_trackings()
if not aftership.meta or aftership.meta["code"] != HTTP_OK:
if not aftership.meta or aftership.meta["code"] != HTTPStatus.OK:
_LOGGER.error(
"No tracking data found. Check API key is correct: %s", aftership.meta
)
@ -151,7 +152,7 @@ class AfterShipSensor(SensorEntity):
if not self.aftership.meta:
_LOGGER.error("Unknown errors when querying")
return
if self.aftership.meta["code"] != HTTP_OK:
if self.aftership.meta["code"] != HTTPStatus.OK:
_LOGGER.error(
"Errors when querying AfterShip. %s", str(self.aftership.meta)
)

View file

@ -1,6 +1,7 @@
"""Adds config flow for Airly."""
from __future__ import annotations
from http import HTTPStatus
from typing import Any
from aiohttp import ClientSession
@ -10,14 +11,7 @@ import async_timeout
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import (
CONF_API_KEY,
CONF_LATITUDE,
CONF_LONGITUDE,
CONF_NAME,
HTTP_NOT_FOUND,
HTTP_UNAUTHORIZED,
)
from homeassistant.const import CONF_API_KEY, CONF_LATITUDE, CONF_LONGITUDE, CONF_NAME
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
@ -60,9 +54,9 @@ class AirlyFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
use_nearest=True,
)
except AirlyError as err:
if err.status_code == HTTP_UNAUTHORIZED:
if err.status_code == HTTPStatus.UNAUTHORIZED:
errors["base"] = "invalid_api_key"
if err.status_code == HTTP_NOT_FOUND:
if err.status_code == HTTPStatus.NOT_FOUND:
errors["base"] = "wrong_location"
else:
if not location_point_valid:

View file

@ -1,13 +1,14 @@
"""Support for Alexa skill auth."""
import asyncio
from datetime import timedelta
from http import HTTPStatus
import json
import logging
import aiohttp
import async_timeout
from homeassistant.const import CONF_CLIENT_ID, CONF_CLIENT_SECRET, HTTP_OK
from homeassistant.const import CONF_CLIENT_ID, CONF_CLIENT_SECRET
from homeassistant.core import callback
from homeassistant.helpers import aiohttp_client
from homeassistant.util import dt
@ -119,7 +120,7 @@ class Auth:
_LOGGER.debug("LWA response header: %s", response.headers)
_LOGGER.debug("LWA response status: %s", response.status)
if response.status != HTTP_OK:
if response.status != HTTPStatus.OK:
_LOGGER.error("Error calling LWA to get auth token")
return None

View file

@ -1,11 +1,12 @@
"""Support for Alexa skill service end point."""
import copy
import hmac
from http import HTTPStatus
import logging
import uuid
from homeassistant.components import http
from homeassistant.const import CONF_PASSWORD, HTTP_NOT_FOUND, HTTP_UNAUTHORIZED
from homeassistant.const import CONF_PASSWORD
from homeassistant.core import callback
from homeassistant.helpers import template
import homeassistant.util.dt as dt_util
@ -58,7 +59,7 @@ class AlexaFlashBriefingView(http.HomeAssistantView):
if request.query.get(API_PASSWORD) is None:
err = "No password provided for Alexa flash briefing: %s"
_LOGGER.error(err, briefing_id)
return b"", HTTP_UNAUTHORIZED
return b"", HTTPStatus.UNAUTHORIZED
if not hmac.compare_digest(
request.query[API_PASSWORD].encode("utf-8"),
@ -66,12 +67,12 @@ class AlexaFlashBriefingView(http.HomeAssistantView):
):
err = "Wrong password for Alexa flash briefing: %s"
_LOGGER.error(err, briefing_id)
return b"", HTTP_UNAUTHORIZED
return b"", HTTPStatus.UNAUTHORIZED
if not isinstance(self.flash_briefings.get(briefing_id), list):
err = "No configured Alexa flash briefing was found for: %s"
_LOGGER.error(err, briefing_id)
return b"", HTTP_NOT_FOUND
return b"", HTTPStatus.NOT_FOUND
briefing = []

View file

@ -2,13 +2,14 @@
from __future__ import annotations
import asyncio
from http import HTTPStatus
import json
import logging
import aiohttp
import async_timeout
from homeassistant.const import HTTP_ACCEPTED, MATCH_ALL, STATE_ON
from homeassistant.const import MATCH_ALL, STATE_ON
from homeassistant.core import HomeAssistant, State, callback
from homeassistant.helpers.significant_change import create_checker
import homeassistant.util.dt as dt_util
@ -148,7 +149,7 @@ async def async_send_changereport_message(
_LOGGER.debug("Sent: %s", json.dumps(message_serialized))
_LOGGER.debug("Received (%s): %s", response.status, response_text)
if response.status == HTTP_ACCEPTED:
if response.status == HTTPStatus.ACCEPTED:
return
response_json = json.loads(response_text)
@ -279,7 +280,7 @@ async def async_send_doorbell_event_message(hass, config, alexa_entity):
_LOGGER.debug("Sent: %s", json.dumps(message_serialized))
_LOGGER.debug("Received (%s): %s", response.status, response_text)
if response.status == HTTP_ACCEPTED:
if response.status == HTTPStatus.ACCEPTED:
return
response_json = json.loads(response_text)

View file

@ -1,5 +1,6 @@
"""Support for an exposed aREST RESTful API of a device."""
from datetime import timedelta
from http import HTTPStatus
import logging
import requests
@ -10,13 +11,7 @@ from homeassistant.components.binary_sensor import (
PLATFORM_SCHEMA,
BinarySensorEntity,
)
from homeassistant.const import (
CONF_DEVICE_CLASS,
CONF_NAME,
CONF_PIN,
CONF_RESOURCE,
HTTP_OK,
)
from homeassistant.const import CONF_DEVICE_CLASS, CONF_NAME, CONF_PIN, CONF_RESOURCE
import homeassistant.helpers.config_validation as cv
from homeassistant.util import Throttle
@ -78,7 +73,7 @@ class ArestBinarySensor(BinarySensorEntity):
if pin is not None:
request = requests.get(f"{resource}/mode/{pin}/i", timeout=10)
if request.status_code != HTTP_OK:
if request.status_code != HTTPStatus.OK:
_LOGGER.error("Can't set mode of %s", resource)
def update(self):

View file

@ -1,5 +1,6 @@
"""Support for an exposed aREST RESTful API of a device."""
from datetime import timedelta
from http import HTTPStatus
import logging
import requests
@ -12,7 +13,6 @@ from homeassistant.const import (
CONF_RESOURCE,
CONF_UNIT_OF_MEASUREMENT,
CONF_VALUE_TEMPLATE,
HTTP_OK,
)
from homeassistant.exceptions import TemplateError
import homeassistant.helpers.config_validation as cv
@ -146,7 +146,7 @@ class ArestSensor(SensorEntity):
if pin is not None:
request = requests.get(f"{resource}/mode/{pin}/i", timeout=10)
if request.status_code != HTTP_OK:
if request.status_code != HTTPStatus.OK:
_LOGGER.error("Can't set mode of %s", resource)
def update(self):

View file

@ -1,12 +1,13 @@
"""Support for an exposed aREST RESTful API of a device."""
from http import HTTPStatus
import logging
import requests
import voluptuous as vol
from homeassistant.components.switch import PLATFORM_SCHEMA, SwitchEntity
from homeassistant.const import CONF_NAME, CONF_RESOURCE, HTTP_OK
from homeassistant.const import CONF_NAME, CONF_RESOURCE
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
@ -101,7 +102,7 @@ class ArestSwitchFunction(ArestSwitchBase):
request = requests.get(f"{self._resource}/{self._func}", timeout=10)
if request.status_code != HTTP_OK:
if request.status_code != HTTPStatus.OK:
_LOGGER.error("Can't find function")
return
@ -118,7 +119,7 @@ class ArestSwitchFunction(ArestSwitchBase):
f"{self._resource}/{self._func}", timeout=10, params={"params": "1"}
)
if request.status_code == HTTP_OK:
if request.status_code == HTTPStatus.OK:
self._attr_is_on = True
else:
_LOGGER.error("Can't turn on function %s at %s", self._func, self._resource)
@ -129,7 +130,7 @@ class ArestSwitchFunction(ArestSwitchBase):
f"{self._resource}/{self._func}", timeout=10, params={"params": "0"}
)
if request.status_code == HTTP_OK:
if request.status_code == HTTPStatus.OK:
self._attr_is_on = False
else:
_LOGGER.error(
@ -157,7 +158,7 @@ class ArestSwitchPin(ArestSwitchBase):
self.invert = invert
request = requests.get(f"{resource}/mode/{pin}/o", timeout=10)
if request.status_code != HTTP_OK:
if request.status_code != HTTPStatus.OK:
_LOGGER.error("Can't set mode")
self._attr_available = False
@ -167,7 +168,7 @@ class ArestSwitchPin(ArestSwitchBase):
request = requests.get(
f"{self._resource}/digital/{self._pin}/{turn_on_payload}", timeout=10
)
if request.status_code == HTTP_OK:
if request.status_code == HTTPStatus.OK:
self._attr_is_on = True
else:
_LOGGER.error("Can't turn on pin %s at %s", self._pin, self._resource)
@ -178,7 +179,7 @@ class ArestSwitchPin(ArestSwitchBase):
request = requests.get(
f"{self._resource}/digital/{self._pin}/{turn_off_payload}", timeout=10
)
if request.status_code == HTTP_OK:
if request.status_code == HTTPStatus.OK:
self._attr_is_on = False
else:
_LOGGER.error("Can't turn off pin %s at %s", self._pin, self._resource)

View file

@ -1,6 +1,7 @@
"""Handle August connection setup and authentication."""
import asyncio
from http import HTTPStatus
import logging
import os
@ -8,12 +9,7 @@ from aiohttp import ClientError, ClientResponseError
from yalexs.api_async import ApiAsync
from yalexs.authenticator_async import AuthenticationState, AuthenticatorAsync
from homeassistant.const import (
CONF_PASSWORD,
CONF_TIMEOUT,
CONF_USERNAME,
HTTP_UNAUTHORIZED,
)
from homeassistant.const import CONF_PASSWORD, CONF_TIMEOUT, CONF_USERNAME
from homeassistant.helpers import aiohttp_client
from .const import (
@ -97,7 +93,7 @@ class AugustGateway:
# by have no access
await self.api.async_get_operable_locks(self.access_token)
except ClientResponseError as ex:
if ex.status == HTTP_UNAUTHORIZED:
if ex.status == HTTPStatus.UNAUTHORIZED:
raise InvalidAuth from ex
raise CannotConnect from ex

View file

@ -1,4 +1,5 @@
"""Tests for the Abode config flow."""
from http import HTTPStatus
from unittest.mock import patch
from abodepy.exceptions import AbodeAuthenticationException
@ -8,12 +9,7 @@ from homeassistant import data_entry_flow
from homeassistant.components.abode import config_flow
from homeassistant.components.abode.const import DOMAIN
from homeassistant.config_entries import SOURCE_REAUTH, SOURCE_USER
from homeassistant.const import (
CONF_PASSWORD,
CONF_USERNAME,
HTTP_BAD_REQUEST,
HTTP_INTERNAL_SERVER_ERROR,
)
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
from tests.common import MockConfigEntry
@ -56,7 +52,9 @@ async def test_invalid_credentials(hass):
with patch(
"homeassistant.components.abode.config_flow.Abode",
side_effect=AbodeAuthenticationException((HTTP_BAD_REQUEST, "auth error")),
side_effect=AbodeAuthenticationException(
(HTTPStatus.BAD_REQUEST, "auth error")
),
):
result = await flow.async_step_user(user_input=conf)
assert result["errors"] == {"base": "invalid_auth"}
@ -72,7 +70,7 @@ async def test_connection_error(hass):
with patch(
"homeassistant.components.abode.config_flow.Abode",
side_effect=AbodeAuthenticationException(
(HTTP_INTERNAL_SERVER_ERROR, "connection error")
(HTTPStatus.INTERNAL_SERVER_ERROR, "connection error")
),
):
result = await flow.async_step_user(user_input=conf)
@ -117,7 +115,9 @@ async def test_step_mfa(hass):
with patch(
"homeassistant.components.abode.config_flow.Abode",
side_effect=AbodeAuthenticationException((HTTP_BAD_REQUEST, "invalid mfa")),
side_effect=AbodeAuthenticationException(
(HTTPStatus.BAD_REQUEST, "invalid mfa")
),
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={"mfa_code": "123456"}

View file

@ -1,4 +1,5 @@
"""Tests for the Abode module."""
from http import HTTPStatus
from unittest.mock import patch
from abodepy.exceptions import AbodeAuthenticationException, AbodeException
@ -12,7 +13,7 @@ from homeassistant.components.abode import (
)
from homeassistant.components.alarm_control_panel import DOMAIN as ALARM_DOMAIN
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import CONF_USERNAME, HTTP_BAD_REQUEST
from homeassistant.const import CONF_USERNAME
from .common import setup_platform
@ -68,7 +69,9 @@ async def test_invalid_credentials(hass):
"""Test Abode credentials changing."""
with patch(
"homeassistant.components.abode.Abode",
side_effect=AbodeAuthenticationException((HTTP_BAD_REQUEST, "auth error")),
side_effect=AbodeAuthenticationException(
(HTTPStatus.BAD_REQUEST, "auth error")
),
), patch(
"homeassistant.components.abode.config_flow.AbodeFlowHandler.async_step_reauth",
return_value={"type": data_entry_flow.RESULT_TYPE_FORM},

View file

@ -1,17 +1,12 @@
"""Define tests for the Airly config flow."""
from http import HTTPStatus
from airly.exceptions import AirlyError
from homeassistant import data_entry_flow
from homeassistant.components.airly.const import CONF_USE_NEAREST, DOMAIN
from homeassistant.config_entries import SOURCE_USER
from homeassistant.const import (
CONF_API_KEY,
CONF_LATITUDE,
CONF_LONGITUDE,
CONF_NAME,
HTTP_NOT_FOUND,
HTTP_UNAUTHORIZED,
)
from homeassistant.const import CONF_API_KEY, CONF_LATITUDE, CONF_LONGITUDE, CONF_NAME
from . import API_NEAREST_URL, API_POINT_URL
@ -40,7 +35,7 @@ async def test_invalid_api_key(hass, aioclient_mock):
aioclient_mock.get(
API_POINT_URL,
exc=AirlyError(
HTTP_UNAUTHORIZED, {"message": "Invalid authentication credentials"}
HTTPStatus.UNAUTHORIZED, {"message": "Invalid authentication credentials"}
),
)
@ -57,7 +52,7 @@ async def test_invalid_location(hass, aioclient_mock):
aioclient_mock.get(
API_NEAREST_URL,
exc=AirlyError(HTTP_NOT_FOUND, {"message": "Installation was not found"}),
exc=AirlyError(HTTPStatus.NOT_FOUND, {"message": "Installation was not found"}),
)
result = await hass.config_entries.flow.async_init(

View file

@ -1,12 +1,12 @@
"""The tests for the Alexa component."""
# pylint: disable=protected-access
import datetime
from http import HTTPStatus
import pytest
from homeassistant.components import alexa
from homeassistant.components.alexa import const
from homeassistant.const import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED
from homeassistant.core import callback
from homeassistant.setup import async_setup_component
@ -74,7 +74,7 @@ def _flash_briefing_req(client, briefing_id, password="pass%2Fabc"):
async def test_flash_briefing_invalid_id(alexa_client):
"""Test an invalid Flash Briefing ID."""
req = await _flash_briefing_req(alexa_client, 10000)
assert req.status == HTTP_NOT_FOUND
assert req.status == HTTPStatus.NOT_FOUND
text = await req.text()
assert text == ""
@ -82,7 +82,7 @@ async def test_flash_briefing_invalid_id(alexa_client):
async def test_flash_briefing_no_password(alexa_client):
"""Test for no Flash Briefing password."""
req = await _flash_briefing_req(alexa_client, "weather", password=None)
assert req.status == HTTP_UNAUTHORIZED
assert req.status == HTTPStatus.UNAUTHORIZED
text = await req.text()
assert text == ""
@ -90,7 +90,7 @@ async def test_flash_briefing_no_password(alexa_client):
async def test_flash_briefing_invalid_password(alexa_client):
"""Test an invalid Flash Briefing password."""
req = await _flash_briefing_req(alexa_client, "weather", password="wrongpass")
assert req.status == HTTP_UNAUTHORIZED
assert req.status == HTTPStatus.UNAUTHORIZED
text = await req.text()
assert text == ""
@ -98,7 +98,7 @@ async def test_flash_briefing_invalid_password(alexa_client):
async def test_flash_briefing_request_for_password(alexa_client):
"""Test for "password" Flash Briefing."""
req = await _flash_briefing_req(alexa_client, "password")
assert req.status == HTTP_NOT_FOUND
assert req.status == HTTPStatus.NOT_FOUND
text = await req.text()
assert text == ""
@ -106,7 +106,7 @@ async def test_flash_briefing_request_for_password(alexa_client):
async def test_flash_briefing_date_from_str(alexa_client):
"""Test the response has a valid date parsed from string."""
req = await _flash_briefing_req(alexa_client, "weather")
assert req.status == 200
assert req.status == HTTPStatus.OK
data = await req.json()
assert isinstance(
datetime.datetime.strptime(
@ -130,7 +130,7 @@ async def test_flash_briefing_valid(alexa_client):
]
req = await _flash_briefing_req(alexa_client, "news_audio")
assert req.status == 200
assert req.status == HTTPStatus.OK
json = await req.json()
assert isinstance(
datetime.datetime.strptime(

View file

@ -1,5 +1,6 @@
"""The tests for the Alexa component."""
# pylint: disable=protected-access
from http import HTTPStatus
import json
import pytest
@ -134,7 +135,7 @@ async def test_intent_launch_request(alexa_client):
},
}
req = await _intent_req(alexa_client, data)
assert req.status == 200
assert req.status == HTTPStatus.OK
data = await req.json()
text = data.get("response", {}).get("outputSpeech", {}).get("text")
assert text == "LaunchRequest has been received."
@ -160,7 +161,7 @@ async def test_intent_launch_request_not_configured(alexa_client):
},
}
req = await _intent_req(alexa_client, data)
assert req.status == 200
assert req.status == HTTPStatus.OK
data = await req.json()
text = data.get("response", {}).get("outputSpeech", {}).get("text")
assert text == "This intent is not yet configured within Home Assistant."
@ -194,7 +195,7 @@ async def test_intent_request_with_slots(alexa_client):
},
}
req = await _intent_req(alexa_client, data)
assert req.status == 200
assert req.status == HTTPStatus.OK
data = await req.json()
text = data.get("response", {}).get("outputSpeech", {}).get("text")
assert text == "You told us your sign is virgo."
@ -247,7 +248,7 @@ async def test_intent_request_with_slots_and_synonym_resolution(alexa_client):
},
}
req = await _intent_req(alexa_client, data)
assert req.status == 200
assert req.status == HTTPStatus.OK
data = await req.json()
text = data.get("response", {}).get("outputSpeech", {}).get("text")
assert text == "You told us your sign is Virgo."
@ -300,7 +301,7 @@ async def test_intent_request_with_slots_and_multi_synonym_resolution(alexa_clie
},
}
req = await _intent_req(alexa_client, data)
assert req.status == 200
assert req.status == HTTPStatus.OK
data = await req.json()
text = data.get("response", {}).get("outputSpeech", {}).get("text")
assert text == "You told us your sign is V zodiac."
@ -334,7 +335,7 @@ async def test_intent_request_with_slots_but_no_value(alexa_client):
},
}
req = await _intent_req(alexa_client, data)
assert req.status == 200
assert req.status == HTTPStatus.OK
data = await req.json()
text = data.get("response", {}).get("outputSpeech", {}).get("text")
assert text == "You told us your sign is ."
@ -365,7 +366,7 @@ async def test_intent_request_without_slots(hass, alexa_client):
},
}
req = await _intent_req(alexa_client, data)
assert req.status == 200
assert req.status == HTTPStatus.OK
json = await req.json()
text = json.get("response", {}).get("outputSpeech", {}).get("text")
@ -375,7 +376,7 @@ async def test_intent_request_without_slots(hass, alexa_client):
hass.states.async_set("device_tracker.anne_therese", "home")
req = await _intent_req(alexa_client, data)
assert req.status == 200
assert req.status == HTTPStatus.OK
json = await req.json()
text = json.get("response", {}).get("outputSpeech", {}).get("text")
assert text == "You are both home, you silly"
@ -404,7 +405,7 @@ async def test_intent_request_calling_service(alexa_client):
}
call_count = len(calls)
req = await _intent_req(alexa_client, data)
assert req.status == 200
assert req.status == HTTPStatus.OK
assert call_count + 1 == len(calls)
call = calls[-1]
assert call.domain == "test"
@ -445,7 +446,7 @@ async def test_intent_session_ended_request(alexa_client):
}
req = await _intent_req(alexa_client, data)
assert req.status == 200
assert req.status == HTTPStatus.OK
text = await req.text()
assert text == ""
@ -482,7 +483,7 @@ async def test_intent_from_built_in_intent_library(alexa_client):
},
}
req = await _intent_req(alexa_client, data)
assert req.status == 200
assert req.status == HTTPStatus.OK
data = await req.json()
text = data.get("response", {}).get("outputSpeech", {}).get("text")
assert text == "Playing the shins."

View file

@ -1,8 +1,9 @@
"""Test Smart Home HTTP endpoints."""
from http import HTTPStatus
import json
from homeassistant.components.alexa import DOMAIN, smart_home_http
from homeassistant.const import CONTENT_TYPE_JSON, HTTP_NOT_FOUND
from homeassistant.const import CONTENT_TYPE_JSON
from homeassistant.setup import async_setup_component
from . import get_new_request
@ -39,4 +40,4 @@ async def test_http_api_disabled(hass, hass_client):
config = {"alexa": {}}
response = await do_http_discovery(config, hass, hass_client)
assert response.status == HTTP_NOT_FOUND
assert response.status == HTTPStatus.NOT_FOUND

View file

@ -1,5 +1,6 @@
"""Test the Almond config flow."""
import asyncio
from http import HTTPStatus
from unittest.mock import patch
from homeassistant import config_entries, data_entry_flow, setup
@ -129,7 +130,7 @@ async def test_full_flow(
client = await hass_client_no_auth()
resp = await client.get(f"/auth/external/callback?code=abcd&state={state}")
assert resp.status == 200
assert resp.status == HTTPStatus.OK
assert resp.headers["content-type"] == "text/html; charset=utf-8"
aioclient_mock.post(

View file

@ -1,5 +1,6 @@
"""The tests for the Home Assistant API component."""
# pylint: disable=protected-access
from http import HTTPStatus
import json
from unittest.mock import patch
@ -26,7 +27,7 @@ async def test_api_list_state_entities(hass, mock_api_client):
"""Test if the debug interface allows us to list state entities."""
hass.states.async_set("test.entity", "hello")
resp = await mock_api_client.get(const.URL_API_STATES)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
json = await resp.json()
remote_data = [ha.State.from_dict(item) for item in json]
@ -37,7 +38,7 @@ async def test_api_get_state(hass, mock_api_client):
"""Test if the debug interface allows us to get a state."""
hass.states.async_set("hello.world", "nice", {"attr": 1})
resp = await mock_api_client.get("/api/states/hello.world")
assert resp.status == 200
assert resp.status == HTTPStatus.OK
json = await resp.json()
data = ha.State.from_dict(json)
@ -52,7 +53,7 @@ async def test_api_get_state(hass, mock_api_client):
async def test_api_get_non_existing_state(hass, mock_api_client):
"""Test if the debug interface allows us to get a state."""
resp = await mock_api_client.get("/api/states/does_not_exist")
assert resp.status == const.HTTP_NOT_FOUND
assert resp.status == HTTPStatus.NOT_FOUND
async def test_api_state_change(hass, mock_api_client):
@ -75,7 +76,7 @@ async def test_api_state_change_of_non_existing_entity(hass, mock_api_client):
"/api/states/test_entity.that_does_not_exist", json={"state": new_state}
)
assert resp.status == 201
assert resp.status == HTTPStatus.CREATED
assert hass.states.get("test_entity.that_does_not_exist").state == new_state
@ -87,7 +88,7 @@ async def test_api_state_change_with_bad_data(hass, mock_api_client):
"/api/states/test_entity.that_does_not_exist", json={}
)
assert resp.status == 400
assert resp.status == HTTPStatus.BAD_REQUEST
# pylint: disable=invalid-name
@ -97,13 +98,13 @@ async def test_api_state_change_to_zero_value(hass, mock_api_client):
"/api/states/test_entity.with_zero_state", json={"state": 0}
)
assert resp.status == 201
assert resp.status == HTTPStatus.CREATED
resp = await mock_api_client.post(
"/api/states/test_entity.with_zero_state", json={"state": 0.0}
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
# pylint: disable=invalid-name
@ -190,7 +191,7 @@ async def test_api_fire_event_with_invalid_json(hass, mock_api_client):
await hass.async_block_till_done()
assert resp.status == 400
assert resp.status == HTTPStatus.BAD_REQUEST
assert len(test_value) == 0
# Try now with valid but unusable JSON
@ -200,7 +201,7 @@ async def test_api_fire_event_with_invalid_json(hass, mock_api_client):
await hass.async_block_till_done()
assert resp.status == 400
assert resp.status == HTTPStatus.BAD_REQUEST
assert len(test_value) == 0
@ -319,7 +320,7 @@ async def test_api_template_error(hass, mock_api_client):
const.URL_API_TEMPLATE, json={"template": "{{ states.sensor.temperature.state"}
)
assert resp.status == 400
assert resp.status == HTTPStatus.BAD_REQUEST
async def test_stream(hass, mock_api_client):
@ -327,7 +328,7 @@ async def test_stream(hass, mock_api_client):
listen_count = _listen_count(hass)
resp = await mock_api_client.get(const.URL_API_STREAM)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
assert listen_count + 1 == _listen_count(hass)
hass.bus.async_fire("test_event")
@ -344,7 +345,7 @@ async def test_stream_with_restricted(hass, mock_api_client):
resp = await mock_api_client.get(
f"{const.URL_API_STREAM}?restrict=test_event1,test_event3"
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
assert listen_count + 1 == _listen_count(hass)
hass.bus.async_fire("test_event1")
@ -392,7 +393,7 @@ async def test_api_error_log(
resp = await client.get(const.URL_API_ERROR_LOG)
# Verify auth required
assert resp.status == 401
assert resp.status == HTTPStatus.UNAUTHORIZED
with patch(
"aiohttp.web.FileResponse", return_value=web.Response(text="Hello")
@ -404,7 +405,7 @@ async def test_api_error_log(
assert len(mock_file.mock_calls) == 1
assert mock_file.mock_calls[0][1][0] == hass.data[DATA_LOGGING]
assert resp.status == 200
assert resp.status == HTTPStatus.OK
assert await resp.text() == "Hello"
# Verify we require admin user
@ -413,7 +414,7 @@ async def test_api_error_log(
const.URL_API_ERROR_LOG,
headers={"Authorization": f"Bearer {hass_access_token}"},
)
assert resp.status == 401
assert resp.status == HTTPStatus.UNAUTHORIZED
async def test_api_fire_event_context(hass, mock_api_client, hass_access_token):
@ -473,7 +474,7 @@ async def test_event_stream_requires_admin(hass, mock_api_client, hass_admin_use
"""Test user needs to be admin to access event stream."""
hass_admin_user.groups = []
resp = await mock_api_client.get("/api/stream")
assert resp.status == 401
assert resp.status == HTTPStatus.UNAUTHORIZED
async def test_states_view_filters(hass, mock_api_client, hass_admin_user):
@ -482,7 +483,7 @@ async def test_states_view_filters(hass, mock_api_client, hass_admin_user):
hass.states.async_set("test.entity", "hello")
hass.states.async_set("test.not_visible_entity", "invisible")
resp = await mock_api_client.get(const.URL_API_STATES)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
json = await resp.json()
assert len(json) == 1
assert json[0]["entity_id"] == "test.entity"
@ -492,35 +493,35 @@ async def test_get_entity_state_read_perm(hass, mock_api_client, hass_admin_user
"""Test getting a state requires read permission."""
hass_admin_user.mock_policy({})
resp = await mock_api_client.get("/api/states/light.test")
assert resp.status == 401
assert resp.status == HTTPStatus.UNAUTHORIZED
async def test_post_entity_state_admin(hass, mock_api_client, hass_admin_user):
"""Test updating state requires admin."""
hass_admin_user.groups = []
resp = await mock_api_client.post("/api/states/light.test")
assert resp.status == 401
assert resp.status == HTTPStatus.UNAUTHORIZED
async def test_delete_entity_state_admin(hass, mock_api_client, hass_admin_user):
"""Test deleting entity requires admin."""
hass_admin_user.groups = []
resp = await mock_api_client.delete("/api/states/light.test")
assert resp.status == 401
assert resp.status == HTTPStatus.UNAUTHORIZED
async def test_post_event_admin(hass, mock_api_client, hass_admin_user):
"""Test sending event requires admin."""
hass_admin_user.groups = []
resp = await mock_api_client.post("/api/events/state_changed")
assert resp.status == 401
assert resp.status == HTTPStatus.UNAUTHORIZED
async def test_rendering_template_admin(hass, mock_api_client, hass_admin_user):
"""Test rendering a template requires admin."""
hass_admin_user.groups = []
resp = await mock_api_client.post(const.URL_API_TEMPLATE)
assert resp.status == 401
assert resp.status == HTTPStatus.UNAUTHORIZED
async def test_rendering_template_legacy_user(
@ -533,13 +534,13 @@ async def test_rendering_template_legacy_user(
const.URL_API_TEMPLATE,
json={"template": "{{ states.sensor.temperature.state }}"},
)
assert resp.status == 401
assert resp.status == HTTPStatus.UNAUTHORIZED
async def test_api_call_service_not_found(hass, mock_api_client):
"""Test if the API fails 400 if unknown service."""
resp = await mock_api_client.post("/api/services/test_domain/test_service")
assert resp.status == 400
assert resp.status == HTTPStatus.BAD_REQUEST
async def test_api_call_service_bad_data(hass, mock_api_client):
@ -558,7 +559,7 @@ async def test_api_call_service_bad_data(hass, mock_api_client):
resp = await mock_api_client.post(
"/api/services/test_domain/test_service", json={"hello": 5}
)
assert resp.status == 400
assert resp.status == HTTPStatus.BAD_REQUEST
async def test_api_get_discovery_info(hass, mock_api_client):

View file

@ -1,5 +1,6 @@
"""The camera tests for the august platform."""
from http import HTTPStatus
from unittest.mock import patch
from homeassistant.const import STATE_IDLE
@ -30,6 +31,6 @@ async def test_create_doorbell(hass, hass_client_no_auth):
client = await hass_client_no_auth()
resp = await client.get(url)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
body = await resp.text()
assert body == "image"

View file

@ -1,5 +1,6 @@
"""Integration tests for the auth component."""
from datetime import timedelta
from http import HTTPStatus
from unittest.mock import patch
from homeassistant.auth import InvalidAuthError
@ -43,7 +44,7 @@ async def test_login_new_user_and_trying_refresh_token(hass, aiohttp_client):
"redirect_uri": CLIENT_REDIRECT_URI,
},
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
step = await resp.json()
resp = await client.post(
@ -51,7 +52,7 @@ async def test_login_new_user_and_trying_refresh_token(hass, aiohttp_client):
json={"client_id": CLIENT_ID, "username": "test-user", "password": "test-pass"},
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
step = await resp.json()
code = step["result"]
@ -61,7 +62,7 @@ async def test_login_new_user_and_trying_refresh_token(hass, aiohttp_client):
data={"client_id": CLIENT_ID, "grant_type": "authorization_code", "code": code},
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
tokens = await resp.json()
assert (
@ -78,7 +79,7 @@ async def test_login_new_user_and_trying_refresh_token(hass, aiohttp_client):
},
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
tokens = await resp.json()
assert "refresh_token" not in tokens
assert (
@ -87,12 +88,12 @@ async def test_login_new_user_and_trying_refresh_token(hass, aiohttp_client):
# Test using access token to hit API.
resp = await client.get("/api/")
assert resp.status == 401
assert resp.status == HTTPStatus.UNAUTHORIZED
resp = await client.get(
"/api/", headers={"authorization": f"Bearer {tokens['access_token']}"}
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
def test_auth_code_store_expiration():
@ -179,7 +180,7 @@ async def test_refresh_token_system_generated(hass, aiohttp_client):
},
)
assert resp.status == 400
assert resp.status == HTTPStatus.BAD_REQUEST
result = await resp.json()
assert result["error"] == "invalid_request"
@ -188,7 +189,7 @@ async def test_refresh_token_system_generated(hass, aiohttp_client):
data={"grant_type": "refresh_token", "refresh_token": refresh_token.token},
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
tokens = await resp.json()
assert (
await hass.auth.async_validate_access_token(tokens["access_token"]) is not None
@ -206,7 +207,7 @@ async def test_refresh_token_different_client_id(hass, aiohttp_client):
data={"grant_type": "refresh_token", "refresh_token": refresh_token.token},
)
assert resp.status == 400
assert resp.status == HTTPStatus.BAD_REQUEST
result = await resp.json()
assert result["error"] == "invalid_request"
@ -220,7 +221,7 @@ async def test_refresh_token_different_client_id(hass, aiohttp_client):
},
)
assert resp.status == 400
assert resp.status == HTTPStatus.BAD_REQUEST
result = await resp.json()
assert result["error"] == "invalid_request"
@ -234,7 +235,7 @@ async def test_refresh_token_different_client_id(hass, aiohttp_client):
},
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
tokens = await resp.json()
assert (
await hass.auth.async_validate_access_token(tokens["access_token"]) is not None
@ -262,7 +263,7 @@ async def test_refresh_token_provider_rejected(
},
)
assert resp.status == 403
assert resp.status == HTTPStatus.FORBIDDEN
result = await resp.json()
assert result["error"] == "access_denied"
assert result["error_description"] == "Invalid access"
@ -283,7 +284,7 @@ async def test_revoking_refresh_token(hass, aiohttp_client):
},
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
tokens = await resp.json()
assert (
await hass.auth.async_validate_access_token(tokens["access_token"]) is not None
@ -293,7 +294,7 @@ async def test_revoking_refresh_token(hass, aiohttp_client):
resp = await client.post(
"/auth/token", data={"token": refresh_token.token, "action": "revoke"}
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
# Old access token should be no longer valid
assert await hass.auth.async_validate_access_token(tokens["access_token"]) is None
@ -308,7 +309,7 @@ async def test_revoking_refresh_token(hass, aiohttp_client):
},
)
assert resp.status == 400
assert resp.status == HTTPStatus.BAD_REQUEST
async def test_ws_long_lived_access_token(hass, hass_ws_client, hass_access_token):

View file

@ -1,4 +1,5 @@
"""Tests for the link user flow."""
from http import HTTPStatus
from unittest.mock import patch
from . import async_setup_auth
@ -40,7 +41,7 @@ async def async_get_code(hass, aiohttp_client):
"type": "link_user",
},
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
step = await resp.json()
resp = await client.post(
@ -48,7 +49,7 @@ async def async_get_code(hass, aiohttp_client):
json={"client_id": CLIENT_ID, "username": "2nd-user", "password": "2nd-pass"},
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
step = await resp.json()
return {
@ -72,7 +73,7 @@ async def test_link_user(hass, aiohttp_client):
headers={"authorization": f"Bearer {info['access_token']}"},
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
assert len(info["user"].credentials) == 1
@ -89,7 +90,7 @@ async def test_link_user_invalid_client_id(hass, aiohttp_client):
headers={"authorization": f"Bearer {info['access_token']}"},
)
assert resp.status == 400
assert resp.status == HTTPStatus.BAD_REQUEST
assert len(info["user"].credentials) == 0
@ -105,7 +106,7 @@ async def test_link_user_invalid_code(hass, aiohttp_client):
headers={"authorization": f"Bearer {info['access_token']}"},
)
assert resp.status == 400
assert resp.status == HTTPStatus.BAD_REQUEST
assert len(info["user"].credentials) == 0
@ -122,7 +123,7 @@ async def test_link_user_invalid_auth(hass, aiohttp_client):
headers={"authorization": "Bearer invalid"},
)
assert resp.status == 401
assert resp.status == HTTPStatus.UNAUTHORIZED
assert len(info["user"].credentials) == 0
@ -142,7 +143,7 @@ async def test_link_user_already_linked_same_user(hass, aiohttp_client):
headers={"authorization": f"Bearer {info['access_token']}"},
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
# The credential was not added because it saw that it was already linked
assert len(info["user"].credentials) == 0
@ -165,7 +166,7 @@ async def test_link_user_already_linked_other_user(hass, aiohttp_client):
headers={"authorization": f"Bearer {info['access_token']}"},
)
assert resp.status == 400
assert resp.status == HTTPStatus.BAD_REQUEST
# The credential was not added because it saw that it was already linked
assert len(info["user"].credentials) == 0
assert len(another_user.credentials) == 0

View file

@ -1,4 +1,5 @@
"""Tests for the login flow."""
from http import HTTPStatus
from unittest.mock import patch
from . import async_setup_auth
@ -10,7 +11,7 @@ async def test_fetch_auth_providers(hass, aiohttp_client):
"""Test fetching auth providers."""
client = await async_setup_auth(hass, aiohttp_client)
resp = await client.get("/auth/providers")
assert resp.status == 200
assert resp.status == HTTPStatus.OK
assert await resp.json() == [
{"name": "Example", "type": "insecure_example", "id": None}
]
@ -24,7 +25,7 @@ async def test_fetch_auth_providers_onboarding(hass, aiohttp_client):
return_value=False,
):
resp = await client.get("/auth/providers")
assert resp.status == 400
assert resp.status == HTTPStatus.BAD_REQUEST
assert await resp.json() == {
"message": "Onboarding not finished",
"code": "onboarding_required",
@ -35,7 +36,7 @@ async def test_cannot_get_flows_in_progress(hass, aiohttp_client):
"""Test we cannot get flows in progress."""
client = await async_setup_auth(hass, aiohttp_client, [])
resp = await client.get("/auth/login_flow")
assert resp.status == 405
assert resp.status == HTTPStatus.METHOD_NOT_ALLOWED
async def test_invalid_username_password(hass, aiohttp_client):
@ -49,7 +50,7 @@ async def test_invalid_username_password(hass, aiohttp_client):
"redirect_uri": CLIENT_REDIRECT_URI,
},
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
step = await resp.json()
# Incorrect username
@ -62,7 +63,7 @@ async def test_invalid_username_password(hass, aiohttp_client):
},
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
step = await resp.json()
assert step["step_id"] == "init"
@ -78,7 +79,7 @@ async def test_invalid_username_password(hass, aiohttp_client):
},
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
step = await resp.json()
assert step["step_id"] == "init"
@ -101,7 +102,7 @@ async def test_login_exist_user(hass, aiohttp_client):
"redirect_uri": CLIENT_REDIRECT_URI,
},
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
step = await resp.json()
resp = await client.post(
@ -109,7 +110,7 @@ async def test_login_exist_user(hass, aiohttp_client):
json={"client_id": CLIENT_ID, "username": "test-user", "password": "test-pass"},
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
step = await resp.json()
assert step["type"] == "create_entry"
assert len(step["result"]) > 1