Fix race when handling MQTT discovery messages (#44730)

* Fix race when handling MQTT discovery messages

* Lint

* retrigger checks
This commit is contained in:
Erik Montnemery 2021-01-04 12:28:17 +01:00 committed by GitHub
parent 43474762b2
commit 34bd70aee6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 368 additions and 59 deletions

View file

@ -35,7 +35,11 @@ from homeassistant.const import CONF_UNIQUE_ID # noqa: F401
from homeassistant.core import CoreState, Event, HassJob, ServiceCall, callback from homeassistant.core import CoreState, Event, HassJob, ServiceCall, callback
from homeassistant.exceptions import HomeAssistantError, Unauthorized from homeassistant.exceptions import HomeAssistantError, Unauthorized
from homeassistant.helpers import config_validation as cv, event, template from homeassistant.helpers import config_validation as cv, event, template
from homeassistant.helpers.dispatcher import async_dispatcher_connect, dispatcher_send from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
dispatcher_send,
)
from homeassistant.helpers.entity import Entity from homeassistant.helpers.entity import Entity
from homeassistant.helpers.typing import ConfigType, HomeAssistantType, ServiceDataType from homeassistant.helpers.typing import ConfigType, HomeAssistantType, ServiceDataType
from homeassistant.loader import bind_hass from homeassistant.loader import bind_hass
@ -78,6 +82,7 @@ from .const import (
from .debug_info import log_messages from .debug_info import log_messages
from .discovery import ( from .discovery import (
LAST_DISCOVERY, LAST_DISCOVERY,
MQTT_DISCOVERY_DONE,
MQTT_DISCOVERY_UPDATED, MQTT_DISCOVERY_UPDATED,
clear_discovery_hash, clear_discovery_hash,
set_discovery_hash, set_discovery_hash,
@ -1315,6 +1320,9 @@ class MqttDiscoveryUpdate(Entity):
else: else:
# Non-empty, unchanged payload: Ignore to avoid changing states # Non-empty, unchanged payload: Ignore to avoid changing states
_LOGGER.info("Ignoring unchanged update for: %s", self.entity_id) _LOGGER.info("Ignoring unchanged update for: %s", self.entity_id)
async_dispatcher_send(
self.hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
if discovery_hash: if discovery_hash:
debug_info.add_entity_discovery_data( debug_info.add_entity_discovery_data(
@ -1327,17 +1335,26 @@ class MqttDiscoveryUpdate(Entity):
MQTT_DISCOVERY_UPDATED.format(discovery_hash), MQTT_DISCOVERY_UPDATED.format(discovery_hash),
discovery_callback, discovery_callback,
) )
async_dispatcher_send(
self.hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
async def async_removed_from_registry(self) -> None: async def async_removed_from_registry(self) -> None:
"""Clear retained discovery topic in broker.""" """Clear retained discovery topic in broker."""
if not self._removed_from_hass: if not self._removed_from_hass:
discovery_topic = self._discovery_data[ATTR_DISCOVERY_TOPIC] discovery_topic = self._discovery_data[ATTR_DISCOVERY_TOPIC]
publish( publish(self.hass, discovery_topic, "", retain=True)
self.hass,
discovery_topic, @callback
"", def add_to_platform_abort(self) -> None:
retain=True, """Abort adding an entity to a platform."""
if self._discovery_data:
discovery_hash = self._discovery_data[ATTR_DISCOVERY_HASH]
clear_discovery_hash(self.hass, discovery_hash)
async_dispatcher_send(
self.hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
) )
super().add_to_platform_abort()
async def async_will_remove_from_hass(self) -> None: async def async_will_remove_from_hass(self) -> None:
"""Stop listening to signal and cleanup discovery data..""" """Stop listening to signal and cleanup discovery data.."""

View file

@ -29,7 +29,10 @@ from homeassistant.const import (
) )
from homeassistant.core import callback from homeassistant.core import callback
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.reload import async_setup_reload_service from homeassistant.helpers.reload import async_setup_reload_service
from homeassistant.helpers.typing import ConfigType, HomeAssistantType from homeassistant.helpers.typing import ConfigType, HomeAssistantType
@ -49,7 +52,7 @@ from . import (
) )
from .. import mqtt from .. import mqtt
from .debug_info import log_messages from .debug_info import log_messages
from .discovery import MQTT_DISCOVERY_NEW, clear_discovery_hash from .discovery import MQTT_DISCOVERY_DONE, MQTT_DISCOVERY_NEW, clear_discovery_hash
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -119,7 +122,11 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
hass, config, async_add_entities, config_entry, discovery_data hass, config, async_add_entities, config_entry, discovery_data
) )
except Exception: except Exception:
clear_discovery_hash(hass, discovery_data[ATTR_DISCOVERY_HASH]) discovery_hash = discovery_data[ATTR_DISCOVERY_HASH]
clear_discovery_hash(hass, discovery_hash)
async_dispatcher_send(
hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
raise raise
async_dispatcher_connect( async_dispatcher_connect(

View file

@ -21,7 +21,10 @@ from homeassistant.const import (
) )
from homeassistant.core import callback from homeassistant.core import callback
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
import homeassistant.helpers.event as evt import homeassistant.helpers.event as evt
from homeassistant.helpers.event import async_track_point_in_utc_time from homeassistant.helpers.event import async_track_point_in_utc_time
from homeassistant.helpers.reload import async_setup_reload_service from homeassistant.helpers.reload import async_setup_reload_service
@ -42,7 +45,7 @@ from . import (
) )
from .. import mqtt from .. import mqtt
from .debug_info import log_messages from .debug_info import log_messages
from .discovery import MQTT_DISCOVERY_NEW, clear_discovery_hash from .discovery import MQTT_DISCOVERY_DONE, MQTT_DISCOVERY_NEW, clear_discovery_hash
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -92,7 +95,11 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
hass, config, async_add_entities, config_entry, discovery_data hass, config, async_add_entities, config_entry, discovery_data
) )
except Exception: except Exception:
clear_discovery_hash(hass, discovery_data[ATTR_DISCOVERY_HASH]) discovery_hash = discovery_data[ATTR_DISCOVERY_HASH]
clear_discovery_hash(hass, discovery_hash)
async_dispatcher_send(
hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
raise raise
async_dispatcher_connect( async_dispatcher_connect(

View file

@ -8,7 +8,10 @@ from homeassistant.components.camera import Camera
from homeassistant.const import CONF_DEVICE, CONF_NAME, CONF_UNIQUE_ID from homeassistant.const import CONF_DEVICE, CONF_NAME, CONF_UNIQUE_ID
from homeassistant.core import callback from homeassistant.core import callback
from homeassistant.helpers import config_validation as cv from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.reload import async_setup_reload_service from homeassistant.helpers.reload import async_setup_reload_service
from homeassistant.helpers.typing import ConfigType, HomeAssistantType from homeassistant.helpers.typing import ConfigType, HomeAssistantType
@ -25,7 +28,7 @@ from . import (
) )
from .. import mqtt from .. import mqtt
from .debug_info import log_messages from .debug_info import log_messages
from .discovery import MQTT_DISCOVERY_NEW, clear_discovery_hash from .discovery import MQTT_DISCOVERY_DONE, MQTT_DISCOVERY_NEW, clear_discovery_hash
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -66,7 +69,11 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
config, async_add_entities, config_entry, discovery_data config, async_add_entities, config_entry, discovery_data
) )
except Exception: except Exception:
clear_discovery_hash(hass, discovery_data[ATTR_DISCOVERY_HASH]) discovery_hash = discovery_data[ATTR_DISCOVERY_HASH]
clear_discovery_hash(hass, discovery_hash)
async_dispatcher_send(
hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
raise raise
async_dispatcher_connect( async_dispatcher_connect(

View file

@ -47,7 +47,10 @@ from homeassistant.const import (
) )
from homeassistant.core import callback from homeassistant.core import callback
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.reload import async_setup_reload_service from homeassistant.helpers.reload import async_setup_reload_service
from homeassistant.helpers.typing import ConfigType, HomeAssistantType from homeassistant.helpers.typing import ConfigType, HomeAssistantType
@ -66,7 +69,7 @@ from . import (
) )
from .. import mqtt from .. import mqtt
from .debug_info import log_messages from .debug_info import log_messages
from .discovery import MQTT_DISCOVERY_NEW, clear_discovery_hash from .discovery import MQTT_DISCOVERY_DONE, MQTT_DISCOVERY_NEW, clear_discovery_hash
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -259,7 +262,11 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
hass, config, async_add_entities, config_entry, discovery_data hass, config, async_add_entities, config_entry, discovery_data
) )
except Exception: except Exception:
clear_discovery_hash(hass, discovery_data[ATTR_DISCOVERY_HASH]) discovery_hash = discovery_data[ATTR_DISCOVERY_HASH]
clear_discovery_hash(hass, discovery_hash)
async_dispatcher_send(
hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
raise raise
async_dispatcher_connect( async_dispatcher_connect(

View file

@ -33,7 +33,10 @@ from homeassistant.const import (
) )
from homeassistant.core import callback from homeassistant.core import callback
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.reload import async_setup_reload_service from homeassistant.helpers.reload import async_setup_reload_service
from homeassistant.helpers.typing import ConfigType, HomeAssistantType from homeassistant.helpers.typing import ConfigType, HomeAssistantType
@ -53,7 +56,7 @@ from . import (
) )
from .. import mqtt from .. import mqtt
from .debug_info import log_messages from .debug_info import log_messages
from .discovery import MQTT_DISCOVERY_NEW, clear_discovery_hash from .discovery import MQTT_DISCOVERY_DONE, MQTT_DISCOVERY_NEW, clear_discovery_hash
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -190,7 +193,11 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
hass, config, async_add_entities, config_entry, discovery_data hass, config, async_add_entities, config_entry, discovery_data
) )
except Exception: except Exception:
clear_discovery_hash(hass, discovery_data[ATTR_DISCOVERY_HASH]) discovery_hash = discovery_data[ATTR_DISCOVERY_HASH]
clear_discovery_hash(hass, discovery_hash)
async_dispatcher_send(
hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
raise raise
async_dispatcher_connect( async_dispatcher_connect(

View file

@ -4,11 +4,14 @@ import logging
import voluptuous as vol import voluptuous as vol
from homeassistant.helpers.device_registry import EVENT_DEVICE_REGISTRY_UPDATED from homeassistant.helpers.device_registry import EVENT_DEVICE_REGISTRY_UPDATED
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from . import ATTR_DISCOVERY_HASH, device_trigger from . import ATTR_DISCOVERY_HASH, device_trigger
from .. import mqtt from .. import mqtt
from .discovery import MQTT_DISCOVERY_NEW, clear_discovery_hash from .discovery import MQTT_DISCOVERY_DONE, MQTT_DISCOVERY_NEW, clear_discovery_hash
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -42,7 +45,11 @@ async def async_setup_entry(hass, config_entry):
hass, config, config_entry, discovery_data hass, config, config_entry, discovery_data
) )
except Exception: except Exception:
clear_discovery_hash(hass, discovery_data[ATTR_DISCOVERY_HASH]) discovery_hash = discovery_data[ATTR_DISCOVERY_HASH]
clear_discovery_hash(hass, discovery_hash)
async_dispatcher_send(
hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
raise raise
async_dispatcher_connect( async_dispatcher_connect(

View file

@ -20,7 +20,10 @@ from homeassistant.const import (
) )
from homeassistant.core import callback from homeassistant.core import callback
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from .. import ( from .. import (
MqttAttributes, MqttAttributes,
@ -32,7 +35,7 @@ from .. import (
from ... import mqtt from ... import mqtt
from ..const import ATTR_DISCOVERY_HASH, CONF_QOS, CONF_STATE_TOPIC from ..const import ATTR_DISCOVERY_HASH, CONF_QOS, CONF_STATE_TOPIC
from ..debug_info import log_messages from ..debug_info import log_messages
from ..discovery import MQTT_DISCOVERY_NEW, clear_discovery_hash from ..discovery import MQTT_DISCOVERY_DONE, MQTT_DISCOVERY_NEW, clear_discovery_hash
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -69,7 +72,11 @@ async def async_setup_entry_from_discovery(hass, config_entry, async_add_entitie
hass, config, async_add_entities, config_entry, discovery_data hass, config, async_add_entities, config_entry, discovery_data
) )
except Exception: except Exception:
clear_discovery_hash(hass, discovery_data[ATTR_DISCOVERY_HASH]) discovery_hash = discovery_data[ATTR_DISCOVERY_HASH]
clear_discovery_hash(hass, discovery_hash)
async_dispatcher_send(
hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
raise raise
async_dispatcher_connect( async_dispatcher_connect(

View file

@ -11,7 +11,10 @@ from homeassistant.const import CONF_DEVICE_ID, CONF_DOMAIN, CONF_PLATFORM, CONF
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.typing import ConfigType, HomeAssistantType from homeassistant.helpers.typing import ConfigType, HomeAssistantType
from . import ( from . import (
@ -28,7 +31,7 @@ from . import (
trigger as mqtt_trigger, trigger as mqtt_trigger,
) )
from .. import mqtt from .. import mqtt
from .discovery import MQTT_DISCOVERY_UPDATED, clear_discovery_hash from .discovery import MQTT_DISCOVERY_DONE, MQTT_DISCOVERY_UPDATED, clear_discovery_hash
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -206,6 +209,7 @@ async def async_setup_trigger(hass, config, config_entry, discovery_data):
await _update_device(hass, config_entry, config) await _update_device(hass, config_entry, config)
device_trigger = hass.data[DEVICE_TRIGGERS][discovery_id] device_trigger = hass.data[DEVICE_TRIGGERS][discovery_id]
await device_trigger.update_trigger(config, discovery_hash, remove_signal) await device_trigger.update_trigger(config, discovery_hash, remove_signal)
async_dispatcher_send(hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None)
remove_signal = async_dispatcher_connect( remove_signal = async_dispatcher_connect(
hass, MQTT_DISCOVERY_UPDATED.format(discovery_hash), discovery_update hass, MQTT_DISCOVERY_UPDATED.format(discovery_hash), discovery_update
@ -220,6 +224,7 @@ async def async_setup_trigger(hass, config, config_entry, discovery_data):
) )
if device is None: if device is None:
async_dispatcher_send(hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None)
return return
if DEVICE_TRIGGERS not in hass.data: if DEVICE_TRIGGERS not in hass.data:
@ -244,6 +249,8 @@ async def async_setup_trigger(hass, config, config_entry, discovery_data):
hass, discovery_hash, discovery_data, device.id hass, discovery_hash, discovery_data, device.id
) )
async_dispatcher_send(hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None)
async def async_device_removed(hass: HomeAssistant, device_id: str): async def async_device_removed(hass: HomeAssistant, device_id: str):
"""Handle the removal of a device.""" """Handle the removal of a device."""

View file

@ -1,5 +1,6 @@
"""Support for MQTT discovery.""" """Support for MQTT discovery."""
import asyncio import asyncio
from collections import deque
import functools import functools
import json import json
import logging import logging
@ -7,7 +8,10 @@ import re
import time import time
from homeassistant.const import CONF_DEVICE, CONF_PLATFORM from homeassistant.const import CONF_DEVICE, CONF_PLATFORM
from homeassistant.helpers.dispatcher import async_dispatcher_send from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.typing import HomeAssistantType from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.loader import async_get_mqtt from homeassistant.loader import async_get_mqtt
@ -46,6 +50,7 @@ SUPPORTED_COMPONENTS = [
] ]
ALREADY_DISCOVERED = "mqtt_discovered_components" ALREADY_DISCOVERED = "mqtt_discovered_components"
PENDING_DISCOVERED = "mqtt_pending_components"
CONFIG_ENTRY_IS_SETUP = "mqtt_config_entry_is_setup" CONFIG_ENTRY_IS_SETUP = "mqtt_config_entry_is_setup"
DATA_CONFIG_ENTRY_LOCK = "mqtt_config_entry_lock" DATA_CONFIG_ENTRY_LOCK = "mqtt_config_entry_lock"
DATA_CONFIG_FLOW_LOCK = "mqtt_discovery_config_flow_lock" DATA_CONFIG_FLOW_LOCK = "mqtt_discovery_config_flow_lock"
@ -53,6 +58,7 @@ DISCOVERY_UNSUBSCRIBE = "mqtt_discovery_unsubscribe"
INTEGRATION_UNSUBSCRIBE = "mqtt_integration_discovery_unsubscribe" INTEGRATION_UNSUBSCRIBE = "mqtt_integration_discovery_unsubscribe"
MQTT_DISCOVERY_UPDATED = "mqtt_discovery_updated_{}" MQTT_DISCOVERY_UPDATED = "mqtt_discovery_updated_{}"
MQTT_DISCOVERY_NEW = "mqtt_discovery_new_{}_{}" MQTT_DISCOVERY_NEW = "mqtt_discovery_new_{}_{}"
MQTT_DISCOVERY_DONE = "mqtt_discovery_done_{}"
LAST_DISCOVERY = "mqtt_last_discovery" LAST_DISCOVERY = "mqtt_last_discovery"
TOPIC_BASE = "~" TOPIC_BASE = "~"
@ -78,7 +84,7 @@ async def async_start(
"""Start MQTT Discovery.""" """Start MQTT Discovery."""
mqtt_integrations = {} mqtt_integrations = {}
async def async_entity_message_received(msg): async def async_discovery_message_received(msg):
"""Process the received message.""" """Process the received message."""
hass.data[LAST_DISCOVERY] = time.time() hass.data[LAST_DISCOVERY] = time.time()
payload = msg.payload payload = msg.payload
@ -141,8 +147,46 @@ async def async_start(
payload[CONF_PLATFORM] = "mqtt" payload[CONF_PLATFORM] = "mqtt"
if ALREADY_DISCOVERED not in hass.data: if discovery_hash in hass.data[PENDING_DISCOVERED]:
hass.data[ALREADY_DISCOVERED] = {} pending = hass.data[PENDING_DISCOVERED][discovery_hash]["pending"]
pending.appendleft(payload)
_LOGGER.info(
"Component has already been discovered: %s %s, queuing update",
component,
discovery_id,
)
return
await async_process_discovery_payload(component, discovery_id, payload)
async def async_process_discovery_payload(component, discovery_id, payload):
_LOGGER.debug("Process discovery payload %s", payload)
discovery_hash = (component, discovery_id)
if discovery_hash in hass.data[ALREADY_DISCOVERED] or payload:
async def discovery_done(_):
pending = hass.data[PENDING_DISCOVERED][discovery_hash]["pending"]
_LOGGER.debug("Pending discovery for %s: %s", discovery_hash, pending)
if not pending:
hass.data[PENDING_DISCOVERED][discovery_hash]["unsub"]()
hass.data[PENDING_DISCOVERED].pop(discovery_hash)
else:
payload = pending.pop()
await async_process_discovery_payload(
component, discovery_id, payload
)
if discovery_hash not in hass.data[PENDING_DISCOVERED]:
hass.data[PENDING_DISCOVERED][discovery_hash] = {
"unsub": async_dispatcher_connect(
hass,
MQTT_DISCOVERY_DONE.format(discovery_hash),
discovery_done,
),
"pending": deque([]),
}
if discovery_hash in hass.data[ALREADY_DISCOVERED]: if discovery_hash in hass.data[ALREADY_DISCOVERED]:
# Dispatch update # Dispatch update
_LOGGER.info( _LOGGER.info(
@ -182,13 +226,21 @@ async def async_start(
async_dispatcher_send( async_dispatcher_send(
hass, MQTT_DISCOVERY_NEW.format(component, "mqtt"), payload hass, MQTT_DISCOVERY_NEW.format(component, "mqtt"), payload
) )
else:
# Unhandled discovery message
async_dispatcher_send(
hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
hass.data[DATA_CONFIG_ENTRY_LOCK] = asyncio.Lock() hass.data[DATA_CONFIG_ENTRY_LOCK] = asyncio.Lock()
hass.data[DATA_CONFIG_FLOW_LOCK] = asyncio.Lock() hass.data[DATA_CONFIG_FLOW_LOCK] = asyncio.Lock()
hass.data[CONFIG_ENTRY_IS_SETUP] = set() hass.data[CONFIG_ENTRY_IS_SETUP] = set()
hass.data[ALREADY_DISCOVERED] = {}
hass.data[PENDING_DISCOVERED] = {}
hass.data[DISCOVERY_UNSUBSCRIBE] = await mqtt.async_subscribe( hass.data[DISCOVERY_UNSUBSCRIBE] = await mqtt.async_subscribe(
hass, f"{discovery_topic}/#", async_entity_message_received, 0 hass, f"{discovery_topic}/#", async_discovery_message_received, 0
) )
hass.data[LAST_DISCOVERY] = time.time() hass.data[LAST_DISCOVERY] = time.time()
mqtt_integrations = await async_get_mqtt(hass) mqtt_integrations = await async_get_mqtt(hass)

View file

@ -25,7 +25,10 @@ from homeassistant.const import (
) )
from homeassistant.core import callback from homeassistant.core import callback
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.reload import async_setup_reload_service from homeassistant.helpers.reload import async_setup_reload_service
from homeassistant.helpers.typing import ConfigType, HomeAssistantType from homeassistant.helpers.typing import ConfigType, HomeAssistantType
@ -45,7 +48,7 @@ from . import (
) )
from .. import mqtt from .. import mqtt
from .debug_info import log_messages from .debug_info import log_messages
from .discovery import MQTT_DISCOVERY_NEW, clear_discovery_hash from .discovery import MQTT_DISCOVERY_DONE, MQTT_DISCOVERY_NEW, clear_discovery_hash
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -131,7 +134,11 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
hass, config, async_add_entities, config_entry, discovery_data hass, config, async_add_entities, config_entry, discovery_data
) )
except Exception: except Exception:
clear_discovery_hash(hass, discovery_data[ATTR_DISCOVERY_HASH]) discovery_hash = discovery_data[ATTR_DISCOVERY_HASH]
clear_discovery_hash(hass, discovery_hash)
async_dispatcher_send(
hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
raise raise
async_dispatcher_connect( async_dispatcher_connect(

View file

@ -4,12 +4,15 @@ import logging
import voluptuous as vol import voluptuous as vol
from homeassistant.components import light from homeassistant.components import light
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.reload import async_setup_reload_service from homeassistant.helpers.reload import async_setup_reload_service
from homeassistant.helpers.typing import ConfigType, HomeAssistantType from homeassistant.helpers.typing import ConfigType, HomeAssistantType
from .. import ATTR_DISCOVERY_HASH, DOMAIN, PLATFORMS from .. import ATTR_DISCOVERY_HASH, DOMAIN, PLATFORMS
from ..discovery import MQTT_DISCOVERY_NEW, clear_discovery_hash from ..discovery import MQTT_DISCOVERY_DONE, MQTT_DISCOVERY_NEW, clear_discovery_hash
from .schema import CONF_SCHEMA, MQTT_LIGHT_SCHEMA_SCHEMA from .schema import CONF_SCHEMA, MQTT_LIGHT_SCHEMA_SCHEMA
from .schema_basic import PLATFORM_SCHEMA_BASIC, async_setup_entity_basic from .schema_basic import PLATFORM_SCHEMA_BASIC, async_setup_entity_basic
from .schema_json import PLATFORM_SCHEMA_JSON, async_setup_entity_json from .schema_json import PLATFORM_SCHEMA_JSON, async_setup_entity_json
@ -53,7 +56,11 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
hass, config, async_add_entities, config_entry, discovery_data hass, config, async_add_entities, config_entry, discovery_data
) )
except Exception: except Exception:
clear_discovery_hash(hass, discovery_data[ATTR_DISCOVERY_HASH]) discovery_hash = discovery_data[ATTR_DISCOVERY_HASH]
clear_discovery_hash(hass, discovery_hash)
async_dispatcher_send(
hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
raise raise
async_dispatcher_connect( async_dispatcher_connect(

View file

@ -14,7 +14,10 @@ from homeassistant.const import (
) )
from homeassistant.core import callback from homeassistant.core import callback
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.reload import async_setup_reload_service from homeassistant.helpers.reload import async_setup_reload_service
from homeassistant.helpers.typing import ConfigType, HomeAssistantType from homeassistant.helpers.typing import ConfigType, HomeAssistantType
@ -34,7 +37,7 @@ from . import (
) )
from .. import mqtt from .. import mqtt
from .debug_info import log_messages from .debug_info import log_messages
from .discovery import MQTT_DISCOVERY_NEW, clear_discovery_hash from .discovery import MQTT_DISCOVERY_DONE, MQTT_DISCOVERY_NEW, clear_discovery_hash
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -93,7 +96,11 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
hass, config, async_add_entities, config_entry, discovery_data hass, config, async_add_entities, config_entry, discovery_data
) )
except Exception: except Exception:
clear_discovery_hash(hass, discovery_data[ATTR_DISCOVERY_HASH]) discovery_hash = discovery_data[ATTR_DISCOVERY_HASH]
clear_discovery_hash(hass, discovery_hash)
async_dispatcher_send(
hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
raise raise
async_dispatcher_connect( async_dispatcher_connect(

View file

@ -7,7 +7,10 @@ from homeassistant.components import scene
from homeassistant.components.scene import Scene from homeassistant.components.scene import Scene
from homeassistant.const import CONF_ICON, CONF_NAME, CONF_PAYLOAD_ON, CONF_UNIQUE_ID from homeassistant.const import CONF_ICON, CONF_NAME, CONF_PAYLOAD_ON, CONF_UNIQUE_ID
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.reload import async_setup_reload_service from homeassistant.helpers.reload import async_setup_reload_service
from homeassistant.helpers.typing import ConfigType, HomeAssistantType from homeassistant.helpers.typing import ConfigType, HomeAssistantType
@ -22,7 +25,7 @@ from . import (
MqttDiscoveryUpdate, MqttDiscoveryUpdate,
) )
from .. import mqtt from .. import mqtt
from .discovery import MQTT_DISCOVERY_NEW, clear_discovery_hash from .discovery import MQTT_DISCOVERY_DONE, MQTT_DISCOVERY_NEW, clear_discovery_hash
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -61,7 +64,11 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
config, async_add_entities, config_entry, discovery_data config, async_add_entities, config_entry, discovery_data
) )
except Exception: except Exception:
clear_discovery_hash(hass, discovery_data[ATTR_DISCOVERY_HASH]) discovery_hash = discovery_data[ATTR_DISCOVERY_HASH]
clear_discovery_hash(hass, discovery_hash)
async_dispatcher_send(
hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
raise raise
async_dispatcher_connect( async_dispatcher_connect(

View file

@ -19,7 +19,10 @@ from homeassistant.const import (
) )
from homeassistant.core import callback from homeassistant.core import callback
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.entity import Entity from homeassistant.helpers.entity import Entity
from homeassistant.helpers.event import async_track_point_in_utc_time from homeassistant.helpers.event import async_track_point_in_utc_time
from homeassistant.helpers.reload import async_setup_reload_service from homeassistant.helpers.reload import async_setup_reload_service
@ -40,7 +43,7 @@ from . import (
) )
from .. import mqtt from .. import mqtt
from .debug_info import log_messages from .debug_info import log_messages
from .discovery import MQTT_DISCOVERY_NEW, clear_discovery_hash from .discovery import MQTT_DISCOVERY_DONE, MQTT_DISCOVERY_NEW, clear_discovery_hash
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -86,7 +89,11 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
hass, config, async_add_entities, config_entry, discovery_data hass, config, async_add_entities, config_entry, discovery_data
) )
except Exception: except Exception:
clear_discovery_hash(hass, discovery_data[ATTR_DISCOVERY_HASH]) discovery_hash = discovery_data[ATTR_DISCOVERY_HASH]
clear_discovery_hash(hass, discovery_hash)
async_dispatcher_send(
hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
raise raise
async_dispatcher_connect( async_dispatcher_connect(

View file

@ -18,7 +18,10 @@ from homeassistant.const import (
) )
from homeassistant.core import callback from homeassistant.core import callback
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.reload import async_setup_reload_service from homeassistant.helpers.reload import async_setup_reload_service
from homeassistant.helpers.restore_state import RestoreEntity from homeassistant.helpers.restore_state import RestoreEntity
from homeassistant.helpers.typing import ConfigType, HomeAssistantType from homeassistant.helpers.typing import ConfigType, HomeAssistantType
@ -39,7 +42,7 @@ from . import (
) )
from .. import mqtt from .. import mqtt
from .debug_info import log_messages from .debug_info import log_messages
from .discovery import MQTT_DISCOVERY_NEW, clear_discovery_hash from .discovery import MQTT_DISCOVERY_DONE, MQTT_DISCOVERY_NEW, clear_discovery_hash
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -89,7 +92,11 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
hass, config, async_add_entities, config_entry, discovery_data hass, config, async_add_entities, config_entry, discovery_data
) )
except Exception: except Exception:
clear_discovery_hash(hass, discovery_data[ATTR_DISCOVERY_HASH]) discovery_hash = discovery_data[ATTR_DISCOVERY_HASH]
clear_discovery_hash(hass, discovery_hash)
async_dispatcher_send(
hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
raise raise
async_dispatcher_connect( async_dispatcher_connect(

View file

@ -6,7 +6,10 @@ import voluptuous as vol
from homeassistant.const import CONF_PLATFORM, CONF_VALUE_TEMPLATE from homeassistant.const import CONF_PLATFORM, CONF_VALUE_TEMPLATE
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.device_registry import EVENT_DEVICE_REGISTRY_UPDATED from homeassistant.helpers.device_registry import EVENT_DEVICE_REGISTRY_UPDATED
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from . import ( from . import (
ATTR_DISCOVERY_HASH, ATTR_DISCOVERY_HASH,
@ -21,7 +24,12 @@ from . import (
subscription, subscription,
) )
from .. import mqtt from .. import mqtt
from .discovery import MQTT_DISCOVERY_NEW, MQTT_DISCOVERY_UPDATED, clear_discovery_hash from .discovery import (
MQTT_DISCOVERY_DONE,
MQTT_DISCOVERY_NEW,
MQTT_DISCOVERY_UPDATED,
clear_discovery_hash,
)
from .util import valid_subscribe_topic from .util import valid_subscribe_topic
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -50,7 +58,11 @@ async def async_setup_entry(hass, config_entry):
config = PLATFORM_SCHEMA(discovery_payload) config = PLATFORM_SCHEMA(discovery_payload)
await async_setup_tag(hass, config, config_entry, discovery_data) await async_setup_tag(hass, config, config_entry, discovery_data)
except Exception: except Exception:
clear_discovery_hash(hass, discovery_data[ATTR_DISCOVERY_HASH]) discovery_hash = discovery_data[ATTR_DISCOVERY_HASH]
clear_discovery_hash(hass, discovery_hash)
async_dispatcher_send(
hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
raise raise
async_dispatcher_connect( async_dispatcher_connect(
@ -142,6 +154,10 @@ class MQTTTagScanner:
self._setup_from_config(config) self._setup_from_config(config)
await self.subscribe_topics() await self.subscribe_topics()
async_dispatcher_send(
self.hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
def _setup_from_config(self, config): def _setup_from_config(self, config):
self._value_template = lambda value, error_value: value self._value_template = lambda value, error_value: value
if CONF_VALUE_TEMPLATE in config: if CONF_VALUE_TEMPLATE in config:
@ -163,6 +179,9 @@ class MQTTTagScanner:
MQTT_DISCOVERY_UPDATED.format(discovery_hash), MQTT_DISCOVERY_UPDATED.format(discovery_hash),
self.discovery_update, self.discovery_update,
) )
async_dispatcher_send(
self.hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
async def subscribe_topics(self): async def subscribe_topics(self):
"""Subscribe to MQTT topics.""" """Subscribe to MQTT topics."""

View file

@ -4,11 +4,14 @@ import logging
import voluptuous as vol import voluptuous as vol
from homeassistant.components.vacuum import DOMAIN from homeassistant.components.vacuum import DOMAIN
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.reload import async_setup_reload_service from homeassistant.helpers.reload import async_setup_reload_service
from .. import ATTR_DISCOVERY_HASH, DOMAIN as MQTT_DOMAIN, PLATFORMS from .. import ATTR_DISCOVERY_HASH, DOMAIN as MQTT_DOMAIN, PLATFORMS
from ..discovery import MQTT_DISCOVERY_NEW, clear_discovery_hash from ..discovery import MQTT_DISCOVERY_DONE, MQTT_DISCOVERY_NEW, clear_discovery_hash
from .schema import CONF_SCHEMA, LEGACY, MQTT_VACUUM_SCHEMA, STATE from .schema import CONF_SCHEMA, LEGACY, MQTT_VACUUM_SCHEMA, STATE
from .schema_legacy import PLATFORM_SCHEMA_LEGACY, async_setup_entity_legacy from .schema_legacy import PLATFORM_SCHEMA_LEGACY, async_setup_entity_legacy
from .schema_state import PLATFORM_SCHEMA_STATE, async_setup_entity_state from .schema_state import PLATFORM_SCHEMA_STATE, async_setup_entity_state
@ -45,7 +48,11 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
config, async_add_entities, config_entry, discovery_data config, async_add_entities, config_entry, discovery_data
) )
except Exception: except Exception:
clear_discovery_hash(hass, discovery_data[ATTR_DISCOVERY_HASH]) discovery_hash = discovery_data[ATTR_DISCOVERY_HASH]
clear_discovery_hash(hass, discovery_hash)
async_dispatcher_send(
hass, MQTT_DISCOVERY_DONE.format(discovery_hash), None
)
raise raise
async_dispatcher_connect( async_dispatcher_connect(

View file

@ -12,7 +12,8 @@ from homeassistant.components.mqtt.abbreviations import (
DEVICE_ABBREVIATIONS, DEVICE_ABBREVIATIONS,
) )
from homeassistant.components.mqtt.discovery import ALREADY_DISCOVERED, async_start from homeassistant.components.mqtt.discovery import ALREADY_DISCOVERED, async_start
from homeassistant.const import STATE_OFF, STATE_ON from homeassistant.const import EVENT_STATE_CHANGED, STATE_OFF, STATE_ON
import homeassistant.core as ha
from tests.common import ( from tests.common import (
async_fire_mqtt_message, async_fire_mqtt_message,
@ -252,6 +253,121 @@ async def test_rediscover(hass, mqtt_mock, caplog):
assert state is not None assert state is not None
async def test_rapid_rediscover(hass, mqtt_mock, caplog):
"""Test immediate rediscover of removed component."""
events = []
@ha.callback
def callback(event):
"""Verify event got called."""
events.append(event)
hass.bus.async_listen(EVENT_STATE_CHANGED, callback)
async_fire_mqtt_message(
hass,
"homeassistant/binary_sensor/bla/config",
'{ "name": "Beer", "state_topic": "test-topic" }',
)
await hass.async_block_till_done()
state = hass.states.get("binary_sensor.beer")
assert state is not None
assert len(events) == 1
# Removal immediately followed by rediscover
async_fire_mqtt_message(hass, "homeassistant/binary_sensor/bla/config", "")
async_fire_mqtt_message(
hass,
"homeassistant/binary_sensor/bla/config",
'{ "name": "Beer", "state_topic": "test-topic" }',
)
async_fire_mqtt_message(hass, "homeassistant/binary_sensor/bla/config", "")
async_fire_mqtt_message(
hass,
"homeassistant/binary_sensor/bla/config",
'{ "name": "Milk", "state_topic": "test-topic" }',
)
await hass.async_block_till_done()
assert len(hass.states.async_entity_ids("binary_sensor")) == 1
state = hass.states.get("binary_sensor.milk")
assert state is not None
assert len(events) == 5
# Remove the entity
assert events[1].data["entity_id"] == "binary_sensor.beer"
assert events[1].data["new_state"] is None
# Add the entity
assert events[2].data["entity_id"] == "binary_sensor.beer"
assert events[2].data["old_state"] is None
# Remove the entity
assert events[3].data["entity_id"] == "binary_sensor.beer"
assert events[3].data["new_state"] is None
# Add the entity
assert events[4].data["entity_id"] == "binary_sensor.milk"
assert events[4].data["old_state"] is None
async def test_rapid_rediscover_unique(hass, mqtt_mock, caplog):
"""Test immediate rediscover of removed component."""
events = []
@ha.callback
def callback(event):
"""Verify event got called."""
events.append(event)
hass.bus.async_listen(EVENT_STATE_CHANGED, callback)
async_fire_mqtt_message(
hass,
"homeassistant/binary_sensor/bla2/config",
'{ "name": "Ale", "state_topic": "test-topic", "unique_id": "very_unique" }',
)
await hass.async_block_till_done()
state = hass.states.get("binary_sensor.ale")
assert state is not None
assert len(events) == 1
# Duplicate unique_id, immediately followed by correct unique_id
async_fire_mqtt_message(
hass,
"homeassistant/binary_sensor/bla/config",
'{ "name": "Beer", "state_topic": "test-topic", "unique_id": "very_unique" }',
)
async_fire_mqtt_message(
hass,
"homeassistant/binary_sensor/bla/config",
'{ "name": "Beer", "state_topic": "test-topic", "unique_id": "even_uniquer" }',
)
async_fire_mqtt_message(hass, "homeassistant/binary_sensor/bla/config", "")
async_fire_mqtt_message(
hass,
"homeassistant/binary_sensor/bla/config",
'{ "name": "Milk", "state_topic": "test-topic", "unique_id": "even_uniquer" }',
)
await hass.async_block_till_done()
assert len(hass.states.async_entity_ids("binary_sensor")) == 2
state = hass.states.get("binary_sensor.ale")
assert state is not None
state = hass.states.get("binary_sensor.milk")
assert state is not None
assert len(events) == 4
# Add the entity
assert events[1].data["entity_id"] == "binary_sensor.beer"
assert events[1].data["old_state"] is None
# Remove the entity
assert events[2].data["entity_id"] == "binary_sensor.beer"
assert events[2].data["new_state"] is None
# Add the entity
assert events[3].data["entity_id"] == "binary_sensor.milk"
assert events[3].data["old_state"] is None
async def test_duplicate_removal(hass, mqtt_mock, caplog): async def test_duplicate_removal(hass, mqtt_mock, caplog):
"""Test for a non duplicate component.""" """Test for a non duplicate component."""
async_fire_mqtt_message( async_fire_mqtt_message(