Use http.HTTPStatus in components/s* (#58291)

This commit is contained in:
Ville Skyttä 2021-10-23 21:49:04 +03:00 committed by GitHub
parent 05671557f0
commit 50e0c58310
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 180 additions and 158 deletions

View file

@ -1,4 +1,5 @@
"""SendGrid notification service."""
from http import HTTPStatus
import logging
from sendgrid import SendGridAPIClient
@ -15,7 +16,6 @@ from homeassistant.const import (
CONF_RECIPIENT,
CONF_SENDER,
CONTENT_TYPE_TEXT_PLAIN,
HTTP_ACCEPTED,
)
import homeassistant.helpers.config_validation as cv
@ -66,5 +66,5 @@ class SendgridNotificationService(BaseNotificationService):
}
response = self._sg.client.mail.send.post(request_body=data)
if response.status_code != HTTP_ACCEPTED:
if response.status_code != HTTPStatus.ACCEPTED:
_LOGGER.error("Unable to send notification")

View file

@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
from http import HTTPStatus
import logging
from typing import Any, Final
@ -13,12 +14,7 @@ import async_timeout
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import (
CONF_HOST,
CONF_PASSWORD,
CONF_USERNAME,
HTTP_UNAUTHORIZED,
)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers import aiohttp_client
@ -155,7 +151,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
self.hass, self.host, self.info, user_input
)
except aiohttp.ClientResponseError as error:
if error.status == HTTP_UNAUTHORIZED:
if error.status == HTTPStatus.UNAUTHORIZED:
errors["base"] = "invalid_auth"
else:
errors["base"] = "cannot_connect"

View file

@ -1,5 +1,6 @@
"""Sensor for SigFox devices."""
import datetime
from http import HTTPStatus
import json
import logging
from urllib.parse import urljoin
@ -8,7 +9,7 @@ import requests
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA, SensorEntity
from homeassistant.const import CONF_NAME, HTTP_OK, HTTP_UNAUTHORIZED
from homeassistant.const import CONF_NAME
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
@ -65,8 +66,8 @@ class SigfoxAPI:
"""Check API credentials are valid."""
url = urljoin(API_URL, "devicetypes")
response = requests.get(url, auth=self._auth, timeout=10)
if response.status_code != HTTP_OK:
if response.status_code == HTTP_UNAUTHORIZED:
if response.status_code != HTTPStatus.OK:
if response.status_code == HTTPStatus.UNAUTHORIZED:
_LOGGER.error("Invalid credentials for Sigfox API")
else:
_LOGGER.error(

View file

@ -3,6 +3,7 @@ from __future__ import annotations
import asyncio
from collections.abc import Iterable
from http import HTTPStatus
import importlib
import logging
@ -12,13 +13,7 @@ from pysmartthings import Attribute, Capability, SmartThings
from pysmartthings.device import DeviceEntity
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import (
CONF_ACCESS_TOKEN,
CONF_CLIENT_ID,
CONF_CLIENT_SECRET,
HTTP_FORBIDDEN,
HTTP_UNAUTHORIZED,
)
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_CLIENT_ID, CONF_CLIENT_SECRET
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers.aiohttp_client import async_get_clientsession
@ -164,7 +159,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
hass.data[DOMAIN][DATA_BROKERS][entry.entry_id] = broker
except ClientResponseError as ex:
if ex.status in (HTTP_UNAUTHORIZED, HTTP_FORBIDDEN):
if ex.status in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN):
_LOGGER.exception(
"Unable to setup configuration entry '%s' - please reconfigure the integration",
entry.title,
@ -197,7 +192,7 @@ async def async_get_entry_scenes(entry: ConfigEntry, api):
try:
return await api.scenes(location_id=entry.data[CONF_LOCATION_ID])
except ClientResponseError as ex:
if ex.status == HTTP_FORBIDDEN:
if ex.status == HTTPStatus.FORBIDDEN:
_LOGGER.exception(
"Unable to load scenes for configuration entry '%s' because the access token does not have the required access",
entry.title,
@ -220,12 +215,12 @@ async def async_remove_entry(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Perform clean-up when entry is being removed."""
api = SmartThings(async_get_clientsession(hass), entry.data[CONF_ACCESS_TOKEN])
# Remove the installed_app, which if already removed raises a HTTP_FORBIDDEN error.
# Remove the installed_app, which if already removed raises a HTTPStatus.FORBIDDEN error.
installed_app_id = entry.data[CONF_INSTALLED_APP_ID]
try:
await api.delete_installed_app(installed_app_id)
except ClientResponseError as ex:
if ex.status == HTTP_FORBIDDEN:
if ex.status == HTTPStatus.FORBIDDEN:
_LOGGER.debug(
"Installed app %s has already been removed",
installed_app_id,
@ -236,7 +231,7 @@ async def async_remove_entry(hass: HomeAssistant, entry: ConfigEntry) -> None:
_LOGGER.debug("Removed installed app %s", installed_app_id)
# Remove the app if not referenced by other entries, which if already
# removed raises a HTTP_FORBIDDEN error.
# removed raises a HTTPStatus.FORBIDDEN error.
all_entries = hass.config_entries.async_entries(DOMAIN)
app_id = entry.data[CONF_APP_ID]
app_count = sum(1 for entry in all_entries if entry.data[CONF_APP_ID] == app_id)
@ -250,7 +245,7 @@ async def async_remove_entry(hass: HomeAssistant, entry: ConfigEntry) -> None:
try:
await api.delete_app(app_id)
except ClientResponseError as ex:
if ex.status == HTTP_FORBIDDEN:
if ex.status == HTTPStatus.FORBIDDEN:
_LOGGER.debug("App %s has already been removed", app_id, exc_info=True)
else:
raise

View file

@ -1,4 +1,5 @@
"""Config flow to configure SmartThings."""
from http import HTTPStatus
import logging
from aiohttp import ClientResponseError
@ -7,13 +8,7 @@ from pysmartthings.installedapp import format_install_url
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import (
CONF_ACCESS_TOKEN,
CONF_CLIENT_ID,
CONF_CLIENT_SECRET,
HTTP_FORBIDDEN,
HTTP_UNAUTHORIZED,
)
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_CLIENT_ID, CONF_CLIENT_SECRET
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import (
@ -142,12 +137,12 @@ class SmartThingsFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
)
return self._show_step_pat(errors)
except ClientResponseError as ex:
if ex.status == HTTP_UNAUTHORIZED:
if ex.status == HTTPStatus.UNAUTHORIZED:
errors[CONF_ACCESS_TOKEN] = "token_unauthorized"
_LOGGER.debug(
"Unauthorized error received setting up SmartApp", exc_info=True
)
elif ex.status == HTTP_FORBIDDEN:
elif ex.status == HTTPStatus.FORBIDDEN:
errors[CONF_ACCESS_TOKEN] = "token_forbidden"
_LOGGER.debug(
"Forbidden error received setting up SmartApp", exc_info=True

View file

@ -1,5 +1,6 @@
"""Support to send data to a Splunk instance."""
import asyncio
from http import HTTPStatus
import json
import logging
import time
@ -111,7 +112,7 @@ async def async_setup(hass, config):
try:
await event_collector.queue(json.dumps(payload, cls=JSONEncoder), send=True)
except SplunkPayloadError as err:
if err.status == 401:
if err.status == HTTPStatus.UNAUTHORIZED:
_LOGGER.error(err)
else:
_LOGGER.warning(err)

View file

@ -1,5 +1,6 @@
"""Config flow for Logitech Squeezebox integration."""
import asyncio
from http import HTTPStatus
import logging
from pysqueezebox import Server, async_discover
@ -8,13 +9,7 @@ import voluptuous as vol
from homeassistant import config_entries, data_entry_flow
from homeassistant.components.dhcp import MAC_ADDRESS
from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
from homeassistant.const import (
CONF_HOST,
CONF_PASSWORD,
CONF_PORT,
CONF_USERNAME,
HTTP_UNAUTHORIZED,
)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_PORT, CONF_USERNAME
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.entity_registry import async_get
@ -115,7 +110,7 @@ class SqueezeboxConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
try:
status = await server.async_query("serverstatus")
if not status:
if server.http_status == HTTP_UNAUTHORIZED:
if server.http_status == HTTPStatus.UNAUTHORIZED:
return "invalid_auth"
return "cannot_connect"
except Exception: # pylint: disable=broad-except

View file

@ -2,6 +2,7 @@
from __future__ import annotations
from datetime import timedelta
from http import HTTPStatus
import logging
from xml.parsers.expat import ExpatError
@ -19,7 +20,6 @@ from homeassistant.const import (
CONF_MONITORED_VARIABLES,
CONF_NAME,
DATA_GIGABYTES,
HTTP_OK,
PERCENTAGE,
)
from homeassistant.helpers.aiohttp_client import async_get_clientsession
@ -195,7 +195,7 @@ class StartcaData:
url = f"https://www.start.ca/support/usage/api?key={self.api_key}"
with async_timeout.timeout(REQUEST_TIMEOUT):
req = await self.websession.get(url)
if req.status != HTTP_OK:
if req.status != HTTPStatus.OK:
_LOGGER.error("Request failed with status: %u", req.status)
return False

View file

@ -1,6 +1,7 @@
"""Provide functionality to stream HLS."""
from __future__ import annotations
from http import HTTPStatus
from typing import TYPE_CHECKING, cast
from aiohttp import web
@ -193,7 +194,7 @@ class HlsPlaylistView(StreamView):
"""Return a HTTP Bad Request response."""
return web.Response(
body=None,
status=400,
status=HTTPStatus.BAD_REQUEST,
# From Appendix B.1 of the RFC:
# Successful responses to blocking Playlist requests should be cached
# for six Target Durations. Unsuccessful responses (such as 404s) should
@ -211,7 +212,7 @@ class HlsPlaylistView(StreamView):
"""Return a HTTP Not Found response."""
return web.Response(
body=None,
status=404,
status=HTTPStatus.NOT_FOUND,
headers={
"Cache-Control": f"max-age={(4 if blocking else 1)*target_duration:.0f}"
},
@ -351,7 +352,7 @@ class HlsPartView(StreamView):
):
return web.Response(
body=None,
status=404,
status=HTTPStatus.NOT_FOUND,
headers={"Cache-Control": f"max-age={track.target_duration:.0f}"},
)
# If the part is ready or has been hinted,
@ -399,7 +400,7 @@ class HlsSegmentView(StreamView):
):
return web.Response(
body=None,
status=404,
status=HTTPStatus.NOT_FOUND,
headers={"Cache-Control": f"max-age={track.target_duration:.0f}"},
)
return web.Response(

View file

@ -1,4 +1,5 @@
"""SynologyChat platform for notify component."""
from http import HTTPStatus
import json
import logging
@ -10,7 +11,7 @@ from homeassistant.components.notify import (
PLATFORM_SCHEMA,
BaseNotificationService,
)
from homeassistant.const import CONF_RESOURCE, CONF_VERIFY_SSL, HTTP_CREATED, HTTP_OK
from homeassistant.const import CONF_RESOURCE, CONF_VERIFY_SSL
import homeassistant.helpers.config_validation as cv
ATTR_FILE_URL = "file_url"
@ -57,7 +58,7 @@ class SynologyChatNotificationService(BaseNotificationService):
self._resource, data=to_send, timeout=10, verify=self._verify_ssl
)
if response.status_code not in (HTTP_OK, HTTP_CREATED):
if response.status_code not in (HTTPStatus.OK, HTTPStatus.CREATED):
_LOGGER.exception(
"Error sending message. Response %d: %s:",
response.status_code,

View file

@ -1,5 +1,6 @@
"""Test the Shelly config flow."""
import asyncio
from http import HTTPStatus
from unittest.mock import AsyncMock, Mock, patch
import aiohttp
@ -325,8 +326,14 @@ async def test_form_firmware_unsupported(hass):
@pytest.mark.parametrize(
"error",
[
(aiohttp.ClientResponseError(Mock(), (), status=400), "cannot_connect"),
(aiohttp.ClientResponseError(Mock(), (), status=401), "invalid_auth"),
(
aiohttp.ClientResponseError(Mock(), (), status=HTTPStatus.BAD_REQUEST),
"cannot_connect",
),
(
aiohttp.ClientResponseError(Mock(), (), status=HTTPStatus.UNAUTHORIZED),
"invalid_auth",
),
(asyncio.TimeoutError, "cannot_connect"),
(ValueError, "unknown"),
],
@ -480,7 +487,10 @@ async def test_zeroconf_sleeping_device(hass):
@pytest.mark.parametrize(
"error",
[
(aiohttp.ClientResponseError(Mock(), (), status=400), "cannot_connect"),
(
aiohttp.ClientResponseError(Mock(), (), status=HTTPStatus.BAD_REQUEST),
"cannot_connect",
),
(asyncio.TimeoutError, "cannot_connect"),
],
)

View file

@ -1,4 +1,5 @@
"""Test shopping list component."""
from http import HTTPStatus
from homeassistant.components.shopping_list.const import (
DOMAIN,
@ -11,7 +12,7 @@ from homeassistant.components.websocket_api.const import (
ERR_NOT_FOUND,
TYPE_RESULT,
)
from homeassistant.const import ATTR_NAME, HTTP_NOT_FOUND
from homeassistant.const import ATTR_NAME
from homeassistant.helpers import intent
@ -115,7 +116,7 @@ async def test_deprecated_api_get_all(hass, hass_client, sl_setup):
client = await hass_client()
resp = await client.get("/api/shopping_list")
assert resp.status == 200
assert resp.status == HTTPStatus.OK
data = await resp.json()
assert len(data) == 2
assert data[0]["name"] == "beer"
@ -169,7 +170,7 @@ async def test_deprecated_api_update(hass, hass_client, sl_setup):
f"/api/shopping_list/item/{beer_id}", json={"name": "soda"}
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
data = await resp.json()
assert data == {"id": beer_id, "name": "soda", "complete": False}
@ -177,7 +178,7 @@ async def test_deprecated_api_update(hass, hass_client, sl_setup):
f"/api/shopping_list/item/{wine_id}", json={"complete": True}
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
data = await resp.json()
assert data == {"id": wine_id, "name": "wine", "complete": True}
@ -238,12 +239,12 @@ async def test_api_update_fails(hass, hass_client, sl_setup):
client = await hass_client()
resp = await client.post("/api/shopping_list/non_existing", json={"name": "soda"})
assert resp.status == HTTP_NOT_FOUND
assert resp.status == HTTPStatus.NOT_FOUND
beer_id = hass.data["shopping_list"].items[0]["id"]
resp = await client.post(f"/api/shopping_list/item/{beer_id}", json={"name": 123})
assert resp.status == 400
assert resp.status == HTTPStatus.BAD_REQUEST
async def test_ws_update_item_fail(hass, hass_ws_client, sl_setup):
@ -288,10 +289,10 @@ async def test_deprecated_api_clear_completed(hass, hass_client, sl_setup):
resp = await client.post(
f"/api/shopping_list/item/{beer_id}", json={"complete": True}
)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
resp = await client.post("/api/shopping_list/clear_completed")
assert resp.status == 200
assert resp.status == HTTPStatus.OK
items = hass.data["shopping_list"].items
assert len(items) == 1
@ -334,7 +335,7 @@ async def test_deprecated_api_create(hass, hass_client, sl_setup):
client = await hass_client()
resp = await client.post("/api/shopping_list/item", json={"name": "soda"})
assert resp.status == 200
assert resp.status == HTTPStatus.OK
data = await resp.json()
assert data["name"] == "soda"
assert data["complete"] is False
@ -351,7 +352,7 @@ async def test_deprecated_api_create_fail(hass, hass_client, sl_setup):
client = await hass_client()
resp = await client.post("/api/shopping_list/item", json={"name": 1234})
assert resp.status == 400
assert resp.status == HTTPStatus.BAD_REQUEST
assert len(hass.data["shopping_list"].items) == 0

View file

@ -1,4 +1,5 @@
"""Tests for the sigfox sensor."""
from http import HTTPStatus
import re
import requests_mock
@ -34,7 +35,7 @@ async def test_invalid_credentials(hass):
"""Test for invalid credentials."""
with requests_mock.Mocker() as mock_req:
url = re.compile(API_URL + "devicetypes")
mock_req.get(url, text="{}", status_code=401)
mock_req.get(url, text="{}", status_code=HTTPStatus.UNAUTHORIZED)
assert await async_setup_component(hass, "sensor", VALID_CONFIG)
await hass.async_block_till_done()
assert len(hass.states.async_entity_ids()) == 0
@ -44,7 +45,9 @@ async def test_valid_credentials(hass):
"""Test for valid credentials."""
with requests_mock.Mocker() as mock_req:
url1 = re.compile(API_URL + "devicetypes")
mock_req.get(url1, text='{"data":[{"id":"fake_type"}]}', status_code=200)
mock_req.get(
url1, text='{"data":[{"id":"fake_type"}]}', status_code=HTTPStatus.OK
)
url2 = re.compile(API_URL + "devicetypes/fake_type/devices")
mock_req.get(url2, text='{"data":[{"id":"fake_id"}]}')

View file

@ -1,5 +1,5 @@
"""The tests for the signal_messenger platform."""
from http import HTTPStatus
import os
import tempfile
import unittest
@ -54,12 +54,12 @@ class TestSignalMesssenger(unittest.TestCase):
mock.register_uri(
"POST",
"http://127.0.0.1:8080/v2/send",
status_code=201,
status_code=HTTPStatus.CREATED,
)
mock.register_uri(
"GET",
"http://127.0.0.1:8080/v1/about",
status_code=200,
status_code=HTTPStatus.OK,
json={"versions": ["v1", "v2"]},
)
with self.assertLogs(
@ -77,12 +77,12 @@ class TestSignalMesssenger(unittest.TestCase):
mock.register_uri(
"POST",
"http://127.0.0.1:8080/v2/send",
status_code=201,
status_code=HTTPStatus.CREATED,
)
mock.register_uri(
"GET",
"http://127.0.0.1:8080/v1/about",
status_code=200,
status_code=HTTPStatus.OK,
json={"versions": ["v1", "v2"]},
)
with self.assertLogs(
@ -106,12 +106,12 @@ class TestSignalMesssenger(unittest.TestCase):
mock.register_uri(
"POST",
"http://127.0.0.1:8080/v2/send",
status_code=201,
status_code=HTTPStatus.CREATED,
)
mock.register_uri(
"GET",
"http://127.0.0.1:8080/v1/about",
status_code=200,
status_code=HTTPStatus.OK,
json={"versions": ["v1", "v2"]},
)
with self.assertLogs(

View file

@ -1,4 +1,5 @@
"""The tests for the SleepIQ component."""
from http import HTTPStatus
from unittest.mock import MagicMock, patch
from homeassistant import setup
@ -41,7 +42,7 @@ async def test_setup_login_failed(hass, requests_mock):
mock_responses(requests_mock)
requests_mock.put(
"https://prod-api.sleepiq.sleepnumber.com/rest/login",
status_code=401,
status_code=HTTPStatus.UNAUTHORIZED,
json=load_fixture("sleepiq-login-failed.json"),
)

View file

@ -1,4 +1,5 @@
"""Test the Smappee component config flow module."""
from http import HTTPStatus
from unittest.mock import patch
from homeassistant import data_entry_flow, setup
@ -424,7 +425,7 @@ async def test_full_user_flow(
client = await hass_client_no_auth()
resp = await client.get(f"/auth/external/callback?code=abcd&state={state}")
assert resp.status == 200
assert resp.status == HTTPStatus.OK
assert resp.headers["content-type"] == "text/html; charset=utf-8"
aioclient_mock.post(

View file

@ -1,5 +1,6 @@
"""Test configuration and mocks for Smart Meter Texas."""
import asyncio
from http import HTTPStatus
import json
from pathlib import Path
@ -67,7 +68,7 @@ def mock_connection(
elif auth_fail:
aioclient_mock.post(
auth_endpoint,
status=400,
status=HTTPStatus.BAD_REQUEST,
json={"errormessage": "ERR-USR-INVALIDPASSWORDERROR"},
)
else: # auth_timeout

View file

@ -551,7 +551,10 @@ async def test_webhook_problem_shows_error(hass, smartthings_mock):
data = {"error": {}}
request_info = Mock(real_url="http://example.com")
error = APIResponseError(
request_info=request_info, history=None, data=data, status=422
request_info=request_info,
history=None,
data=data,
status=HTTPStatus.UNPROCESSABLE_ENTITY,
)
error.is_target_error = Mock(return_value=True)
smartthings_mock.apps.side_effect = error
@ -591,7 +594,10 @@ async def test_api_error_shows_error(hass, smartthings_mock):
data = {"error": {}}
request_info = Mock(real_url="http://example.com")
error = APIResponseError(
request_info=request_info, history=None, data=data, status=400
request_info=request_info,
history=None,
data=data,
status=HTTPStatus.BAD_REQUEST,
)
smartthings_mock.apps.side_effect = error

View file

@ -58,7 +58,7 @@ async def test_unrecoverable_api_errors_create_new_flow(
config_entry.add_to_hass(hass)
request_info = Mock(real_url="http://example.com")
smartthings_mock.app.side_effect = ClientResponseError(
request_info=request_info, history=None, status=401
request_info=request_info, history=None, status=HTTPStatus.UNAUTHORIZED
)
# Assert setup returns false

View file

@ -1,5 +1,6 @@
"""Tests for the Somfy config flow."""
import asyncio
from http import HTTPStatus
from unittest.mock import patch
from homeassistant import config_entries, data_entry_flow, setup
@ -69,7 +70,7 @@ async def test_full_flow(
client = await hass_client_no_auth()
resp = await client.get(f"/auth/external/callback?code=abcd&state={state}")
assert resp.status == 200
assert resp.status == HTTPStatus.OK
assert resp.headers["content-type"] == "text/html; charset=utf-8"
aioclient_mock.post(

View file

@ -1,4 +1,5 @@
"""Tests for the Sonarr component."""
from http import HTTPStatus
from socket import gaierror as SocketGIAError
from unittest.mock import patch
@ -148,13 +149,13 @@ def mock_connection_invalid_auth(
"""Mock Sonarr invalid auth errors."""
sonarr_url = f"http://{host}:{port}{base_path}"
aioclient_mock.get(f"{sonarr_url}/system/status", status=403)
aioclient_mock.get(f"{sonarr_url}/diskspace", status=403)
aioclient_mock.get(f"{sonarr_url}/calendar", status=403)
aioclient_mock.get(f"{sonarr_url}/command", status=403)
aioclient_mock.get(f"{sonarr_url}/queue", status=403)
aioclient_mock.get(f"{sonarr_url}/series", status=403)
aioclient_mock.get(f"{sonarr_url}/missing/wanted", status=403)
aioclient_mock.get(f"{sonarr_url}/system/status", status=HTTPStatus.FORBIDDEN)
aioclient_mock.get(f"{sonarr_url}/diskspace", status=HTTPStatus.FORBIDDEN)
aioclient_mock.get(f"{sonarr_url}/calendar", status=HTTPStatus.FORBIDDEN)
aioclient_mock.get(f"{sonarr_url}/command", status=HTTPStatus.FORBIDDEN)
aioclient_mock.get(f"{sonarr_url}/queue", status=HTTPStatus.FORBIDDEN)
aioclient_mock.get(f"{sonarr_url}/series", status=HTTPStatus.FORBIDDEN)
aioclient_mock.get(f"{sonarr_url}/missing/wanted", status=HTTPStatus.FORBIDDEN)
def mock_connection_server_error(
@ -166,13 +167,21 @@ def mock_connection_server_error(
"""Mock Sonarr server errors."""
sonarr_url = f"http://{host}:{port}{base_path}"
aioclient_mock.get(f"{sonarr_url}/system/status", status=500)
aioclient_mock.get(f"{sonarr_url}/diskspace", status=500)
aioclient_mock.get(f"{sonarr_url}/calendar", status=500)
aioclient_mock.get(f"{sonarr_url}/command", status=500)
aioclient_mock.get(f"{sonarr_url}/queue", status=500)
aioclient_mock.get(f"{sonarr_url}/series", status=500)
aioclient_mock.get(f"{sonarr_url}/missing/wanted", status=500)
aioclient_mock.get(
f"{sonarr_url}/system/status", status=HTTPStatus.INTERNAL_SERVER_ERROR
)
aioclient_mock.get(
f"{sonarr_url}/diskspace", status=HTTPStatus.INTERNAL_SERVER_ERROR
)
aioclient_mock.get(
f"{sonarr_url}/calendar", status=HTTPStatus.INTERNAL_SERVER_ERROR
)
aioclient_mock.get(f"{sonarr_url}/command", status=HTTPStatus.INTERNAL_SERVER_ERROR)
aioclient_mock.get(f"{sonarr_url}/queue", status=HTTPStatus.INTERNAL_SERVER_ERROR)
aioclient_mock.get(f"{sonarr_url}/series", status=HTTPStatus.INTERNAL_SERVER_ERROR)
aioclient_mock.get(
f"{sonarr_url}/missing/wanted", status=HTTPStatus.INTERNAL_SERVER_ERROR
)
async def setup_integration(

View file

@ -1,4 +1,6 @@
"""The tests for the Home Assistant SpaceAPI component."""
from http import HTTPStatus
# pylint: disable=protected-access
from unittest.mock import patch
@ -91,7 +93,7 @@ def mock_client(hass, hass_client):
async def test_spaceapi_get(hass, mock_client):
"""Test response after start-up Home Assistant."""
resp = await mock_client.get(URL_API_SPACEAPI)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
data = await resp.json()
@ -137,7 +139,7 @@ async def test_spaceapi_state_get(hass, mock_client):
hass.states.async_set("test.test_door", True)
resp = await mock_client.get(URL_API_SPACEAPI)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
data = await resp.json()
assert data["state"]["open"] == bool(1)
@ -146,7 +148,7 @@ async def test_spaceapi_state_get(hass, mock_client):
async def test_spaceapi_sensors_get(hass, mock_client):
"""Test the response for the sensors."""
resp = await mock_client.get(URL_API_SPACEAPI)
assert resp.status == 200
assert resp.status == HTTPStatus.OK
data = await resp.json()
assert data["sensors"] == SENSOR_OUTPUT

View file

@ -1,4 +1,5 @@
"""Tests for the Spotify config flow."""
from http import HTTPStatus
from unittest.mock import patch
from spotipy import SpotifyException
@ -80,7 +81,7 @@ async def test_full_flow(
client = await hass_client_no_auth()
resp = await client.get(f"/auth/external/callback?code=abcd&state={state}")
assert resp.status == 200
assert resp.status == HTTPStatus.OK
assert resp.headers["content-type"] == "text/html; charset=utf-8"
aioclient_mock.post(

View file

@ -1,4 +1,5 @@
"""Test the Logitech Squeezebox config flow."""
from http import HTTPStatus
from unittest.mock import patch
from pysqueezebox import Server
@ -6,13 +7,7 @@ from pysqueezebox import Server
from homeassistant import config_entries
from homeassistant.components.dhcp import HOSTNAME, IP_ADDRESS, MAC_ADDRESS
from homeassistant.components.squeezebox.const import DOMAIN
from homeassistant.const import (
CONF_HOST,
CONF_PASSWORD,
CONF_PORT,
CONF_USERNAME,
HTTP_UNAUTHORIZED,
)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_PORT, CONF_USERNAME
from homeassistant.data_entry_flow import (
RESULT_TYPE_ABORT,
RESULT_TYPE_CREATE_ENTRY,
@ -39,7 +34,7 @@ async def mock_failed_discover(_discovery_callback):
async def patch_async_query_unauthorized(self, *args):
"""Mock an unauthorized query."""
self.http_status = HTTP_UNAUTHORIZED
self.http_status = HTTPStatus.UNAUTHORIZED
return False
@ -128,7 +123,7 @@ async def test_form_invalid_auth(hass):
)
async def patch_async_query(self, *args):
self.http_status = HTTP_UNAUTHORIZED
self.http_status = HTTPStatus.UNAUTHORIZED
return False
with patch("pysqueezebox.Server.async_query", new=patch_async_query):

View file

@ -13,6 +13,7 @@ from __future__ import annotations
import asyncio
from collections import deque
from http import HTTPStatus
import logging
import threading
from unittest.mock import patch
@ -171,7 +172,7 @@ class HLSSync:
self.check_requests_ready()
return self._original_not_found()
def response(self, body, headers, status=200):
def response(self, body, headers, status=HTTPStatus.OK):
"""Intercept the Response call so we know when the web handler is finished."""
self._num_finished += 1
self.check_requests_ready()

View file

@ -1,5 +1,6 @@
"""The tests for hls streams."""
from datetime import timedelta
from http import HTTPStatus
from unittest.mock import patch
from urllib.parse import urlparse
@ -15,7 +16,6 @@ from homeassistant.components.stream.const import (
NUM_PLAYLIST_SEGMENTS,
)
from homeassistant.components.stream.core import Part
from homeassistant.const import HTTP_NOT_FOUND
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
@ -138,23 +138,23 @@ async def test_hls_stream(hass, hls_stream, stream_worker_sync):
# Fetch master playlist
master_playlist_response = await hls_client.get()
assert master_playlist_response.status == 200
assert master_playlist_response.status == HTTPStatus.OK
# Fetch init
master_playlist = await master_playlist_response.text()
init_response = await hls_client.get("/init.mp4")
assert init_response.status == 200
assert init_response.status == HTTPStatus.OK
# Fetch playlist
playlist_url = "/" + master_playlist.splitlines()[-1]
playlist_response = await hls_client.get(playlist_url)
assert playlist_response.status == 200
assert playlist_response.status == HTTPStatus.OK
# Fetch segment
playlist = await playlist_response.text()
segment_url = "/" + [line for line in playlist.splitlines() if line][-1]
segment_response = await hls_client.get(segment_url)
assert segment_response.status == 200
assert segment_response.status == HTTPStatus.OK
stream_worker_sync.resume()
@ -163,7 +163,7 @@ async def test_hls_stream(hass, hls_stream, stream_worker_sync):
# Ensure playlist not accessible after stream ends
fail_response = await hls_client.get()
assert fail_response.status == HTTP_NOT_FOUND
assert fail_response.status == HTTPStatus.NOT_FOUND
async def test_stream_timeout(hass, hass_client, stream_worker_sync):
@ -186,7 +186,7 @@ async def test_stream_timeout(hass, hass_client, stream_worker_sync):
# Fetch playlist
parsed_url = urlparse(url)
playlist_response = await http_client.get(parsed_url.path)
assert playlist_response.status == 200
assert playlist_response.status == HTTPStatus.OK
# Wait a minute
future = dt_util.utcnow() + timedelta(minutes=1)
@ -194,7 +194,7 @@ async def test_stream_timeout(hass, hass_client, stream_worker_sync):
# Fetch again to reset timer
playlist_response = await http_client.get(parsed_url.path)
assert playlist_response.status == 200
assert playlist_response.status == HTTPStatus.OK
stream_worker_sync.resume()
@ -205,7 +205,7 @@ async def test_stream_timeout(hass, hass_client, stream_worker_sync):
# Ensure playlist not accessible
fail_response = await http_client.get(parsed_url.path)
assert fail_response.status == HTTP_NOT_FOUND
assert fail_response.status == HTTPStatus.NOT_FOUND
async def test_stream_timeout_after_stop(hass, hass_client, stream_worker_sync):
@ -280,7 +280,7 @@ async def test_hls_playlist_view_no_output(hass, hls_stream):
# Fetch playlist
resp = await hls_client.get("/playlist.m3u8")
assert resp.status == 404
assert resp.status == HTTPStatus.NOT_FOUND
async def test_hls_playlist_view(hass, hls_stream, stream_worker_sync):
@ -298,7 +298,7 @@ async def test_hls_playlist_view(hass, hls_stream, stream_worker_sync):
hls_client = await hls_stream(stream)
resp = await hls_client.get("/playlist.m3u8")
assert resp.status == 200
assert resp.status == HTTPStatus.OK
assert await resp.text() == make_playlist(
sequence=0, segments=[make_segment(0), make_segment(1)]
)
@ -307,7 +307,7 @@ async def test_hls_playlist_view(hass, hls_stream, stream_worker_sync):
hls.put(segment)
await hass.async_block_till_done()
resp = await hls_client.get("/playlist.m3u8")
assert resp.status == 200
assert resp.status == HTTPStatus.OK
assert await resp.text() == make_playlist(
sequence=0, segments=[make_segment(0), make_segment(1), make_segment(2)]
)
@ -333,7 +333,7 @@ async def test_hls_max_segments(hass, hls_stream, stream_worker_sync):
await hass.async_block_till_done()
resp = await hls_client.get("/playlist.m3u8")
assert resp.status == 200
assert resp.status == HTTPStatus.OK
# Only NUM_PLAYLIST_SEGMENTS are returned in the playlist.
start = MAX_SEGMENTS + 1 - NUM_PLAYLIST_SEGMENTS
@ -356,12 +356,12 @@ async def test_hls_max_segments(hass, hls_stream, stream_worker_sync):
# The segment that fell off the buffer is not accessible
with patch.object(hls.stream_settings, "hls_part_timeout", 0.1):
segment_response = await hls_client.get("/segment/0.m4s")
assert segment_response.status == 404
assert segment_response.status == HTTPStatus.NOT_FOUND
# However all segments in the buffer are accessible, even those that were not in the playlist.
for sequence in range(1, MAX_SEGMENTS + 1):
segment_response = await hls_client.get(f"/segment/{sequence}.m4s")
assert segment_response.status == 200
assert segment_response.status == HTTPStatus.OK
stream_worker_sync.resume()
stream.stop()
@ -390,7 +390,7 @@ async def test_hls_playlist_view_discontinuity(hass, hls_stream, stream_worker_s
hls_client = await hls_stream(stream)
resp = await hls_client.get("/playlist.m3u8")
assert resp.status == 200
assert resp.status == HTTPStatus.OK
assert await resp.text() == make_playlist(
sequence=0,
segments=[
@ -428,7 +428,7 @@ async def test_hls_max_segments_discontinuity(hass, hls_stream, stream_worker_sy
await hass.async_block_till_done()
resp = await hls_client.get("/playlist.m3u8")
assert resp.status == 200
assert resp.status == HTTPStatus.OK
# Only NUM_PLAYLIST_SEGMENTS are returned in the playlist causing the
# EXT-X-DISCONTINUITY tag to be omitted and EXT-X-DISCONTINUITY-SEQUENCE

View file

@ -1,5 +1,6 @@
"""The tests for hls streams."""
import asyncio
from http import HTTPStatus
import itertools
import re
from urllib.parse import urlparse
@ -16,7 +17,6 @@ from homeassistant.components.stream.const import (
HLS_PROVIDER,
)
from homeassistant.components.stream.core import Part
from homeassistant.const import HTTP_NOT_FOUND
from homeassistant.setup import async_setup_component
from .test_hls import SEGMENT_DURATION, STREAM_SOURCE, HlsClient, make_playlist
@ -143,17 +143,17 @@ async def test_ll_hls_stream(hass, hls_stream, stream_worker_sync):
# Fetch playlist
master_playlist_response = await hls_client.get()
assert master_playlist_response.status == 200
assert master_playlist_response.status == HTTPStatus.OK
# Fetch init
master_playlist = await master_playlist_response.text()
init_response = await hls_client.get("/init.mp4")
assert init_response.status == 200
assert init_response.status == HTTPStatus.OK
# Fetch playlist
playlist_url = "/" + master_playlist.splitlines()[-1]
playlist_response = await hls_client.get(playlist_url)
assert playlist_response.status == 200
assert playlist_response.status == HTTPStatus.OK
# Fetch segments
playlist = await playlist_response.text()
@ -163,7 +163,7 @@ async def test_ll_hls_stream(hass, hls_stream, stream_worker_sync):
if match:
segment_url = "/" + match.group("segment_url")
segment_response = await hls_client.get(segment_url)
assert segment_response.status == 200
assert segment_response.status == HTTPStatus.OK
def check_part_is_moof_mdat(data: bytes):
if len(data) < 8 or data[4:8] != b"moof":
@ -200,7 +200,7 @@ async def test_ll_hls_stream(hass, hls_stream, stream_worker_sync):
"Range": f'bytes={match.group("byterange_start")}-{byterange_end}'
},
)
assert part_segment_response.status == 206
assert part_segment_response.status == HTTPStatus.PARTIAL_CONTENT
assert check_part_is_moof_mdat(await part_segment_response.read())
stream_worker_sync.resume()
@ -210,7 +210,7 @@ async def test_ll_hls_stream(hass, hls_stream, stream_worker_sync):
# Ensure playlist not accessible after stream ends
fail_response = await hls_client.get()
assert fail_response.status == HTTP_NOT_FOUND
assert fail_response.status == HTTPStatus.NOT_FOUND
async def test_ll_hls_playlist_view(hass, hls_stream, stream_worker_sync):
@ -244,7 +244,7 @@ async def test_ll_hls_playlist_view(hass, hls_stream, stream_worker_sync):
hls_client = await hls_stream(stream)
resp = await hls_client.get("/playlist.m3u8")
assert resp.status == 200
assert resp.status == HTTPStatus.OK
assert await resp.text() == make_playlist(
sequence=0,
segments=[
@ -265,7 +265,7 @@ async def test_ll_hls_playlist_view(hass, hls_stream, stream_worker_sync):
await hass.async_block_till_done()
resp = await hls_client.get("/playlist.m3u8")
assert resp.status == 200
assert resp.status == HTTPStatus.OK
assert await resp.text() == make_playlist(
sequence=0,
segments=[
@ -316,10 +316,10 @@ async def test_ll_hls_msn(hass, hls_stream, stream_worker_sync, hls_sync):
msn_responses = await msn_requests
assert msn_responses[0].status == 200
assert msn_responses[1].status == 200
assert msn_responses[2].status == 400
assert msn_responses[3].status == 400
assert msn_responses[0].status == HTTPStatus.OK
assert msn_responses[1].status == HTTPStatus.OK
assert msn_responses[2].status == HTTPStatus.BAD_REQUEST
assert msn_responses[3].status == HTTPStatus.BAD_REQUEST
# Sequence number is now 2. Create six more requests for sequences 0 through 5.
# Calls for msn 0 through 4 should work, 5 should fail.
@ -334,12 +334,12 @@ async def test_ll_hls_msn(hass, hls_stream, stream_worker_sync, hls_sync):
hls.put(segment)
msn_responses = await msn_requests
assert msn_responses[0].status == 200
assert msn_responses[1].status == 200
assert msn_responses[2].status == 200
assert msn_responses[3].status == 200
assert msn_responses[4].status == 200
assert msn_responses[5].status == 400
assert msn_responses[0].status == HTTPStatus.OK
assert msn_responses[1].status == HTTPStatus.OK
assert msn_responses[2].status == HTTPStatus.OK
assert msn_responses[3].status == HTTPStatus.OK
assert msn_responses[4].status == HTTPStatus.OK
assert msn_responses[5].status == HTTPStatus.BAD_REQUEST
stream_worker_sync.resume()
@ -369,7 +369,9 @@ async def test_ll_hls_playlist_bad_msn_part(hass, hls_stream, stream_worker_sync
# If the Playlist URI contains an _HLS_part directive but no _HLS_msn
# directive, the Server MUST return Bad Request, such as HTTP 400.
assert (await hls_client.get("/playlist.m3u8?_HLS_part=1")).status == 400
assert (
await hls_client.get("/playlist.m3u8?_HLS_part=1")
).status == HTTPStatus.BAD_REQUEST
# Seed hls with 1 complete segment and 1 in process segment
segment = create_segment(sequence=0)
@ -398,12 +400,14 @@ async def test_ll_hls_playlist_bad_msn_part(hass, hls_stream, stream_worker_sync
# The following two tests should fail immediately:
# - request with a _HLS_msn of 4
# - request with a _HLS_msn of 1 and a _HLS_part of num_completed_parts-1+advance_part_limit
assert (await hls_client.get("/playlist.m3u8?_HLS_msn=4")).status == 400
assert (
await hls_client.get("/playlist.m3u8?_HLS_msn=4")
).status == HTTPStatus.BAD_REQUEST
assert (
await hls_client.get(
f"/playlist.m3u8?_HLS_msn=1&_HLS_part={num_completed_parts-1+hass.data[DOMAIN][ATTR_SETTINGS].hls_advance_part_limit}"
)
).status == 400
).status == HTTPStatus.BAD_REQUEST
stream_worker_sync.resume()
@ -478,8 +482,8 @@ async def test_ll_hls_playlist_rollover_part(
different_response, *same_responses = await requests
assert different_response.status == 200
assert all(response.status == 200 for response in same_responses)
assert different_response.status == HTTPStatus.OK
assert all(response.status == HTTPStatus.OK for response in same_responses)
different_playlist = await different_response.read()
same_playlists = [await response.read() for response in same_responses]
assert different_playlist != same_playlists[0]
@ -549,8 +553,8 @@ async def test_ll_hls_playlist_msn_part(hass, hls_stream, stream_worker_sync, hl
msn_responses = await msn_requests
# All the responses should succeed except the last one which fails
assert all(response.status == 200 for response in msn_responses[:-1])
assert msn_responses[-1].status == 400
assert all(response.status == HTTPStatus.OK for response in msn_responses[:-1])
assert msn_responses[-1].status == HTTPStatus.BAD_REQUEST
stream_worker_sync.resume()
@ -600,7 +604,7 @@ async def test_get_part_segments(hass, hls_stream, stream_worker_sync, hls_sync)
)
)
responses = await requests
assert all(response.status == 200 for response in responses)
assert all(response.status == HTTPStatus.OK for response in responses)
assert all(
[
await responses[i].read() == segment.parts[i].data
@ -616,7 +620,7 @@ async def test_get_part_segments(hass, hls_stream, stream_worker_sync, hls_sync)
await hls_sync.wait_for_handler()
hls.part_put()
response = await request
assert response.status == 404
assert response.status == HTTPStatus.NOT_FOUND
# Put the remaining parts and complete the segment
while remaining_parts:
@ -641,7 +645,7 @@ async def test_get_part_segments(hass, hls_stream, stream_worker_sync, hls_sync)
complete_segment(segment)
# Check the response
response = await request
assert response.status == 200
assert response.status == HTTPStatus.OK
assert (
await response.read()
== ALT_SEQUENCE_BYTES[: len(hls.get_segment(2).parts[0].data)]

View file

@ -1,7 +1,7 @@
"""Test STT component setup."""
from http import HTTPStatus
from homeassistant.components import stt
from homeassistant.const import HTTP_NOT_FOUND
from homeassistant.setup import async_setup_component
@ -17,7 +17,7 @@ async def test_demo_settings_not_exists(hass, hass_client):
response = await client.get("/api/stt/beer")
assert response.status == HTTP_NOT_FOUND
assert response.status == HTTPStatus.NOT_FOUND
async def test_demo_speech_not_exists(hass, hass_client):
@ -27,4 +27,4 @@ async def test_demo_speech_not_exists(hass, hass_client):
response = await client.post("/api/stt/beer", data=b"test")
assert response.status == HTTP_NOT_FOUND
assert response.status == HTTPStatus.NOT_FOUND

View file

@ -1,5 +1,6 @@
"""Test system log component."""
import asyncio
from http import HTTPStatus
import logging
import queue
from unittest.mock import MagicMock, patch
@ -40,7 +41,7 @@ async def get_error_log(hass, hass_client, expected_count):
client = await hass_client()
resp = await client.get("/api/error/all")
assert resp.status == 200
assert resp.status == HTTPStatus.OK
data = await resp.json()