diff --git a/homeassistant/components/mqtt_statestream/__init__.py b/homeassistant/components/mqtt_statestream/__init__.py index 5213f675460..01425737543 100644 --- a/homeassistant/components/mqtt_statestream/__init__.py +++ b/homeassistant/components/mqtt_statestream/__init__.py @@ -1,19 +1,20 @@ """Publish simple item state changes via MQTT.""" import json +import logging import voluptuous as vol from homeassistant.components import mqtt from homeassistant.components.mqtt import valid_publish_topic -from homeassistant.const import MATCH_ALL -from homeassistant.core import HomeAssistant +from homeassistant.const import EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED +from homeassistant.core import Event, HomeAssistant, State, callback import homeassistant.helpers.config_validation as cv from homeassistant.helpers.entityfilter import ( INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA, convert_include_exclude_filter, ) -from homeassistant.helpers.event import async_track_state_change from homeassistant.helpers.json import JSONEncoder +from homeassistant.helpers.start import async_at_start from homeassistant.helpers.typing import ConfigType CONF_BASE_TOPIC = "base_topic" @@ -35,23 +36,31 @@ CONFIG_SCHEMA = vol.Schema( extra=vol.ALLOW_EXTRA, ) +_LOGGER = logging.getLogger(__name__) + async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up the MQTT state feed.""" - conf = config[DOMAIN] + # Make sure MQTT is available and the entry is loaded + if not hass.config_entries.async_entries( + mqtt.DOMAIN + ) or not await hass.config_entries.async_wait_component( + hass.config_entries.async_entries(mqtt.DOMAIN)[0] + ): + _LOGGER.error("MQTT integration is not available") + return False + + conf: ConfigType = config[DOMAIN] publish_filter = convert_include_exclude_filter(conf) - base_topic = conf.get(CONF_BASE_TOPIC) - publish_attributes = conf.get(CONF_PUBLISH_ATTRIBUTES) - publish_timestamps = conf.get(CONF_PUBLISH_TIMESTAMPS) + base_topic: str = conf[CONF_BASE_TOPIC] + publish_attributes: bool = conf[CONF_PUBLISH_ATTRIBUTES] + publish_timestamps: bool = conf[CONF_PUBLISH_TIMESTAMPS] if not base_topic.endswith("/"): base_topic = f"{base_topic}/" - async def _state_publisher(entity_id, old_state, new_state): - if new_state is None: - return - - if not publish_filter(entity_id): - return + async def _state_publisher(evt: Event) -> None: + entity_id: str = evt.data["entity_id"] + new_state: State = evt.data["new_state"] payload = new_state.state @@ -81,5 +90,28 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: encoded_val = json.dumps(val, cls=JSONEncoder) await mqtt.async_publish(hass, mybase + key, encoded_val, 1, True) - async_track_state_change(hass, MATCH_ALL, _state_publisher) + @callback + def _ha_started(hass: HomeAssistant) -> None: + @callback + def _event_filter(evt: Event) -> bool: + entity_id: str = evt.data["entity_id"] + new_state: State | None = evt.data["new_state"] + if new_state is None: + return False + if not publish_filter(entity_id): + return False + return True + + callback_handler = hass.bus.async_listen( + EVENT_STATE_CHANGED, _state_publisher, _event_filter + ) + + @callback + def _ha_stopping(_: Event) -> None: + callback_handler() + + hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _ha_stopping) + + async_at_start(hass, _ha_started) + return True diff --git a/tests/components/mqtt_statestream/test_init.py b/tests/components/mqtt_statestream/test_init.py index fd3430e2c79..130d874cc50 100644 --- a/tests/components/mqtt_statestream/test_init.py +++ b/tests/components/mqtt_statestream/test_init.py @@ -1,22 +1,25 @@ """The tests for the MQTT statestream component.""" from unittest.mock import ANY, call +import pytest + import homeassistant.components.mqtt_statestream as statestream -from homeassistant.core import HomeAssistant, State +from homeassistant.const import EVENT_HOMEASSISTANT_STOP +from homeassistant.core import CoreState, HomeAssistant, State from homeassistant.setup import async_setup_component -from tests.common import mock_state_change_event +from tests.common import MockEntity, MockEntityPlatform, mock_state_change_event from tests.typing import MqttMockHAClient async def add_statestream( - hass, + hass: HomeAssistant, base_topic=None, publish_attributes=None, publish_timestamps=None, publish_include=None, publish_exclude=None, -): +) -> bool: """Add a mqtt_statestream component.""" config = {} if base_topic: @@ -48,6 +51,59 @@ async def test_setup_succeeds_without_attributes( assert await add_statestream(hass, base_topic="pub") +async def test_setup_and_stop_waits_for_ha( + hass: HomeAssistant, mqtt_mock: MqttMockHAClient +) -> None: + """Test the success of the setup with a valid base_topic.""" + e_id = "fake.entity" + + # HA is not running + hass.state = CoreState.not_running + + assert await add_statestream(hass, base_topic="pub") + await hass.async_block_till_done() + # Set a state of an entity + mock_state_change_event(hass, State(e_id, "on")) + await hass.async_block_till_done() + await hass.async_block_till_done() + + # Make sure 'on' was not published to pub/fake/entity/state + mqtt_mock.async_publish.assert_not_called() + + # HA is starting up + await hass.async_start() + await hass.async_block_till_done() + + # Change a state of an entity + mock_state_change_event(hass, State(e_id, "off")) + await hass.async_block_till_done() + await hass.async_block_till_done() + + mqtt_mock.async_publish.assert_called_with("pub/fake/entity/state", "off", 1, True) + assert mqtt_mock.async_publish.called + mqtt_mock.reset_mock() + + # HA is shutting down + hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP) + await hass.async_block_till_done() + + # Change a state of an entity + mock_state_change_event(hass, State(e_id, "on")) + await hass.async_block_till_done() + await hass.async_block_till_done() + + # Make sure 'on' was not published to pub/fake/entity/state + mqtt_mock.async_publish.assert_not_called() + + +async def test_startup_no_mqtt( + hass: HomeAssistant, caplog: pytest.LogCaptureFixture +) -> None: + """Test startup without MQTT support.""" + assert not await add_statestream(hass, base_topic="pub") + assert "MQTT integration is not available" in caplog.text + + async def test_setup_succeeds_with_attributes( hass: HomeAssistant, mqtt_mock: MqttMockHAClient ) -> None: @@ -78,6 +134,26 @@ async def test_state_changed_event_sends_message( # Make sure 'on' was published to pub/fake/entity/state mqtt_mock.async_publish.assert_called_with("pub/fake/entity/state", "on", 1, True) assert mqtt_mock.async_publish.called + mqtt_mock.async_publish.reset_mock() + + # Create a test entity and add it to hass + platform = MockEntityPlatform(hass) + entity = MockEntity(unique_id="1234") + await platform.async_add_entities([entity]) + + mqtt_mock.async_publish.assert_called_with( + "pub/test_domain/test_platform_1234/state", "unknown", 1, True + ) + mqtt_mock.async_publish.reset_mock() + + state = hass.states.get("test_domain.test_platform_1234") + assert state is not None + + # Now remove it, nothing should be published + hass.states.async_remove("test_domain.test_platform_1234") + await hass.async_block_till_done() + await hass.async_block_till_done() + mqtt_mock.async_publish.assert_not_called() async def test_state_changed_event_sends_message_and_timestamp(