Remove duplicate mid handling in MQTT (#116531)
This commit is contained in:
parent
e68901235b
commit
041456759f
2 changed files with 33 additions and 15 deletions
|
@ -617,7 +617,7 @@ class MQTT:
|
||||||
qos,
|
qos,
|
||||||
)
|
)
|
||||||
_raise_on_error(msg_info.rc)
|
_raise_on_error(msg_info.rc)
|
||||||
await self._wait_for_mid(msg_info.mid)
|
await self._async_wait_for_mid(msg_info.mid)
|
||||||
|
|
||||||
async def async_connect(self, client_available: asyncio.Future[bool]) -> None:
|
async def async_connect(self, client_available: asyncio.Future[bool]) -> None:
|
||||||
"""Connect to the host. Does not process messages yet."""
|
"""Connect to the host. Does not process messages yet."""
|
||||||
|
@ -849,7 +849,7 @@ class MQTT:
|
||||||
self._last_subscribe = time.monotonic()
|
self._last_subscribe = time.monotonic()
|
||||||
|
|
||||||
if result == 0:
|
if result == 0:
|
||||||
await self._wait_for_mid(mid)
|
await self._async_wait_for_mid(mid)
|
||||||
else:
|
else:
|
||||||
_raise_on_error(result)
|
_raise_on_error(result)
|
||||||
|
|
||||||
|
@ -866,7 +866,7 @@ class MQTT:
|
||||||
for topic in topics:
|
for topic in topics:
|
||||||
_LOGGER.debug("Unsubscribing from %s, mid: %s", topic, mid)
|
_LOGGER.debug("Unsubscribing from %s, mid: %s", topic, mid)
|
||||||
|
|
||||||
await self._wait_for_mid(mid)
|
await self._async_wait_for_mid(mid)
|
||||||
|
|
||||||
async def _async_resubscribe_and_publish_birth_message(
|
async def _async_resubscribe_and_publish_birth_message(
|
||||||
self, birth_message: PublishMessage
|
self, birth_message: PublishMessage
|
||||||
|
@ -1055,8 +1055,8 @@ class MQTT:
|
||||||
# see https://github.com/eclipse/paho.mqtt.python/issues/687
|
# see https://github.com/eclipse/paho.mqtt.python/issues/687
|
||||||
# properties and reason codes are not used in Home Assistant
|
# properties and reason codes are not used in Home Assistant
|
||||||
future = self._async_get_mid_future(mid)
|
future = self._async_get_mid_future(mid)
|
||||||
if future.done():
|
if future.done() and future.exception():
|
||||||
_LOGGER.warning("Received duplicate mid: %s", mid)
|
# Timed out
|
||||||
return
|
return
|
||||||
future.set_result(None)
|
future.set_result(None)
|
||||||
|
|
||||||
|
@ -1104,9 +1104,9 @@ class MQTT:
|
||||||
if not future.done():
|
if not future.done():
|
||||||
future.set_exception(asyncio.TimeoutError)
|
future.set_exception(asyncio.TimeoutError)
|
||||||
|
|
||||||
async def _wait_for_mid(self, mid: int) -> None:
|
async def _async_wait_for_mid(self, mid: int) -> None:
|
||||||
"""Wait for ACK from broker."""
|
"""Wait for ACK from broker."""
|
||||||
# Create the mid event if not created, either _mqtt_handle_mid or _wait_for_mid
|
# Create the mid event if not created, either _mqtt_handle_mid or _async_wait_for_mid
|
||||||
# may be executed first.
|
# may be executed first.
|
||||||
future = self._async_get_mid_future(mid)
|
future = self._async_get_mid_future(mid)
|
||||||
loop = self.hass.loop
|
loop = self.hass.loop
|
||||||
|
|
|
@ -2074,16 +2074,34 @@ async def test_handle_mqtt_on_callback(
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test receiving an ACK callback before waiting for it."""
|
"""Test receiving an ACK callback before waiting for it."""
|
||||||
await mqtt_mock_entry()
|
await mqtt_mock_entry()
|
||||||
# Simulate an ACK for mid == 1, this will call mqtt_mock._mqtt_handle_mid(mid)
|
with patch.object(mqtt_client_mock, "get_mid", return_value=100):
|
||||||
mqtt_client_mock.on_publish(mqtt_client_mock, None, 1)
|
# Simulate an ACK for mid == 100, this will call mqtt_mock._async_get_mid_future(mid)
|
||||||
await hass.async_block_till_done()
|
mqtt_client_mock.on_publish(mqtt_client_mock, None, 100)
|
||||||
# Make sure the ACK has been received
|
await hass.async_block_till_done()
|
||||||
await hass.async_block_till_done()
|
# Make sure the ACK has been received
|
||||||
# Now call publish without call back, this will call _wait_for_mid(msg_info.mid)
|
await hass.async_block_till_done()
|
||||||
await mqtt.async_publish(hass, "no_callback/test-topic", "test-payload")
|
# Now call publish without call back, this will call _async_async_wait_for_mid(msg_info.mid)
|
||||||
# Since the mid event was already set, we should not see any timeout warning in the log
|
await mqtt.async_publish(hass, "no_callback/test-topic", "test-payload")
|
||||||
|
# Since the mid event was already set, we should not see any timeout warning in the log
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
assert "No ACK from MQTT server" not in caplog.text
|
||||||
|
|
||||||
|
|
||||||
|
async def test_handle_mqtt_on_callback_after_timeout(
|
||||||
|
hass: HomeAssistant,
|
||||||
|
caplog: pytest.LogCaptureFixture,
|
||||||
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
||||||
|
mqtt_client_mock: MqttMockPahoClient,
|
||||||
|
) -> None:
|
||||||
|
"""Test receiving an ACK after a timeout."""
|
||||||
|
mqtt_mock = await mqtt_mock_entry()
|
||||||
|
# Simulate the mid future getting a timeout
|
||||||
|
mqtt_mock()._async_get_mid_future(100).set_exception(asyncio.TimeoutError)
|
||||||
|
# Simulate an ACK for mid == 100, being received after the timeout
|
||||||
|
mqtt_client_mock.on_publish(mqtt_client_mock, None, 100)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert "No ACK from MQTT server" not in caplog.text
|
assert "No ACK from MQTT server" not in caplog.text
|
||||||
|
assert "InvalidStateError" not in caplog.text
|
||||||
|
|
||||||
|
|
||||||
async def test_publish_error(
|
async def test_publish_error(
|
||||||
|
|
Loading…
Add table
Reference in a new issue