Add dump service to MQTT integration (#31370)

* Add dump service to MQTT integration

* Lint
This commit is contained in:
Paulus Schoutsen 2020-02-02 15:01:52 -08:00 committed by GitHub
parent af105d2d61
commit 81dbdc6b9c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 73 additions and 2 deletions

View file

@ -36,7 +36,7 @@ from homeassistant.exceptions import (
HomeAssistantError,
Unauthorized,
)
from homeassistant.helpers import config_validation as cv, template
from homeassistant.helpers import config_validation as cv, event, template
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.typing import ConfigType, HomeAssistantType, ServiceDataType
@ -68,6 +68,7 @@ DATA_MQTT_CONFIG = "mqtt_config"
DATA_MQTT_HASS_CONFIG = "mqtt_hass_config"
SERVICE_PUBLISH = "publish"
SERVICE_DUMP = "dump"
CONF_EMBEDDED = "embedded"
@ -651,7 +652,7 @@ async def async_setup_entry(hass, entry):
if result == CONNECTION_FAILED_RECOVERABLE:
raise ConfigEntryNotReady
async def async_stop_mqtt(event: Event):
async def async_stop_mqtt(_event: Event):
"""Stop MQTT component."""
await hass.data[DATA_MQTT].async_disconnect()
@ -683,6 +684,40 @@ async def async_setup_entry(hass, entry):
DOMAIN, SERVICE_PUBLISH, async_publish_service, schema=MQTT_PUBLISH_SCHEMA
)
async def async_dump_service(call: ServiceCall):
"""Handle MQTT dump service calls."""
messages = []
@callback
def collect_msg(msg):
messages.append((msg.topic, msg.payload.replace("\n", "")))
unsub = await async_subscribe(hass, call.data["topic"], collect_msg)
def write_dump():
with open(hass.config.path("mqtt_dump.txt"), "wt") as fp:
for msg in messages:
fp.write(",".join(msg) + "\n")
async def finish_dump(_):
"""Write dump to file."""
unsub()
await hass.async_add_executor_job(write_dump)
event.async_call_later(hass, call.data["duration"], finish_dump)
hass.services.async_register(
DOMAIN,
SERVICE_DUMP,
async_dump_service,
schema=vol.Schema(
{
vol.Required("topic"): valid_subscribe_topic,
vol.Optional("duration", default=5): int,
}
),
)
if conf.get(CONF_DISCOVERY):
await _async_setup_discovery(
hass, conf, hass.data[DATA_MQTT_HASS_CONFIG], entry

View file

@ -24,3 +24,14 @@ publish:
description: If message should have the retain flag set.
example: true
default: false
dump:
description: Dump messages on a topic selector to the 'mqtt_dump.txt' file in your config folder.
fields:
topic:
description: topic to listen to
example: "openzwave/#"
duration:
description: how long we should listen for messages in seconds
example: 5
default: 5

View file

@ -1,4 +1,5 @@
"""The tests for the MQTT component."""
from datetime import timedelta
import ssl
import unittest
from unittest import mock
@ -16,10 +17,12 @@ from homeassistant.const import (
from homeassistant.core import callback
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.setup import async_setup_component
from homeassistant.util.dt import utcnow
from tests.common import (
MockConfigEntry,
async_fire_mqtt_message,
async_fire_time_changed,
async_mock_mqtt_component,
fire_mqtt_message,
get_test_home_assistant,
@ -803,3 +806,25 @@ async def test_mqtt_ws_subscription(hass, hass_ws_client):
await client.send_json({"id": 8, "type": "unsubscribe_events", "subscription": 5})
response = await client.receive_json()
assert response["success"]
async def test_dump_service(hass):
"""Test that we can dump a topic."""
await async_mock_mqtt_component(hass)
mock_open = mock.mock_open()
await hass.services.async_call(
"mqtt", "dump", {"topic": "bla/#", "duration": 3}, blocking=True
)
async_fire_mqtt_message(hass, "bla/1", "test1")
async_fire_mqtt_message(hass, "bla/2", "test2")
with mock.patch("homeassistant.components.mqtt.open", mock_open):
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
await hass.async_block_till_done()
writes = mock_open.return_value.write.mock_calls
assert len(writes) == 2
assert writes[0][1][0] == "bla/1,test1\n"
assert writes[1][1][0] == "bla/2,test2\n"