Increase MQTT broker socket buffer size (#117267)
* Increase MQTT broker socket buffer size * Revert unrelated change * Try to increase buffer size * Set INITIAL_SUBSCRIBE_COOLDOWN back to 0.5 sec * Sinplify and add test * comments * comments --------- Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
parent
f4e8d46ec2
commit
92254772ca
2 changed files with 64 additions and 1 deletions
|
@ -82,8 +82,18 @@ if TYPE_CHECKING:
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
MIN_BUFFER_SIZE = 131072 # Minimum buffer size to use if preferred size fails
|
||||||
|
PREFERRED_BUFFER_SIZE = 2097152 # Set receive buffer size to 2MB
|
||||||
|
|
||||||
DISCOVERY_COOLDOWN = 5
|
DISCOVERY_COOLDOWN = 5
|
||||||
INITIAL_SUBSCRIBE_COOLDOWN = 3.0
|
# The initial subscribe cooldown controls how long to wait to group
|
||||||
|
# subscriptions together. This is to avoid making too many subscribe
|
||||||
|
# requests in a short period of time. If the number is too low, the
|
||||||
|
# system will be flooded with subscribe requests. If the number is too
|
||||||
|
# high, we risk being flooded with responses to the subscribe requests
|
||||||
|
# which can exceed the receive buffer size of the socket. To mitigate
|
||||||
|
# this, we increase the receive buffer size of the socket as well.
|
||||||
|
INITIAL_SUBSCRIBE_COOLDOWN = 0.5
|
||||||
SUBSCRIBE_COOLDOWN = 0.1
|
SUBSCRIBE_COOLDOWN = 0.1
|
||||||
UNSUBSCRIBE_COOLDOWN = 0.1
|
UNSUBSCRIBE_COOLDOWN = 0.1
|
||||||
TIMEOUT_ACK = 10
|
TIMEOUT_ACK = 10
|
||||||
|
@ -427,6 +437,7 @@ class MQTT:
|
||||||
hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, self._async_ha_stop),
|
hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, self._async_ha_stop),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
self._socket_buffersize: int | None = None
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def _async_ha_started(self, _hass: HomeAssistant) -> None:
|
def _async_ha_started(self, _hass: HomeAssistant) -> None:
|
||||||
|
@ -527,6 +538,29 @@ class MQTT:
|
||||||
self.hass, self._misc_loop(), name="mqtt misc loop"
|
self.hass, self._misc_loop(), name="mqtt misc loop"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _increase_socket_buffer_size(self, sock: SocketType) -> None:
|
||||||
|
"""Increase the socket buffer size."""
|
||||||
|
new_buffer_size = PREFERRED_BUFFER_SIZE
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
# Some operating systems do not allow us to set the preferred
|
||||||
|
# buffer size. In that case we try some other size options.
|
||||||
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, new_buffer_size)
|
||||||
|
except OSError as err:
|
||||||
|
if new_buffer_size <= MIN_BUFFER_SIZE:
|
||||||
|
_LOGGER.warning(
|
||||||
|
"Unable to increase the socket buffer size to %s; "
|
||||||
|
"The connection may be unstable if the MQTT broker "
|
||||||
|
"sends data at volume or a large amount of subscriptions "
|
||||||
|
"need to be processed: %s",
|
||||||
|
new_buffer_size,
|
||||||
|
err,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
new_buffer_size //= 2
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
|
||||||
def _on_socket_open(
|
def _on_socket_open(
|
||||||
self, client: mqtt.Client, userdata: Any, sock: SocketType
|
self, client: mqtt.Client, userdata: Any, sock: SocketType
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -543,6 +577,7 @@ class MQTT:
|
||||||
fileno = sock.fileno()
|
fileno = sock.fileno()
|
||||||
_LOGGER.debug("%s: connection opened %s", self.config_entry.title, fileno)
|
_LOGGER.debug("%s: connection opened %s", self.config_entry.title, fileno)
|
||||||
if fileno > -1:
|
if fileno > -1:
|
||||||
|
self._increase_socket_buffer_size(sock)
|
||||||
self.loop.add_reader(sock, partial(self._async_reader_callback, client))
|
self.loop.add_reader(sock, partial(self._async_reader_callback, client))
|
||||||
self._async_start_misc_loop()
|
self._async_start_misc_loop()
|
||||||
|
|
||||||
|
|
|
@ -4406,6 +4406,34 @@ async def test_server_sock_connect_and_disconnect(
|
||||||
assert len(calls) == 0
|
assert len(calls) == 0
|
||||||
|
|
||||||
|
|
||||||
|
@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0)
|
||||||
|
@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0)
|
||||||
|
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
|
||||||
|
async def test_server_sock_buffer_size(
|
||||||
|
hass: HomeAssistant,
|
||||||
|
mqtt_client_mock: MqttMockPahoClient,
|
||||||
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
||||||
|
caplog: pytest.LogCaptureFixture,
|
||||||
|
) -> None:
|
||||||
|
"""Test handling the socket buffer size fails."""
|
||||||
|
mqtt_mock = await mqtt_mock_entry()
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
assert mqtt_mock.connected is True
|
||||||
|
|
||||||
|
mqtt_client_mock.loop_misc.return_value = paho_mqtt.MQTT_ERR_SUCCESS
|
||||||
|
|
||||||
|
client, server = socket.socketpair(
|
||||||
|
family=socket.AF_UNIX, type=socket.SOCK_STREAM, proto=0
|
||||||
|
)
|
||||||
|
client.setblocking(False)
|
||||||
|
server.setblocking(False)
|
||||||
|
with patch.object(client, "setsockopt", side_effect=OSError("foo")):
|
||||||
|
mqtt_client_mock.on_socket_open(mqtt_client_mock, None, client)
|
||||||
|
mqtt_client_mock.on_socket_register_write(mqtt_client_mock, None, client)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
assert "Unable to increase the socket buffer size" in caplog.text
|
||||||
|
|
||||||
|
|
||||||
@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0)
|
@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0)
|
||||||
@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0)
|
@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0)
|
||||||
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
|
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
|
||||||
|
|
Loading…
Add table
Reference in a new issue