Only publish mqtt_statestream when MQTT is started (#89833)
* Only publish mqtt_statestream when ha is started * also catch startup states and use event filter * Add check for MQTT to be available first * Make sure MQTT is available and started * Fix test * Improve test * Reset mock before assertung not called
This commit is contained in:
parent
41ea8fa9b4
commit
6f88fe93ef
2 changed files with 126 additions and 18 deletions
|
@ -1,19 +1,20 @@
|
|||
"""Publish simple item state changes via MQTT."""
|
||||
import json
|
||||
import logging
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components import mqtt
|
||||
from homeassistant.components.mqtt import valid_publish_topic
|
||||
from homeassistant.const import MATCH_ALL
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.const import EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED
|
||||
from homeassistant.core import Event, HomeAssistant, State, callback
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.helpers.entityfilter import (
|
||||
INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA,
|
||||
convert_include_exclude_filter,
|
||||
)
|
||||
from homeassistant.helpers.event import async_track_state_change
|
||||
from homeassistant.helpers.json import JSONEncoder
|
||||
from homeassistant.helpers.start import async_at_start
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
|
||||
CONF_BASE_TOPIC = "base_topic"
|
||||
|
@ -35,23 +36,31 @@ CONFIG_SCHEMA = vol.Schema(
|
|||
extra=vol.ALLOW_EXTRA,
|
||||
)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
"""Set up the MQTT state feed."""
|
||||
conf = config[DOMAIN]
|
||||
# Make sure MQTT is available and the entry is loaded
|
||||
if not hass.config_entries.async_entries(
|
||||
mqtt.DOMAIN
|
||||
) or not await hass.config_entries.async_wait_component(
|
||||
hass.config_entries.async_entries(mqtt.DOMAIN)[0]
|
||||
):
|
||||
_LOGGER.error("MQTT integration is not available")
|
||||
return False
|
||||
|
||||
conf: ConfigType = config[DOMAIN]
|
||||
publish_filter = convert_include_exclude_filter(conf)
|
||||
base_topic = conf.get(CONF_BASE_TOPIC)
|
||||
publish_attributes = conf.get(CONF_PUBLISH_ATTRIBUTES)
|
||||
publish_timestamps = conf.get(CONF_PUBLISH_TIMESTAMPS)
|
||||
base_topic: str = conf[CONF_BASE_TOPIC]
|
||||
publish_attributes: bool = conf[CONF_PUBLISH_ATTRIBUTES]
|
||||
publish_timestamps: bool = conf[CONF_PUBLISH_TIMESTAMPS]
|
||||
if not base_topic.endswith("/"):
|
||||
base_topic = f"{base_topic}/"
|
||||
|
||||
async def _state_publisher(entity_id, old_state, new_state):
|
||||
if new_state is None:
|
||||
return
|
||||
|
||||
if not publish_filter(entity_id):
|
||||
return
|
||||
async def _state_publisher(evt: Event) -> None:
|
||||
entity_id: str = evt.data["entity_id"]
|
||||
new_state: State = evt.data["new_state"]
|
||||
|
||||
payload = new_state.state
|
||||
|
||||
|
@ -81,5 +90,28 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
|||
encoded_val = json.dumps(val, cls=JSONEncoder)
|
||||
await mqtt.async_publish(hass, mybase + key, encoded_val, 1, True)
|
||||
|
||||
async_track_state_change(hass, MATCH_ALL, _state_publisher)
|
||||
@callback
|
||||
def _ha_started(hass: HomeAssistant) -> None:
|
||||
@callback
|
||||
def _event_filter(evt: Event) -> bool:
|
||||
entity_id: str = evt.data["entity_id"]
|
||||
new_state: State | None = evt.data["new_state"]
|
||||
if new_state is None:
|
||||
return False
|
||||
if not publish_filter(entity_id):
|
||||
return False
|
||||
return True
|
||||
|
||||
callback_handler = hass.bus.async_listen(
|
||||
EVENT_STATE_CHANGED, _state_publisher, _event_filter
|
||||
)
|
||||
|
||||
@callback
|
||||
def _ha_stopping(_: Event) -> None:
|
||||
callback_handler()
|
||||
|
||||
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _ha_stopping)
|
||||
|
||||
async_at_start(hass, _ha_started)
|
||||
|
||||
return True
|
||||
|
|
|
@ -1,22 +1,25 @@
|
|||
"""The tests for the MQTT statestream component."""
|
||||
from unittest.mock import ANY, call
|
||||
|
||||
import pytest
|
||||
|
||||
import homeassistant.components.mqtt_statestream as statestream
|
||||
from homeassistant.core import HomeAssistant, State
|
||||
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
||||
from homeassistant.core import CoreState, HomeAssistant, State
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from tests.common import mock_state_change_event
|
||||
from tests.common import MockEntity, MockEntityPlatform, mock_state_change_event
|
||||
from tests.typing import MqttMockHAClient
|
||||
|
||||
|
||||
async def add_statestream(
|
||||
hass,
|
||||
hass: HomeAssistant,
|
||||
base_topic=None,
|
||||
publish_attributes=None,
|
||||
publish_timestamps=None,
|
||||
publish_include=None,
|
||||
publish_exclude=None,
|
||||
):
|
||||
) -> bool:
|
||||
"""Add a mqtt_statestream component."""
|
||||
config = {}
|
||||
if base_topic:
|
||||
|
@ -48,6 +51,59 @@ async def test_setup_succeeds_without_attributes(
|
|||
assert await add_statestream(hass, base_topic="pub")
|
||||
|
||||
|
||||
async def test_setup_and_stop_waits_for_ha(
|
||||
hass: HomeAssistant, mqtt_mock: MqttMockHAClient
|
||||
) -> None:
|
||||
"""Test the success of the setup with a valid base_topic."""
|
||||
e_id = "fake.entity"
|
||||
|
||||
# HA is not running
|
||||
hass.state = CoreState.not_running
|
||||
|
||||
assert await add_statestream(hass, base_topic="pub")
|
||||
await hass.async_block_till_done()
|
||||
# Set a state of an entity
|
||||
mock_state_change_event(hass, State(e_id, "on"))
|
||||
await hass.async_block_till_done()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Make sure 'on' was not published to pub/fake/entity/state
|
||||
mqtt_mock.async_publish.assert_not_called()
|
||||
|
||||
# HA is starting up
|
||||
await hass.async_start()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Change a state of an entity
|
||||
mock_state_change_event(hass, State(e_id, "off"))
|
||||
await hass.async_block_till_done()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
mqtt_mock.async_publish.assert_called_with("pub/fake/entity/state", "off", 1, True)
|
||||
assert mqtt_mock.async_publish.called
|
||||
mqtt_mock.reset_mock()
|
||||
|
||||
# HA is shutting down
|
||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Change a state of an entity
|
||||
mock_state_change_event(hass, State(e_id, "on"))
|
||||
await hass.async_block_till_done()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Make sure 'on' was not published to pub/fake/entity/state
|
||||
mqtt_mock.async_publish.assert_not_called()
|
||||
|
||||
|
||||
async def test_startup_no_mqtt(
|
||||
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
|
||||
) -> None:
|
||||
"""Test startup without MQTT support."""
|
||||
assert not await add_statestream(hass, base_topic="pub")
|
||||
assert "MQTT integration is not available" in caplog.text
|
||||
|
||||
|
||||
async def test_setup_succeeds_with_attributes(
|
||||
hass: HomeAssistant, mqtt_mock: MqttMockHAClient
|
||||
) -> None:
|
||||
|
@ -78,6 +134,26 @@ async def test_state_changed_event_sends_message(
|
|||
# Make sure 'on' was published to pub/fake/entity/state
|
||||
mqtt_mock.async_publish.assert_called_with("pub/fake/entity/state", "on", 1, True)
|
||||
assert mqtt_mock.async_publish.called
|
||||
mqtt_mock.async_publish.reset_mock()
|
||||
|
||||
# Create a test entity and add it to hass
|
||||
platform = MockEntityPlatform(hass)
|
||||
entity = MockEntity(unique_id="1234")
|
||||
await platform.async_add_entities([entity])
|
||||
|
||||
mqtt_mock.async_publish.assert_called_with(
|
||||
"pub/test_domain/test_platform_1234/state", "unknown", 1, True
|
||||
)
|
||||
mqtt_mock.async_publish.reset_mock()
|
||||
|
||||
state = hass.states.get("test_domain.test_platform_1234")
|
||||
assert state is not None
|
||||
|
||||
# Now remove it, nothing should be published
|
||||
hass.states.async_remove("test_domain.test_platform_1234")
|
||||
await hass.async_block_till_done()
|
||||
await hass.async_block_till_done()
|
||||
mqtt_mock.async_publish.assert_not_called()
|
||||
|
||||
|
||||
async def test_state_changed_event_sends_message_and_timestamp(
|
||||
|
|
Loading…
Add table
Reference in a new issue