diff --git a/homeassistant/components/mqtt/client.py b/homeassistant/components/mqtt/client.py index 09edf3f9b34..02998f5d6dd 100644 --- a/homeassistant/components/mqtt/client.py +++ b/homeassistant/components/mqtt/client.py @@ -82,8 +82,18 @@ if TYPE_CHECKING: _LOGGER = logging.getLogger(__name__) +MIN_BUFFER_SIZE = 131072 # Minimum buffer size to use if preferred size fails +PREFERRED_BUFFER_SIZE = 2097152 # Set receive buffer size to 2MB + DISCOVERY_COOLDOWN = 5 -INITIAL_SUBSCRIBE_COOLDOWN = 3.0 +# The initial subscribe cooldown controls how long to wait to group +# subscriptions together. This is to avoid making too many subscribe +# requests in a short period of time. If the number is too low, the +# system will be flooded with subscribe requests. If the number is too +# high, we risk being flooded with responses to the subscribe requests +# which can exceed the receive buffer size of the socket. To mitigate +# this, we increase the receive buffer size of the socket as well. +INITIAL_SUBSCRIBE_COOLDOWN = 0.5 SUBSCRIBE_COOLDOWN = 0.1 UNSUBSCRIBE_COOLDOWN = 0.1 TIMEOUT_ACK = 10 @@ -427,6 +437,7 @@ class MQTT: hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, self._async_ha_stop), ) ) + self._socket_buffersize: int | None = None @callback def _async_ha_started(self, _hass: HomeAssistant) -> None: @@ -527,6 +538,29 @@ class MQTT: self.hass, self._misc_loop(), name="mqtt misc loop" ) + def _increase_socket_buffer_size(self, sock: SocketType) -> None: + """Increase the socket buffer size.""" + new_buffer_size = PREFERRED_BUFFER_SIZE + while True: + try: + # Some operating systems do not allow us to set the preferred + # buffer size. In that case we try some other size options. + sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, new_buffer_size) + except OSError as err: + if new_buffer_size <= MIN_BUFFER_SIZE: + _LOGGER.warning( + "Unable to increase the socket buffer size to %s; " + "The connection may be unstable if the MQTT broker " + "sends data at volume or a large amount of subscriptions " + "need to be processed: %s", + new_buffer_size, + err, + ) + return + new_buffer_size //= 2 + else: + return + def _on_socket_open( self, client: mqtt.Client, userdata: Any, sock: SocketType ) -> None: @@ -543,6 +577,7 @@ class MQTT: fileno = sock.fileno() _LOGGER.debug("%s: connection opened %s", self.config_entry.title, fileno) if fileno > -1: + self._increase_socket_buffer_size(sock) self.loop.add_reader(sock, partial(self._async_reader_callback, client)) self._async_start_misc_loop() diff --git a/tests/components/mqtt/test_init.py b/tests/components/mqtt/test_init.py index ea836f55c12..e74c1762569 100644 --- a/tests/components/mqtt/test_init.py +++ b/tests/components/mqtt/test_init.py @@ -4406,6 +4406,34 @@ async def test_server_sock_connect_and_disconnect( assert len(calls) == 0 +@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0) +@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0) +@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0) +async def test_server_sock_buffer_size( + hass: HomeAssistant, + mqtt_client_mock: MqttMockPahoClient, + mqtt_mock_entry: MqttMockHAClientGenerator, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test handling the socket buffer size fails.""" + mqtt_mock = await mqtt_mock_entry() + await hass.async_block_till_done() + assert mqtt_mock.connected is True + + mqtt_client_mock.loop_misc.return_value = paho_mqtt.MQTT_ERR_SUCCESS + + client, server = socket.socketpair( + family=socket.AF_UNIX, type=socket.SOCK_STREAM, proto=0 + ) + client.setblocking(False) + server.setblocking(False) + with patch.object(client, "setsockopt", side_effect=OSError("foo")): + mqtt_client_mock.on_socket_open(mqtt_client_mock, None, client) + mqtt_client_mock.on_socket_register_write(mqtt_client_mock, None, client) + await hass.async_block_till_done() + assert "Unable to increase the socket buffer size" in caplog.text + + @patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0) @patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0) @patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)