Simplify mqtt connection state dispatcher (#118184)
This commit is contained in:
parent
b7f1f805fa
commit
226d010ab2
6 changed files with 17 additions and 42 deletions
|
@ -14,7 +14,7 @@ from homeassistant import config as conf_util
|
||||||
from homeassistant.components import websocket_api
|
from homeassistant.components import websocket_api
|
||||||
from homeassistant.config_entries import ConfigEntry
|
from homeassistant.config_entries import ConfigEntry
|
||||||
from homeassistant.const import CONF_DISCOVERY, CONF_PAYLOAD, SERVICE_RELOAD
|
from homeassistant.const import CONF_DISCOVERY, CONF_PAYLOAD, SERVICE_RELOAD
|
||||||
from homeassistant.core import HassJob, HomeAssistant, ServiceCall, callback
|
from homeassistant.core import HomeAssistant, ServiceCall, callback
|
||||||
from homeassistant.exceptions import (
|
from homeassistant.exceptions import (
|
||||||
ConfigValidationError,
|
ConfigValidationError,
|
||||||
ServiceValidationError,
|
ServiceValidationError,
|
||||||
|
@ -72,8 +72,7 @@ from .const import ( # noqa: F401
|
||||||
DEFAULT_QOS,
|
DEFAULT_QOS,
|
||||||
DEFAULT_RETAIN,
|
DEFAULT_RETAIN,
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
MQTT_CONNECTED,
|
MQTT_CONNECTION_STATE,
|
||||||
MQTT_DISCONNECTED,
|
|
||||||
RELOADABLE_PLATFORMS,
|
RELOADABLE_PLATFORMS,
|
||||||
TEMPLATE_ERRORS,
|
TEMPLATE_ERRORS,
|
||||||
)
|
)
|
||||||
|
@ -475,29 +474,9 @@ def async_subscribe_connection_status(
|
||||||
hass: HomeAssistant, connection_status_callback: ConnectionStatusCallback
|
hass: HomeAssistant, connection_status_callback: ConnectionStatusCallback
|
||||||
) -> Callable[[], None]:
|
) -> Callable[[], None]:
|
||||||
"""Subscribe to MQTT connection changes."""
|
"""Subscribe to MQTT connection changes."""
|
||||||
connection_status_callback_job = HassJob(connection_status_callback)
|
return async_dispatcher_connect(
|
||||||
|
hass, MQTT_CONNECTION_STATE, connection_status_callback
|
||||||
async def connected() -> None:
|
)
|
||||||
task = hass.async_run_hass_job(connection_status_callback_job, True)
|
|
||||||
if task:
|
|
||||||
await task
|
|
||||||
|
|
||||||
async def disconnected() -> None:
|
|
||||||
task = hass.async_run_hass_job(connection_status_callback_job, False)
|
|
||||||
if task:
|
|
||||||
await task
|
|
||||||
|
|
||||||
subscriptions = {
|
|
||||||
"connect": async_dispatcher_connect(hass, MQTT_CONNECTED, connected),
|
|
||||||
"disconnect": async_dispatcher_connect(hass, MQTT_DISCONNECTED, disconnected),
|
|
||||||
}
|
|
||||||
|
|
||||||
@callback
|
|
||||||
def unsubscribe() -> None:
|
|
||||||
subscriptions["connect"]()
|
|
||||||
subscriptions["disconnect"]()
|
|
||||||
|
|
||||||
return unsubscribe
|
|
||||||
|
|
||||||
|
|
||||||
def is_connected(hass: HomeAssistant) -> bool:
|
def is_connected(hass: HomeAssistant) -> bool:
|
||||||
|
|
|
@ -69,8 +69,7 @@ from .const import (
|
||||||
DEFAULT_WS_HEADERS,
|
DEFAULT_WS_HEADERS,
|
||||||
DEFAULT_WS_PATH,
|
DEFAULT_WS_PATH,
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
MQTT_CONNECTED,
|
MQTT_CONNECTION_STATE,
|
||||||
MQTT_DISCONNECTED,
|
|
||||||
PROTOCOL_5,
|
PROTOCOL_5,
|
||||||
PROTOCOL_31,
|
PROTOCOL_31,
|
||||||
TRANSPORT_WEBSOCKETS,
|
TRANSPORT_WEBSOCKETS,
|
||||||
|
@ -1033,7 +1032,7 @@ class MQTT:
|
||||||
return
|
return
|
||||||
|
|
||||||
self.connected = True
|
self.connected = True
|
||||||
async_dispatcher_send(self.hass, MQTT_CONNECTED)
|
async_dispatcher_send(self.hass, MQTT_CONNECTION_STATE, True)
|
||||||
_LOGGER.debug(
|
_LOGGER.debug(
|
||||||
"Connected to MQTT server %s:%s (%s)",
|
"Connected to MQTT server %s:%s (%s)",
|
||||||
self.conf[CONF_BROKER],
|
self.conf[CONF_BROKER],
|
||||||
|
@ -1229,7 +1228,7 @@ class MQTT:
|
||||||
# result is set make sure the first connection result is set
|
# result is set make sure the first connection result is set
|
||||||
self._async_connection_result(False)
|
self._async_connection_result(False)
|
||||||
self.connected = False
|
self.connected = False
|
||||||
async_dispatcher_send(self.hass, MQTT_DISCONNECTED)
|
async_dispatcher_send(self.hass, MQTT_CONNECTION_STATE, False)
|
||||||
_LOGGER.warning(
|
_LOGGER.warning(
|
||||||
"Disconnected from MQTT server %s:%s (%s)",
|
"Disconnected from MQTT server %s:%s (%s)",
|
||||||
self.conf[CONF_BROKER],
|
self.conf[CONF_BROKER],
|
||||||
|
|
|
@ -149,8 +149,7 @@ DEFAULT_WILL = {
|
||||||
|
|
||||||
DOMAIN = "mqtt"
|
DOMAIN = "mqtt"
|
||||||
|
|
||||||
MQTT_CONNECTED = "mqtt_connected"
|
MQTT_CONNECTION_STATE = "mqtt_connection_state"
|
||||||
MQTT_DISCONNECTED = "mqtt_disconnected"
|
|
||||||
|
|
||||||
PAYLOAD_EMPTY_JSON = "{}"
|
PAYLOAD_EMPTY_JSON = "{}"
|
||||||
PAYLOAD_NONE = "None"
|
PAYLOAD_NONE = "None"
|
||||||
|
|
|
@ -92,8 +92,7 @@ from .const import (
|
||||||
CONF_VIA_DEVICE,
|
CONF_VIA_DEVICE,
|
||||||
DEFAULT_ENCODING,
|
DEFAULT_ENCODING,
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
MQTT_CONNECTED,
|
MQTT_CONNECTION_STATE,
|
||||||
MQTT_DISCONNECTED,
|
|
||||||
)
|
)
|
||||||
from .debug_info import log_message
|
from .debug_info import log_message
|
||||||
from .discovery import (
|
from .discovery import (
|
||||||
|
@ -460,12 +459,11 @@ class MqttAvailabilityMixin(Entity):
|
||||||
await super().async_added_to_hass()
|
await super().async_added_to_hass()
|
||||||
self._availability_prepare_subscribe_topics()
|
self._availability_prepare_subscribe_topics()
|
||||||
self._availability_subscribe_topics()
|
self._availability_subscribe_topics()
|
||||||
self.async_on_remove(
|
|
||||||
async_dispatcher_connect(self.hass, MQTT_CONNECTED, self.async_mqtt_connect)
|
|
||||||
)
|
|
||||||
self.async_on_remove(
|
self.async_on_remove(
|
||||||
async_dispatcher_connect(
|
async_dispatcher_connect(
|
||||||
self.hass, MQTT_DISCONNECTED, self.async_mqtt_connect
|
self.hass,
|
||||||
|
MQTT_CONNECTION_STATE,
|
||||||
|
self.async_mqtt_connection_state_changed,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -553,7 +551,7 @@ class MqttAvailabilityMixin(Entity):
|
||||||
async_subscribe_topics_internal(self.hass, self._availability_sub_state)
|
async_subscribe_topics_internal(self.hass, self._availability_sub_state)
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def async_mqtt_connect(self) -> None:
|
def async_mqtt_connection_state_changed(self, state: bool) -> None:
|
||||||
"""Update state on connection/disconnection to MQTT broker."""
|
"""Update state on connection/disconnection to MQTT broker."""
|
||||||
if not self.hass.is_stopping:
|
if not self.hass.is_stopping:
|
||||||
self.async_write_ha_state()
|
self.async_write_ha_state()
|
||||||
|
|
|
@ -16,7 +16,7 @@ import yaml
|
||||||
from homeassistant import config as module_hass_config
|
from homeassistant import config as module_hass_config
|
||||||
from homeassistant.components import mqtt
|
from homeassistant.components import mqtt
|
||||||
from homeassistant.components.mqtt import debug_info
|
from homeassistant.components.mqtt import debug_info
|
||||||
from homeassistant.components.mqtt.const import MQTT_DISCONNECTED
|
from homeassistant.components.mqtt.const import MQTT_CONNECTION_STATE
|
||||||
from homeassistant.components.mqtt.mixins import MQTT_ATTRIBUTES_BLOCKED
|
from homeassistant.components.mqtt.mixins import MQTT_ATTRIBUTES_BLOCKED
|
||||||
from homeassistant.components.mqtt.models import PublishPayloadType
|
from homeassistant.components.mqtt.models import PublishPayloadType
|
||||||
from homeassistant.config_entries import ConfigEntryState
|
from homeassistant.config_entries import ConfigEntryState
|
||||||
|
@ -115,7 +115,7 @@ async def help_test_availability_when_connection_lost(
|
||||||
assert state and state.state != STATE_UNAVAILABLE
|
assert state and state.state != STATE_UNAVAILABLE
|
||||||
|
|
||||||
mqtt_mock.connected = False
|
mqtt_mock.connected = False
|
||||||
async_dispatcher_send(hass, MQTT_DISCONNECTED)
|
async_dispatcher_send(hass, MQTT_CONNECTION_STATE, False)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
state = hass.states.get(f"{domain}.test")
|
state = hass.states.get(f"{domain}.test")
|
||||||
|
|
|
@ -1023,7 +1023,7 @@ async def _mqtt_mock_entry(
|
||||||
mock_mqtt_instance.connected = True
|
mock_mqtt_instance.connected = True
|
||||||
mqtt_client_mock.on_connect(mqtt_client_mock, None, 0, 0, 0)
|
mqtt_client_mock.on_connect(mqtt_client_mock, None, 0, 0, 0)
|
||||||
|
|
||||||
async_dispatcher_send(hass, mqtt.MQTT_CONNECTED)
|
async_dispatcher_send(hass, mqtt.MQTT_CONNECTION_STATE, True)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
return mock_mqtt_instance
|
return mock_mqtt_instance
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue