From 041456759fb20f88923d593312db7d48d54454dd Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 1 May 2024 15:18:44 -0500 Subject: [PATCH] Remove duplicate mid handling in MQTT (#116531) --- homeassistant/components/mqtt/client.py | 14 +++++----- tests/components/mqtt/test_init.py | 34 +++++++++++++++++++------ 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/homeassistant/components/mqtt/client.py b/homeassistant/components/mqtt/client.py index e96ad9318d5..88f9598596b 100644 --- a/homeassistant/components/mqtt/client.py +++ b/homeassistant/components/mqtt/client.py @@ -617,7 +617,7 @@ class MQTT: qos, ) _raise_on_error(msg_info.rc) - await self._wait_for_mid(msg_info.mid) + await self._async_wait_for_mid(msg_info.mid) async def async_connect(self, client_available: asyncio.Future[bool]) -> None: """Connect to the host. Does not process messages yet.""" @@ -849,7 +849,7 @@ class MQTT: self._last_subscribe = time.monotonic() if result == 0: - await self._wait_for_mid(mid) + await self._async_wait_for_mid(mid) else: _raise_on_error(result) @@ -866,7 +866,7 @@ class MQTT: for topic in topics: _LOGGER.debug("Unsubscribing from %s, mid: %s", topic, mid) - await self._wait_for_mid(mid) + await self._async_wait_for_mid(mid) async def _async_resubscribe_and_publish_birth_message( self, birth_message: PublishMessage @@ -1055,8 +1055,8 @@ class MQTT: # see https://github.com/eclipse/paho.mqtt.python/issues/687 # properties and reason codes are not used in Home Assistant future = self._async_get_mid_future(mid) - if future.done(): - _LOGGER.warning("Received duplicate mid: %s", mid) + if future.done() and future.exception(): + # Timed out return future.set_result(None) @@ -1104,9 +1104,9 @@ class MQTT: if not future.done(): future.set_exception(asyncio.TimeoutError) - async def _wait_for_mid(self, mid: int) -> None: + async def _async_wait_for_mid(self, mid: int) -> None: """Wait for ACK from broker.""" - # Create the mid event if not created, either _mqtt_handle_mid or _wait_for_mid + # Create the mid event if not created, either _mqtt_handle_mid or _async_wait_for_mid # may be executed first. future = self._async_get_mid_future(mid) loop = self.hass.loop diff --git a/tests/components/mqtt/test_init.py b/tests/components/mqtt/test_init.py index 94a8c4831b4..6cfb37df29b 100644 --- a/tests/components/mqtt/test_init.py +++ b/tests/components/mqtt/test_init.py @@ -2074,16 +2074,34 @@ async def test_handle_mqtt_on_callback( ) -> None: """Test receiving an ACK callback before waiting for it.""" await mqtt_mock_entry() - # Simulate an ACK for mid == 1, this will call mqtt_mock._mqtt_handle_mid(mid) - mqtt_client_mock.on_publish(mqtt_client_mock, None, 1) - await hass.async_block_till_done() - # Make sure the ACK has been received - await hass.async_block_till_done() - # Now call publish without call back, this will call _wait_for_mid(msg_info.mid) - await mqtt.async_publish(hass, "no_callback/test-topic", "test-payload") - # Since the mid event was already set, we should not see any timeout warning in the log + with patch.object(mqtt_client_mock, "get_mid", return_value=100): + # Simulate an ACK for mid == 100, this will call mqtt_mock._async_get_mid_future(mid) + mqtt_client_mock.on_publish(mqtt_client_mock, None, 100) + await hass.async_block_till_done() + # Make sure the ACK has been received + await hass.async_block_till_done() + # Now call publish without call back, this will call _async_async_wait_for_mid(msg_info.mid) + await mqtt.async_publish(hass, "no_callback/test-topic", "test-payload") + # Since the mid event was already set, we should not see any timeout warning in the log + await hass.async_block_till_done() + assert "No ACK from MQTT server" not in caplog.text + + +async def test_handle_mqtt_on_callback_after_timeout( + hass: HomeAssistant, + caplog: pytest.LogCaptureFixture, + mqtt_mock_entry: MqttMockHAClientGenerator, + mqtt_client_mock: MqttMockPahoClient, +) -> None: + """Test receiving an ACK after a timeout.""" + mqtt_mock = await mqtt_mock_entry() + # Simulate the mid future getting a timeout + mqtt_mock()._async_get_mid_future(100).set_exception(asyncio.TimeoutError) + # Simulate an ACK for mid == 100, being received after the timeout + mqtt_client_mock.on_publish(mqtt_client_mock, None, 100) await hass.async_block_till_done() assert "No ACK from MQTT server" not in caplog.text + assert "InvalidStateError" not in caplog.text async def test_publish_error(