Try to read multiple packets in MQTT (#118222)
This commit is contained in:
parent
83e4c2927c
commit
6b8223e339
2 changed files with 7 additions and 2 deletions
|
@ -111,6 +111,8 @@ RECONNECT_INTERVAL_SECONDS = 10
|
|||
MAX_SUBSCRIBES_PER_CALL = 500
|
||||
MAX_UNSUBSCRIBES_PER_CALL = 500
|
||||
|
||||
MAX_PACKETS_TO_READ = 500
|
||||
|
||||
type SocketType = socket.socket | ssl.SSLSocket | mqtt.WebsocketWrapper | Any
|
||||
|
||||
type SubscribePayloadType = str | bytes # Only bytes if encoding is None
|
||||
|
@ -567,7 +569,7 @@ class MQTT:
|
|||
@callback
|
||||
def _async_reader_callback(self, client: mqtt.Client) -> None:
|
||||
"""Handle reading data from the socket."""
|
||||
if (status := client.loop_read()) != 0:
|
||||
if (status := client.loop_read(MAX_PACKETS_TO_READ)) != 0:
|
||||
self._async_on_disconnect(status)
|
||||
|
||||
@callback
|
||||
|
@ -629,6 +631,9 @@ class MQTT:
|
|||
self._increase_socket_buffer_size(sock)
|
||||
self.loop.add_reader(sock, partial(self._async_reader_callback, client))
|
||||
self._async_start_misc_loop()
|
||||
# Try to consume the buffer right away so it doesn't fill up
|
||||
# since add_reader will wait for the next loop iteration
|
||||
self._async_reader_callback(client)
|
||||
|
||||
@callback
|
||||
def _async_on_socket_close(
|
||||
|
|
|
@ -4678,7 +4678,7 @@ async def test_loop_write_failure(
|
|||
# Final for the disconnect callback
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert "Disconnected from MQTT server mock-broker:1883 (7)" in caplog.text
|
||||
assert "Disconnected from MQTT server mock-broker:1883" in caplog.text
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
|
Loading…
Add table
Reference in a new issue