diff --git a/homeassistant/components/mqtt/client.py b/homeassistant/components/mqtt/client.py index 60f3fd6f856..67d5bb2d49d 100644 --- a/homeassistant/components/mqtt/client.py +++ b/homeassistant/components/mqtt/client.py @@ -111,6 +111,8 @@ RECONNECT_INTERVAL_SECONDS = 10 MAX_SUBSCRIBES_PER_CALL = 500 MAX_UNSUBSCRIBES_PER_CALL = 500 +MAX_PACKETS_TO_READ = 500 + type SocketType = socket.socket | ssl.SSLSocket | mqtt.WebsocketWrapper | Any type SubscribePayloadType = str | bytes # Only bytes if encoding is None @@ -567,7 +569,7 @@ class MQTT: @callback def _async_reader_callback(self, client: mqtt.Client) -> None: """Handle reading data from the socket.""" - if (status := client.loop_read()) != 0: + if (status := client.loop_read(MAX_PACKETS_TO_READ)) != 0: self._async_on_disconnect(status) @callback @@ -629,6 +631,9 @@ class MQTT: self._increase_socket_buffer_size(sock) self.loop.add_reader(sock, partial(self._async_reader_callback, client)) self._async_start_misc_loop() + # Try to consume the buffer right away so it doesn't fill up + # since add_reader will wait for the next loop iteration + self._async_reader_callback(client) @callback def _async_on_socket_close( diff --git a/tests/components/mqtt/test_init.py b/tests/components/mqtt/test_init.py index 9421cddc6a2..0a27c48834a 100644 --- a/tests/components/mqtt/test_init.py +++ b/tests/components/mqtt/test_init.py @@ -4678,7 +4678,7 @@ async def test_loop_write_failure( # Final for the disconnect callback await hass.async_block_till_done() - assert "Disconnected from MQTT server mock-broker:1883 (7)" in caplog.text + assert "Disconnected from MQTT server mock-broker:1883" in caplog.text @pytest.mark.parametrize(