Don't log same MQTT message multiple times (#34511)
This commit is contained in:
parent
730a257f3c
commit
877eb0c3ad
5 changed files with 86 additions and 12 deletions
|
@ -40,6 +40,7 @@ from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
||||||
from homeassistant.helpers.entity import Entity
|
from homeassistant.helpers.entity import Entity
|
||||||
from homeassistant.helpers.typing import ConfigType, HomeAssistantType, ServiceDataType
|
from homeassistant.helpers.typing import ConfigType, HomeAssistantType, ServiceDataType
|
||||||
from homeassistant.loader import bind_hass
|
from homeassistant.loader import bind_hass
|
||||||
|
from homeassistant.util import dt as dt_util
|
||||||
from homeassistant.util.async_ import run_callback_threadsafe
|
from homeassistant.util.async_ import run_callback_threadsafe
|
||||||
from homeassistant.util.logging import catch_log_exception
|
from homeassistant.util.logging import catch_log_exception
|
||||||
|
|
||||||
|
@ -945,7 +946,8 @@ class MQTT:
|
||||||
self.async_publish( # pylint: disable=no-value-for-parameter
|
self.async_publish( # pylint: disable=no-value-for-parameter
|
||||||
*attr.astuple(
|
*attr.astuple(
|
||||||
self.birth_message,
|
self.birth_message,
|
||||||
filter=lambda attr, value: attr.name != "subscribed_topic",
|
filter=lambda attr, value: attr.name
|
||||||
|
not in ["subscribed_topic", "timestamp"],
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
@ -962,6 +964,7 @@ class MQTT:
|
||||||
" (retained)" if msg.retain else "",
|
" (retained)" if msg.retain else "",
|
||||||
msg.payload,
|
msg.payload,
|
||||||
)
|
)
|
||||||
|
timestamp = dt_util.utcnow()
|
||||||
|
|
||||||
for subscription in self.subscriptions:
|
for subscription in self.subscriptions:
|
||||||
if not _match_topic(subscription.topic, msg.topic):
|
if not _match_topic(subscription.topic, msg.topic):
|
||||||
|
@ -983,7 +986,14 @@ class MQTT:
|
||||||
|
|
||||||
self.hass.async_run_job(
|
self.hass.async_run_job(
|
||||||
subscription.callback,
|
subscription.callback,
|
||||||
Message(msg.topic, payload, msg.qos, msg.retain, subscription.topic),
|
Message(
|
||||||
|
msg.topic,
|
||||||
|
payload,
|
||||||
|
msg.qos,
|
||||||
|
msg.retain,
|
||||||
|
subscription.topic,
|
||||||
|
timestamp,
|
||||||
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
def _mqtt_on_disconnect(self, _mqttc, _userdata, result_code: int) -> None:
|
def _mqtt_on_disconnect(self, _mqttc, _userdata, result_code: int) -> None:
|
||||||
|
|
|
@ -24,7 +24,8 @@ def log_messages(hass: HomeAssistantType, entity_id: str) -> MessageCallbackType
|
||||||
messages = debug_info["entities"][entity_id]["subscriptions"][
|
messages = debug_info["entities"][entity_id]["subscriptions"][
|
||||||
msg.subscribed_topic
|
msg.subscribed_topic
|
||||||
]
|
]
|
||||||
messages.append((msg.payload, msg.topic))
|
if msg not in messages:
|
||||||
|
messages.append(msg)
|
||||||
|
|
||||||
def _decorator(msg_callback: MessageCallbackType):
|
def _decorator(msg_callback: MessageCallbackType):
|
||||||
@wraps(msg_callback)
|
@wraps(msg_callback)
|
||||||
|
@ -125,7 +126,8 @@ async def info_for_device(hass, device_id):
|
||||||
{
|
{
|
||||||
"topic": topic,
|
"topic": topic,
|
||||||
"messages": [
|
"messages": [
|
||||||
{"payload": msg[0], "topic": msg[1]} for msg in list(messages)
|
{"payload": msg.payload, "time": msg.timestamp, "topic": msg.topic}
|
||||||
|
for msg in list(messages)
|
||||||
],
|
],
|
||||||
}
|
}
|
||||||
for topic, messages in entity_info["subscriptions"].items()
|
for topic, messages in entity_info["subscriptions"].items()
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
"""Modesl used by multiple MQTT modules."""
|
"""Modesl used by multiple MQTT modules."""
|
||||||
|
import datetime as dt
|
||||||
from typing import Callable, Union
|
from typing import Callable, Union
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
|
@ -15,6 +16,7 @@ class Message:
|
||||||
qos = attr.ib(type=int)
|
qos = attr.ib(type=int)
|
||||||
retain = attr.ib(type=bool)
|
retain = attr.ib(type=bool)
|
||||||
subscribed_topic = attr.ib(type=str, default=None)
|
subscribed_topic = attr.ib(type=str, default=None)
|
||||||
|
timestamp = attr.ib(type=dt.datetime, default=None)
|
||||||
|
|
||||||
|
|
||||||
MessageCallbackType = Callable[[Message], None]
|
MessageCallbackType = Callable[[Message], None]
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
"""Common test objects."""
|
"""Common test objects."""
|
||||||
import copy
|
import copy
|
||||||
|
from datetime import datetime
|
||||||
import json
|
import json
|
||||||
|
from unittest import mock
|
||||||
from unittest.mock import ANY
|
from unittest.mock import ANY
|
||||||
|
|
||||||
from homeassistant.components import mqtt
|
from homeassistant.components import mqtt
|
||||||
|
@ -586,8 +588,11 @@ async def help_test_entity_debug_info_max_messages(hass, mqtt_mock, domain, conf
|
||||||
"subscriptions"
|
"subscriptions"
|
||||||
]
|
]
|
||||||
|
|
||||||
for i in range(0, debug_info.STORED_MESSAGES + 1):
|
start_dt = datetime(2019, 1, 1, 0, 0, 0)
|
||||||
async_fire_mqtt_message(hass, "test-topic", f"{i}")
|
with mock.patch("homeassistant.util.dt.utcnow") as dt_utcnow:
|
||||||
|
dt_utcnow.return_value = start_dt
|
||||||
|
for i in range(0, debug_info.STORED_MESSAGES + 1):
|
||||||
|
async_fire_mqtt_message(hass, "test-topic", f"{i}")
|
||||||
|
|
||||||
debug_info_data = await debug_info.info_for_device(hass, device.id)
|
debug_info_data = await debug_info.info_for_device(hass, device.id)
|
||||||
assert len(debug_info_data["entities"][0]["subscriptions"]) == 1
|
assert len(debug_info_data["entities"][0]["subscriptions"]) == 1
|
||||||
|
@ -596,7 +601,7 @@ async def help_test_entity_debug_info_max_messages(hass, mqtt_mock, domain, conf
|
||||||
== debug_info.STORED_MESSAGES
|
== debug_info.STORED_MESSAGES
|
||||||
)
|
)
|
||||||
messages = [
|
messages = [
|
||||||
{"topic": "test-topic", "payload": f"{i}"}
|
{"topic": "test-topic", "payload": f"{i}", "time": start_dt}
|
||||||
for i in range(1, debug_info.STORED_MESSAGES + 1)
|
for i in range(1, debug_info.STORED_MESSAGES + 1)
|
||||||
]
|
]
|
||||||
assert {"topic": "test-topic", "messages": messages} in debug_info_data["entities"][
|
assert {"topic": "test-topic", "messages": messages} in debug_info_data["entities"][
|
||||||
|
@ -642,13 +647,16 @@ async def help_test_entity_debug_info_message(
|
||||||
"subscriptions"
|
"subscriptions"
|
||||||
]
|
]
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, topic, payload)
|
start_dt = datetime(2019, 1, 1, 0, 0, 0)
|
||||||
|
with mock.patch("homeassistant.util.dt.utcnow") as dt_utcnow:
|
||||||
|
dt_utcnow.return_value = start_dt
|
||||||
|
async_fire_mqtt_message(hass, topic, payload)
|
||||||
|
|
||||||
debug_info_data = await debug_info.info_for_device(hass, device.id)
|
debug_info_data = await debug_info.info_for_device(hass, device.id)
|
||||||
assert len(debug_info_data["entities"][0]["subscriptions"]) >= 1
|
assert len(debug_info_data["entities"][0]["subscriptions"]) >= 1
|
||||||
assert {
|
assert {
|
||||||
"topic": topic,
|
"topic": topic,
|
||||||
"messages": [{"topic": topic, "payload": payload}],
|
"messages": [{"topic": topic, "payload": payload, "time": start_dt}],
|
||||||
} in debug_info_data["entities"][0]["subscriptions"]
|
} in debug_info_data["entities"][0]["subscriptions"]
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
"""The tests for the MQTT component."""
|
"""The tests for the MQTT component."""
|
||||||
from datetime import timedelta
|
from datetime import datetime, timedelta
|
||||||
import json
|
import json
|
||||||
import ssl
|
import ssl
|
||||||
import unittest
|
import unittest
|
||||||
|
@ -1266,11 +1266,63 @@ async def test_debug_info_wildcard(hass, mqtt_mock):
|
||||||
"subscriptions"
|
"subscriptions"
|
||||||
]
|
]
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, "sensor/abc", "123")
|
start_dt = datetime(2019, 1, 1, 0, 0, 0)
|
||||||
|
with mock.patch("homeassistant.util.dt.utcnow") as dt_utcnow:
|
||||||
|
dt_utcnow.return_value = start_dt
|
||||||
|
async_fire_mqtt_message(hass, "sensor/abc", "123")
|
||||||
|
|
||||||
debug_info_data = await debug_info.info_for_device(hass, device.id)
|
debug_info_data = await debug_info.info_for_device(hass, device.id)
|
||||||
assert len(debug_info_data["entities"][0]["subscriptions"]) >= 1
|
assert len(debug_info_data["entities"][0]["subscriptions"]) >= 1
|
||||||
assert {
|
assert {
|
||||||
"topic": "sensor/#",
|
"topic": "sensor/#",
|
||||||
"messages": [{"topic": "sensor/abc", "payload": "123"}],
|
"messages": [{"topic": "sensor/abc", "payload": "123", "time": start_dt}],
|
||||||
} in debug_info_data["entities"][0]["subscriptions"]
|
} in debug_info_data["entities"][0]["subscriptions"]
|
||||||
|
|
||||||
|
|
||||||
|
async def test_debug_info_filter_same(hass, mqtt_mock):
|
||||||
|
"""Test debug info removes messages with same timestamp."""
|
||||||
|
config = {
|
||||||
|
"device": {"identifiers": ["helloworld"]},
|
||||||
|
"platform": "mqtt",
|
||||||
|
"name": "test",
|
||||||
|
"state_topic": "sensor/#",
|
||||||
|
"unique_id": "veryunique",
|
||||||
|
}
|
||||||
|
|
||||||
|
entry = MockConfigEntry(domain=mqtt.DOMAIN)
|
||||||
|
entry.add_to_hass(hass)
|
||||||
|
await async_start(hass, "homeassistant", {}, entry)
|
||||||
|
registry = await hass.helpers.device_registry.async_get_registry()
|
||||||
|
|
||||||
|
data = json.dumps(config)
|
||||||
|
async_fire_mqtt_message(hass, "homeassistant/sensor/bla/config", data)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
|
device = registry.async_get_device({("mqtt", "helloworld")}, set())
|
||||||
|
assert device is not None
|
||||||
|
|
||||||
|
debug_info_data = await debug_info.info_for_device(hass, device.id)
|
||||||
|
assert len(debug_info_data["entities"][0]["subscriptions"]) >= 1
|
||||||
|
assert {"topic": "sensor/#", "messages": []} in debug_info_data["entities"][0][
|
||||||
|
"subscriptions"
|
||||||
|
]
|
||||||
|
|
||||||
|
dt1 = datetime(2019, 1, 1, 0, 0, 0)
|
||||||
|
dt2 = datetime(2019, 1, 1, 0, 0, 1)
|
||||||
|
with mock.patch("homeassistant.util.dt.utcnow") as dt_utcnow:
|
||||||
|
dt_utcnow.return_value = dt1
|
||||||
|
async_fire_mqtt_message(hass, "sensor/abc", "123")
|
||||||
|
async_fire_mqtt_message(hass, "sensor/abc", "123")
|
||||||
|
dt_utcnow.return_value = dt2
|
||||||
|
async_fire_mqtt_message(hass, "sensor/abc", "123")
|
||||||
|
|
||||||
|
debug_info_data = await debug_info.info_for_device(hass, device.id)
|
||||||
|
assert len(debug_info_data["entities"][0]["subscriptions"]) == 1
|
||||||
|
assert len(debug_info_data["entities"][0]["subscriptions"][0]["messages"]) == 2
|
||||||
|
assert {
|
||||||
|
"topic": "sensor/#",
|
||||||
|
"messages": [
|
||||||
|
{"topic": "sensor/abc", "payload": "123", "time": dt1},
|
||||||
|
{"topic": "sensor/abc", "payload": "123", "time": dt2},
|
||||||
|
],
|
||||||
|
} == debug_info_data["entities"][0]["subscriptions"][0]
|
||||||
|
|
Loading…
Add table
Reference in a new issue