Make sure MQTT client is available when starting depending platforms (#91164)

* Make sure MQTT is available starting mqtt_json

* Wait for mqtt client

* Sync client connect

* Simplify

* Addiitional tests async_wait_for_mqtt_client

* Improve comment waiting for mqtt

* Improve docstr

* Do not wait unless the MQTT client is in setup

* Handle entry errors during setup

* More comments - do not clear event

* Add snips and mqtt_room

* Add manual_mqtt

* Update homeassistant/components/mqtt/__init__.py

Co-authored-by: J. Nick Koston <nick@koston.org>

* Use a fixture, improve tests

* Simplify

---------

Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Jan Bouwhuis 2023-04-20 08:07:35 +02:00 committed by GitHub
parent adc472862b
commit 0bcda9fe9c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 346 additions and 34 deletions

View file

@ -187,13 +187,19 @@ PLATFORM_SCHEMA = vol.Schema(
)
def setup_platform(
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the manual MQTT alarm platform."""
# Make sure MQTT integration is enabled and the client is available
# We cannot count on dependencies as the alarm_control_panel platform setup
# also will be triggered when mqtt is loading the `alarm_control_panel` platform
if not await mqtt.async_wait_for_mqtt_client(hass):
_LOGGER.error("MQTT integration is not available")
return
add_entities(
[
ManualMQTTAlarm(

View file

@ -68,6 +68,7 @@ from .const import ( # noqa: F401
CONF_WS_HEADERS,
CONF_WS_PATH,
DATA_MQTT,
DATA_MQTT_AVAILABLE,
DEFAULT_DISCOVERY,
DEFAULT_ENCODING,
DEFAULT_PREFIX,
@ -87,8 +88,9 @@ from .models import ( # noqa: F401
ReceiveMessage,
ReceivePayloadType,
)
from .util import (
from .util import ( # noqa: F401
async_create_certificate_temp_files,
async_wait_for_mqtt_client,
get_mqtt_data,
mqtt_config_entry_enabled,
valid_publish_topic,
@ -183,34 +185,54 @@ async def _async_config_entry_updated(hass: HomeAssistant, entry: ConfigEntry) -
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Load a config entry."""
conf = dict(entry.data)
# Fetch configuration
hass_config = await conf_util.async_hass_config_yaml(hass)
mqtt_yaml = PLATFORM_CONFIG_SCHEMA_BASE(hass_config.get(DOMAIN, {}))
client = MQTT(hass, entry, conf)
if DOMAIN in hass.data:
mqtt_data = get_mqtt_data(hass)
mqtt_data.config = mqtt_yaml
mqtt_data.client = client
else:
# Initial setup
websocket_api.async_register_command(hass, websocket_subscribe)
websocket_api.async_register_command(hass, websocket_mqtt_info)
hass.data[DATA_MQTT] = mqtt_data = MqttData(config=mqtt_yaml, client=client)
client.start(mqtt_data)
conf: dict[str, Any]
mqtt_data: MqttData
await async_create_certificate_temp_files(hass, dict(entry.data))
# Restore saved subscriptions
if mqtt_data.subscriptions_to_restore:
mqtt_data.client.async_restore_tracked_subscriptions(
mqtt_data.subscriptions_to_restore
async def _setup_client() -> tuple[MqttData, dict[str, Any]]:
"""Set up the MQTT client."""
# Fetch configuration
conf = dict(entry.data)
hass_config = await conf_util.async_hass_config_yaml(hass)
mqtt_yaml = PLATFORM_CONFIG_SCHEMA_BASE(hass_config.get(DOMAIN, {}))
client = MQTT(hass, entry, conf)
if DOMAIN in hass.data:
mqtt_data = get_mqtt_data(hass)
mqtt_data.config = mqtt_yaml
mqtt_data.client = client
else:
# Initial setup
websocket_api.async_register_command(hass, websocket_subscribe)
websocket_api.async_register_command(hass, websocket_mqtt_info)
hass.data[DATA_MQTT] = mqtt_data = MqttData(config=mqtt_yaml, client=client)
client.start(mqtt_data)
await async_create_certificate_temp_files(hass, dict(entry.data))
# Restore saved subscriptions
if mqtt_data.subscriptions_to_restore:
mqtt_data.client.async_restore_tracked_subscriptions(
mqtt_data.subscriptions_to_restore
)
mqtt_data.subscriptions_to_restore = []
mqtt_data.reload_dispatchers.append(
entry.add_update_listener(_async_config_entry_updated)
)
mqtt_data.subscriptions_to_restore = []
mqtt_data.reload_dispatchers.append(
entry.add_update_listener(_async_config_entry_updated)
)
await mqtt_data.client.async_connect()
await mqtt_data.client.async_connect()
return (mqtt_data, conf)
client_available: asyncio.Future[bool]
if DATA_MQTT_AVAILABLE not in hass.data:
client_available = hass.data[DATA_MQTT_AVAILABLE] = asyncio.Future()
else:
client_available = hass.data[DATA_MQTT_AVAILABLE]
setup_ok: bool = False
try:
mqtt_data, conf = await _setup_client()
setup_ok = True
finally:
if not client_available.done():
client_available.set_result(setup_ok)
async def async_publish_service(call: ServiceCall) -> None:
"""Handle MQTT publish service calls."""
@ -565,6 +587,9 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
registry_hooks.popitem()[1]()
# Wait for all ACKs and stop the loop
await mqtt_client.async_disconnect()
# Cleanup MQTT client availability
hass.data.pop(DATA_MQTT_AVAILABLE, None)
# Store remaining subscriptions to be able to restore or reload them
# when the entry is set up again
if subscriptions := mqtt_client.subscriptions:

View file

@ -35,6 +35,7 @@ CONF_CLIENT_CERT = "client_cert"
CONF_TLS_INSECURE = "tls_insecure"
DATA_MQTT = "mqtt"
DATA_MQTT_AVAILABLE = "mqtt_client_available"
DEFAULT_PREFIX = "homeassistant"
DEFAULT_BIRTH_WILL_TOPIC = DEFAULT_PREFIX + "/status"

View file

@ -2,13 +2,16 @@
from __future__ import annotations
import asyncio
import os
from pathlib import Path
import tempfile
from typing import Any
import async_timeout
import voluptuous as vol
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv, template
from homeassistant.helpers.typing import ConfigType
@ -22,6 +25,7 @@ from .const import (
CONF_CLIENT_CERT,
CONF_CLIENT_KEY,
DATA_MQTT,
DATA_MQTT_AVAILABLE,
DEFAULT_ENCODING,
DEFAULT_QOS,
DEFAULT_RETAIN,
@ -29,6 +33,8 @@ from .const import (
)
from .models import MqttData
AVAILABILITY_TIMEOUT = 30.0
TEMP_DIR_NAME = f"home-assistant-{DOMAIN}"
_VALID_QOS_SCHEMA = vol.All(vol.Coerce(int), vol.In([0, 1, 2]))
@ -41,6 +47,37 @@ def mqtt_config_entry_enabled(hass: HomeAssistant) -> bool | None:
return not bool(hass.config_entries.async_entries(DOMAIN)[0].disabled_by)
async def async_wait_for_mqtt_client(hass: HomeAssistant) -> bool:
"""Wait for the MQTT client to become available.
Waits when mqtt set up is in progress,
It is not needed that the client is connected.
Returns True if the mqtt client is available.
Returns False when the client is not available.
"""
if not mqtt_config_entry_enabled(hass):
return False
entry = hass.config_entries.async_entries(DOMAIN)[0]
if entry.state == ConfigEntryState.LOADED:
return True
state_reached_future: asyncio.Future[bool]
if DATA_MQTT_AVAILABLE not in hass.data:
hass.data[DATA_MQTT_AVAILABLE] = state_reached_future = asyncio.Future()
else:
state_reached_future = hass.data[DATA_MQTT_AVAILABLE]
if state_reached_future.done():
return state_reached_future.result()
try:
async with async_timeout.timeout(AVAILABILITY_TIMEOUT):
# Await the client setup or an error state was received
return await state_reached_future
except asyncio.TimeoutError:
return False
def valid_topic(topic: Any) -> str:
"""Validate that this is a valid topic name/filter."""
validated_topic = cv.string(topic)

View file

@ -47,6 +47,13 @@ async def async_setup_scanner(
discovery_info: DiscoveryInfoType | None = None,
) -> bool:
"""Set up the MQTT JSON tracker."""
# Make sure MQTT integration is enabled and the client is available
# We cannot count on dependencies as the device_tracker platform setup
# also will be triggered when mqtt is loading the `device_tracker` platform
if not await mqtt.async_wait_for_mqtt_client(hass):
_LOGGER.error("MQTT integration is not available")
return False
devices = config[CONF_DEVICES]
qos = config[CONF_QOS]

View file

@ -68,6 +68,12 @@ async def async_setup_platform(
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up MQTT room Sensor."""
# Make sure MQTT integration is enabled and the client is available
# We cannot count on dependencies as the sensor platform setup
# also will be triggered when mqtt is loading the `sensor` platform
if not await mqtt.async_wait_for_mqtt_client(hass):
_LOGGER.error("MQTT integration is not available")
return
async_add_entities(
[
MQTTRoomSensor(

View file

@ -90,12 +90,8 @@ SERVICE_SCHEMA_FEEDBACK = vol.Schema(
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Activate Snips component."""
# Make sure MQTT is available and the entry is loaded
if not hass.config_entries.async_entries(
mqtt.DOMAIN
) or not await hass.config_entries.async_wait_component(
hass.config_entries.async_entries(mqtt.DOMAIN)[0]
):
# Make sure MQTT integration is enabled and the client is available
if not await mqtt.async_wait_for_mqtt_client(hass):
_LOGGER.error("MQTT integration is not available")
return False

View file

@ -1506,3 +1506,24 @@ async def test_state_changes_are_published_to_mqtt(
mqtt_mock.async_publish.assert_called_once_with(
"alarm/state", STATE_ALARM_DISARMED, 0, True
)
async def test_no_mqtt(hass: HomeAssistant, caplog: pytest.LogCaptureFixture) -> None:
"""Test publishing of MQTT messages when state changes."""
assert await async_setup_component(
hass,
alarm_control_panel.DOMAIN,
{
alarm_control_panel.DOMAIN: {
"platform": "manual_mqtt",
"name": "test",
"state_topic": "alarm/state",
"command_topic": "alarm/command",
}
},
)
await hass.async_block_till_done()
entity_id = "alarm_control_panel.test"
assert hass.states.get(entity_id) is None
assert "MQTT integration is not available" in caplog.text

View file

@ -1,12 +1,17 @@
"""Test MQTT utils."""
from collections.abc import Callable
from random import getrandbits
from unittest.mock import patch
import pytest
from homeassistant.components import mqtt
from homeassistant.core import HomeAssistant
from homeassistant.config_entries import ConfigEntryDisabler, ConfigEntryState
from homeassistant.core import CoreState, HomeAssistant
from tests.common import MockConfigEntry
from tests.typing import MqttMockHAClient, MqttMockPahoClient
@pytest.fixture(autouse=True)
@ -48,3 +53,163 @@ async def test_reading_non_exitisting_certificate_file() -> None:
assert (
mqtt.util.migrate_certificate_file_to_content("/home/file_not_exists") is None
)
@patch("homeassistant.components.mqtt.PLATFORMS", [])
async def test_waiting_for_client_not_loaded(
hass: HomeAssistant,
mqtt_client_mock: MqttMockPahoClient,
) -> None:
"""Test waiting for client while mqtt entry is not yet loaded."""
hass.state = CoreState.starting
await hass.async_block_till_done()
entry = MockConfigEntry(
domain=mqtt.DOMAIN,
data={"broker": "test-broker"},
state=ConfigEntryState.NOT_LOADED,
)
entry.add_to_hass(hass)
unsubs: list[Callable[[], None]] = []
async def _async_just_in_time_subscribe() -> Callable[[], None]:
nonlocal unsub
assert await mqtt.async_wait_for_mqtt_client(hass)
# Awaiting a second time should work too and return True
assert await mqtt.async_wait_for_mqtt_client(hass)
unsubs.append(await mqtt.async_subscribe(hass, "test_topic", lambda msg: None))
# Simulate some integration waiting for the client to become available
hass.async_add_job(_async_just_in_time_subscribe)
hass.async_add_job(_async_just_in_time_subscribe)
hass.async_add_job(_async_just_in_time_subscribe)
hass.async_add_job(_async_just_in_time_subscribe)
assert entry.state == ConfigEntryState.NOT_LOADED
assert await hass.config_entries.async_setup(entry.entry_id)
assert len(unsubs) == 4
for unsub in unsubs:
unsub()
@patch("homeassistant.components.mqtt.PLATFORMS", [])
async def test_waiting_for_client_loaded(
hass: HomeAssistant,
mqtt_mock: MqttMockHAClient,
) -> None:
"""Test waiting for client where mqtt entry is loaded."""
unsub: Callable[[], None] | None = None
async def _async_just_in_time_subscribe() -> Callable[[], None]:
nonlocal unsub
assert await mqtt.async_wait_for_mqtt_client(hass)
unsub = await mqtt.async_subscribe(hass, "test_topic", lambda msg: None)
entry = hass.config_entries.async_entries(mqtt.DATA_MQTT)[0]
assert entry.state == ConfigEntryState.LOADED
await _async_just_in_time_subscribe()
assert unsub is not None
unsub()
async def test_waiting_for_client_entry_fails(
hass: HomeAssistant,
mqtt_client_mock: MqttMockPahoClient,
) -> None:
"""Test waiting for client where mqtt entry is failing."""
hass.state = CoreState.starting
await hass.async_block_till_done()
entry = MockConfigEntry(
domain=mqtt.DOMAIN,
data={"broker": "test-broker"},
state=ConfigEntryState.NOT_LOADED,
)
entry.add_to_hass(hass)
async def _async_just_in_time_subscribe() -> Callable[[], None]:
assert not await mqtt.async_wait_for_mqtt_client(hass)
hass.async_add_job(_async_just_in_time_subscribe)
assert entry.state == ConfigEntryState.NOT_LOADED
with patch(
"homeassistant.components.mqtt.async_setup_entry",
side_effect=Exception,
):
await hass.config_entries.async_setup(entry.entry_id)
assert entry.state == ConfigEntryState.SETUP_ERROR
async def test_waiting_for_client_setup_fails(
hass: HomeAssistant,
mqtt_client_mock: MqttMockPahoClient,
) -> None:
"""Test waiting for client where mqtt entry is failing during setup."""
hass.state = CoreState.starting
await hass.async_block_till_done()
entry = MockConfigEntry(
domain=mqtt.DOMAIN,
data={"broker": "test-broker"},
state=ConfigEntryState.NOT_LOADED,
)
entry.add_to_hass(hass)
async def _async_just_in_time_subscribe() -> Callable[[], None]:
assert not await mqtt.async_wait_for_mqtt_client(hass)
hass.async_add_job(_async_just_in_time_subscribe)
assert entry.state == ConfigEntryState.NOT_LOADED
# Simulate MQTT setup fails before the client would become available
mqtt_client_mock.connect.side_effect = Exception
assert not await hass.config_entries.async_setup(entry.entry_id)
assert entry.state == ConfigEntryState.SETUP_ERROR
@patch("homeassistant.components.mqtt.util.AVAILABILITY_TIMEOUT", 0.01)
async def test_waiting_for_client_timeout(
hass: HomeAssistant,
) -> None:
"""Test waiting for client with timeout."""
hass.state = CoreState.starting
await hass.async_block_till_done()
entry = MockConfigEntry(
domain=mqtt.DOMAIN,
data={"broker": "test-broker"},
state=ConfigEntryState.NOT_LOADED,
)
entry.add_to_hass(hass)
assert entry.state == ConfigEntryState.NOT_LOADED
# returns False after timeout
assert not await mqtt.async_wait_for_mqtt_client(hass)
async def test_waiting_for_client_with_disabled_entry(
hass: HomeAssistant,
) -> None:
"""Test waiting for client with timeout."""
hass.state = CoreState.starting
await hass.async_block_till_done()
entry = MockConfigEntry(
domain=mqtt.DOMAIN,
data={"broker": "test-broker"},
state=ConfigEntryState.NOT_LOADED,
)
entry.add_to_hass(hass)
# Disable MQTT config entry
await hass.config_entries.async_set_disabled_by(
entry.entry_id, ConfigEntryDisabler.USER
)
assert entry.state == ConfigEntryState.NOT_LOADED
# returns False because entry is disabled
assert not await mqtt.async_wait_for_mqtt_client(hass)

View file

@ -11,6 +11,8 @@ from homeassistant.components.device_tracker.legacy import (
DOMAIN as DT_DOMAIN,
YAML_DEVICES,
)
from homeassistant.components.mqtt import DOMAIN as MQTT_DOMAIN
from homeassistant.config_entries import ConfigEntryDisabler
from homeassistant.const import CONF_PLATFORM
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
@ -39,6 +41,28 @@ async def setup_comp(
os.remove(yaml_devices)
async def test_setup_fails_without_mqtt_being_setup(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
"""Ensure mqtt is started when we setup the component."""
# Simulate MQTT is was removed
mqtt_entry = hass.config_entries.async_entries(MQTT_DOMAIN)[0]
await hass.config_entries.async_unload(mqtt_entry.entry_id)
await hass.config_entries.async_set_disabled_by(
mqtt_entry.entry_id, ConfigEntryDisabler.USER
)
dev_id = "zanzito"
topic = "location/zanzito"
await async_setup_component(
hass,
DT_DOMAIN,
{DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: topic}}},
)
assert "MQTT integration is not available" in caplog.text
async def test_ensure_device_tracker_platform_validation(hass: HomeAssistant) -> None:
"""Test if platform validation was done."""

View file

@ -3,6 +3,8 @@ import datetime
import json
from unittest.mock import patch
import pytest
from homeassistant.components.mqtt import CONF_QOS, CONF_STATE_TOPIC, DEFAULT_QOS
import homeassistant.components.sensor as sensor
from homeassistant.const import (
@ -56,6 +58,28 @@ async def assert_distance(hass, distance):
assert state.attributes.get("distance") == distance
async def test_no_mqtt(hass: HomeAssistant, caplog: pytest.LogCaptureFixture) -> None:
"""Test no mqtt available."""
assert await async_setup_component(
hass,
sensor.DOMAIN,
{
sensor.DOMAIN: {
CONF_PLATFORM: "mqtt_room",
CONF_NAME: NAME,
CONF_DEVICE_ID: DEVICE_ID,
CONF_STATE_TOPIC: "room_presence",
CONF_QOS: DEFAULT_QOS,
CONF_TIMEOUT: 5,
}
},
)
await hass.async_block_till_done()
state = hass.states.get(SENSOR_STATE)
assert state is None
assert "MQTT integration is not available" in caplog.text
async def test_room_update(hass: HomeAssistant, mqtt_mock: MqttMockHAClient) -> None:
"""Test the updating between rooms."""
assert await async_setup_component(