J. Nick Koston 2848f8648d
Log last message when websocket reaches peak limit (#93038)
When we hit the absolute limit, we would log the last messages as
it was key to finding out the source. We now do the same when
we hit the peak limit
2023-05-14 12:22:19 -04:00

283 lines
9 KiB

"""Test Websocket API http module."""
import asyncio
from datetime import timedelta
from typing import Any, cast
from unittest.mock import patch
from aiohttp import ServerDisconnectedError, WSMsgType, web
import pytest
from homeassistant.components.websocket_api import (
from homeassistant.components.websocket_api.connection import ActiveConnection
from homeassistant.core import HomeAssistant, callback
from homeassistant.util.dt import utcnow
from tests.common import async_fire_time_changed
from tests.typing import WebSocketGenerator
def mock_low_queue():
"""Mock a low queue."""
with patch("homeassistant.components.websocket_api.http.MAX_PENDING_MSG", 1):
def mock_low_peak():
"""Mock a low queue."""
with patch("homeassistant.components.websocket_api.http.PENDING_MSG_PEAK", 5):
async def test_pending_msg_overflow(
hass: HomeAssistant, mock_low_queue, websocket_client
) -> None:
"""Test get_panels command."""
for idx in range(10):
await websocket_client.send_json({"id": idx + 1, "type": "ping"})
msg = await websocket_client.receive()
assert msg.type == WSMsgType.close
async def test_pending_msg_peak(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test pending msg overflow command."""
orig_handler = http.WebSocketHandler
setup_instance: http.WebSocketHandler | None = None
def instantiate_handler(*args):
nonlocal setup_instance
setup_instance = orig_handler(*args)
return setup_instance
with patch(
websocket_client = await hass_ws_client()
instance: http.WebSocketHandler = cast(http.WebSocketHandler, setup_instance)
# Fill the queue past the allowed peak
for _ in range(10):
instance._send_message({"overload": "message"})
hass, utcnow() + timedelta(seconds=const.PENDING_MSG_PEAK_TIME + 1)
msg = await websocket_client.receive()
assert msg.type == WSMsgType.close
assert "Client unable to keep up with pending messages" in caplog.text
assert "Stayed over 5 for 5 seconds" in caplog.text
assert "overload" in caplog.text
async def test_pending_msg_peak_recovery(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test pending msg nears the peak but recovers."""
orig_handler = http.WebSocketHandler
setup_instance: http.WebSocketHandler | None = None
def instantiate_handler(*args):
nonlocal setup_instance
setup_instance = orig_handler(*args)
return setup_instance
with patch(
websocket_client = await hass_ws_client()
instance: http.WebSocketHandler = cast(http.WebSocketHandler, setup_instance)
# Make sure the call later is started
for _ in range(10):
for _ in range(10):
msg = await websocket_client.receive()
assert msg.type == WSMsgType.TEXT
msg = await websocket_client.receive()
assert msg.type == WSMsgType.TEXT
# Cleanly shutdown
msg = await websocket_client.receive()
assert msg.type == WSMsgType.TEXT
msg = await websocket_client.receive()
assert msg.type == WSMsgType.close
assert "Client unable to keep up with pending messages" not in caplog.text
async def test_pending_msg_peak_but_does_not_overflow(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test pending msg hits the low peak but recovers and does not overflow."""
orig_handler = http.WebSocketHandler
setup_instance: http.WebSocketHandler | None = None
def instantiate_handler(*args):
nonlocal setup_instance
setup_instance = orig_handler(*args)
return setup_instance
with patch(
websocket_client = await hass_ws_client()
instance: http.WebSocketHandler = cast(http.WebSocketHandler, setup_instance)
# Kill writer task and fill queue past peak
for _ in range(5):
# Trigger the peak check
# Clear the queue
# Trigger the peak clear
hass, utcnow() + timedelta(seconds=const.PENDING_MSG_PEAK_TIME + 1)
msg = await websocket_client.receive()
assert msg.type == WSMsgType.TEXT
assert "Client unable to keep up with pending messages" not in caplog.text
async def test_non_json_message(
hass: HomeAssistant, websocket_client, caplog: pytest.LogCaptureFixture
) -> None:
"""Test trying to serialize non JSON objects."""
bad_data = object()
hass.states.async_set("test_domain.entity", "testing", {"bad": bad_data})
await websocket_client.send_json({"id": 5, "type": "get_states"})
msg = await websocket_client.receive_json()
assert msg["id"] == 5
assert msg["type"] == const.TYPE_RESULT
assert msg["success"]
assert msg["result"] == []
assert (
f"Unable to serialize to JSON. Bad data found at $.result[0](State: test_domain.entity).attributes.bad={bad_data}(<class 'object'>"
in caplog.text
async def test_prepare_fail(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test failing to prepare."""
with patch(
side_effect=(asyncio.TimeoutError, web.WebSocketResponse.prepare),
), pytest.raises(ServerDisconnectedError):
await hass_ws_client(hass)
assert "Timeout preparing request" in caplog.text
async def test_binary_message(
hass: HomeAssistant, websocket_client, caplog: pytest.LogCaptureFixture
) -> None:
"""Test binary messages."""
binary_payloads = {
104: ([], asyncio.Future()),
105: ([], asyncio.Future()),
# Register a handler
"type": "get_binary_message_handler",
def get_binary_message_handler(
hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
unsub = None
def binary_message_handler(
hass: HomeAssistant, connection: ActiveConnection, payload: bytes
nonlocal unsub
if msg["id"] == 103:
raise ValueError("Boom")
if payload:
prefix, unsub = connection.async_register_binary_handler(binary_message_handler)
connection.send_result(msg["id"], {"prefix": prefix})
async_register_command(hass, get_binary_message_handler)
# Register multiple binary handlers
for i in range(101, 106):
await websocket_client.send_json(
{"id": i, "type": "get_binary_message_handler"}
result = await websocket_client.receive_json()
assert result["id"] == i
assert result["type"] == const.TYPE_RESULT
assert result["success"]
assert result["result"]["prefix"] == i - 100
# Send message to binary
await websocket_client.send_bytes((0).to_bytes(1, "big") + b"test0")
await websocket_client.send_bytes((3).to_bytes(1, "big") + b"test3")
await websocket_client.send_bytes((3).to_bytes(1, "big") + b"test3")
await websocket_client.send_bytes((10).to_bytes(1, "big") + b"test10")
await websocket_client.send_bytes((4).to_bytes(1, "big") + b"test4")
await websocket_client.send_bytes((4).to_bytes(1, "big") + b"")
await websocket_client.send_bytes((5).to_bytes(1, "big") + b"test5")
await websocket_client.send_bytes((5).to_bytes(1, "big") + b"test5-2")
await websocket_client.send_bytes((5).to_bytes(1, "big") + b"")
# Verify received
assert await binary_payloads[104][1] == b"test4"
assert await binary_payloads[105][1] == b"test5test5-2"
assert "Error handling binary message" in caplog.text
assert "Received binary message for non-existing handler 0" in caplog.text
assert "Received binary message for non-existing handler 3" in caplog.text
assert "Received binary message for non-existing handler 10" in caplog.text