Turn on thread safety checks in async_dispatcher_send (#116867)
* Turn on thread safety checks in async_dispatcher_send We keep seeing issues where async_dispatcher_send is called from a thread which means we call the callback function on the other side in the thread as well which usually leads to a crash * Turn on thread safety checks in async_dispatcher_send We keep seeing issues where async_dispatcher_send is called from a thread which means we call the callback function on the other side in the thread as well which usually leads to a crash * adjust
This commit is contained in:
parent
b41b1bb998
commit
91fa8b50cc
6 changed files with 47 additions and 17 deletions
|
@ -84,7 +84,7 @@ from .helpers import (
|
||||||
template,
|
template,
|
||||||
translation,
|
translation,
|
||||||
)
|
)
|
||||||
from .helpers.dispatcher import async_dispatcher_send
|
from .helpers.dispatcher import async_dispatcher_send_internal
|
||||||
from .helpers.storage import get_internal_store_manager
|
from .helpers.storage import get_internal_store_manager
|
||||||
from .helpers.system_info import async_get_system_info
|
from .helpers.system_info import async_get_system_info
|
||||||
from .helpers.typing import ConfigType
|
from .helpers.typing import ConfigType
|
||||||
|
@ -700,7 +700,7 @@ class _WatchPendingSetups:
|
||||||
def _async_dispatch(self, remaining_with_setup_started: dict[str, float]) -> None:
|
def _async_dispatch(self, remaining_with_setup_started: dict[str, float]) -> None:
|
||||||
"""Dispatch the signal."""
|
"""Dispatch the signal."""
|
||||||
if remaining_with_setup_started or not self._previous_was_empty:
|
if remaining_with_setup_started or not self._previous_was_empty:
|
||||||
async_dispatcher_send(
|
async_dispatcher_send_internal(
|
||||||
self._hass, SIGNAL_BOOTSTRAP_INTEGRATIONS, remaining_with_setup_started
|
self._hass, SIGNAL_BOOTSTRAP_INTEGRATIONS, remaining_with_setup_started
|
||||||
)
|
)
|
||||||
self._previous_was_empty = not remaining_with_setup_started
|
self._previous_was_empty = not remaining_with_setup_started
|
||||||
|
|
|
@ -48,7 +48,7 @@ from .exceptions import (
|
||||||
)
|
)
|
||||||
from .helpers import device_registry, entity_registry, issue_registry as ir, storage
|
from .helpers import device_registry, entity_registry, issue_registry as ir, storage
|
||||||
from .helpers.debounce import Debouncer
|
from .helpers.debounce import Debouncer
|
||||||
from .helpers.dispatcher import SignalType, async_dispatcher_send
|
from .helpers.dispatcher import SignalType, async_dispatcher_send_internal
|
||||||
from .helpers.event import (
|
from .helpers.event import (
|
||||||
RANDOM_MICROSECOND_MAX,
|
RANDOM_MICROSECOND_MAX,
|
||||||
RANDOM_MICROSECOND_MIN,
|
RANDOM_MICROSECOND_MIN,
|
||||||
|
@ -841,7 +841,7 @@ class ConfigEntry(Generic[_DataT]):
|
||||||
error_reason_translation_placeholders,
|
error_reason_translation_placeholders,
|
||||||
)
|
)
|
||||||
self.clear_cache()
|
self.clear_cache()
|
||||||
async_dispatcher_send(
|
async_dispatcher_send_internal(
|
||||||
hass, SIGNAL_CONFIG_ENTRY_CHANGED, ConfigEntryChange.UPDATED, self
|
hass, SIGNAL_CONFIG_ENTRY_CHANGED, ConfigEntryChange.UPDATED, self
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1880,6 +1880,7 @@ class ConfigEntries:
|
||||||
if entry.entry_id not in self._entries:
|
if entry.entry_id not in self._entries:
|
||||||
raise UnknownEntry(entry.entry_id)
|
raise UnknownEntry(entry.entry_id)
|
||||||
|
|
||||||
|
self.hass.verify_event_loop_thread("async_update_entry")
|
||||||
changed = False
|
changed = False
|
||||||
_setter = object.__setattr__
|
_setter = object.__setattr__
|
||||||
|
|
||||||
|
@ -1928,7 +1929,7 @@ class ConfigEntries:
|
||||||
self, change_type: ConfigEntryChange, entry: ConfigEntry
|
self, change_type: ConfigEntryChange, entry: ConfigEntry
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Dispatch a config entry change."""
|
"""Dispatch a config entry change."""
|
||||||
async_dispatcher_send(
|
async_dispatcher_send_internal(
|
||||||
self.hass, SIGNAL_CONFIG_ENTRY_CHANGED, change_type, entry
|
self.hass, SIGNAL_CONFIG_ENTRY_CHANGED, change_type, entry
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,7 @@ from homeassistant.const import Platform
|
||||||
from homeassistant.loader import bind_hass
|
from homeassistant.loader import bind_hass
|
||||||
|
|
||||||
from ..util.signal_type import SignalTypeFormat
|
from ..util.signal_type import SignalTypeFormat
|
||||||
from .dispatcher import async_dispatcher_connect, async_dispatcher_send
|
from .dispatcher import async_dispatcher_connect, async_dispatcher_send_internal
|
||||||
from .typing import ConfigType, DiscoveryInfoType
|
from .typing import ConfigType, DiscoveryInfoType
|
||||||
|
|
||||||
SIGNAL_PLATFORM_DISCOVERED: SignalTypeFormat[DiscoveryDict] = SignalTypeFormat(
|
SIGNAL_PLATFORM_DISCOVERED: SignalTypeFormat[DiscoveryDict] = SignalTypeFormat(
|
||||||
|
@ -95,7 +95,9 @@ async def async_discover(
|
||||||
"discovered": discovered,
|
"discovered": discovered,
|
||||||
}
|
}
|
||||||
|
|
||||||
async_dispatcher_send(hass, SIGNAL_PLATFORM_DISCOVERED.format(service), data)
|
async_dispatcher_send_internal(
|
||||||
|
hass, SIGNAL_PLATFORM_DISCOVERED.format(service), data
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@bind_hass
|
@bind_hass
|
||||||
|
@ -177,4 +179,6 @@ async def async_load_platform(
|
||||||
"discovered": discovered,
|
"discovered": discovered,
|
||||||
}
|
}
|
||||||
|
|
||||||
async_dispatcher_send(hass, SIGNAL_PLATFORM_DISCOVERED.format(service), data)
|
async_dispatcher_send_internal(
|
||||||
|
hass, SIGNAL_PLATFORM_DISCOVERED.format(service), data
|
||||||
|
)
|
||||||
|
|
|
@ -145,7 +145,7 @@ def dispatcher_send(hass: HomeAssistant, signal: str, *args: Any) -> None: ...
|
||||||
@bind_hass # type: ignore[misc] # workaround; exclude typing of 2 overload in func def
|
@bind_hass # type: ignore[misc] # workaround; exclude typing of 2 overload in func def
|
||||||
def dispatcher_send(hass: HomeAssistant, signal: SignalType[*_Ts], *args: *_Ts) -> None:
|
def dispatcher_send(hass: HomeAssistant, signal: SignalType[*_Ts], *args: *_Ts) -> None:
|
||||||
"""Send signal and data."""
|
"""Send signal and data."""
|
||||||
hass.loop.call_soon_threadsafe(async_dispatcher_send, hass, signal, *args)
|
hass.loop.call_soon_threadsafe(async_dispatcher_send_internal, hass, signal, *args)
|
||||||
|
|
||||||
|
|
||||||
def _format_err(
|
def _format_err(
|
||||||
|
@ -199,9 +199,33 @@ def async_dispatcher_send(
|
||||||
|
|
||||||
This method must be run in the event loop.
|
This method must be run in the event loop.
|
||||||
"""
|
"""
|
||||||
if hass.config.debug:
|
# We turned on asyncio debug in April 2024 in the dev containers
|
||||||
hass.verify_event_loop_thread("async_dispatcher_send")
|
# in the hope of catching some of the issues that have been
|
||||||
|
# reported. It will take a while to get all the issues fixed in
|
||||||
|
# custom components.
|
||||||
|
#
|
||||||
|
# In 2025.5 we should guard the `verify_event_loop_thread`
|
||||||
|
# check with a check for the `hass.config.debug` flag being set as
|
||||||
|
# long term we don't want to be checking this in production
|
||||||
|
# environments since it is a performance hit.
|
||||||
|
hass.verify_event_loop_thread("async_dispatcher_send")
|
||||||
|
async_dispatcher_send_internal(hass, signal, *args)
|
||||||
|
|
||||||
|
|
||||||
|
@callback
|
||||||
|
@bind_hass
|
||||||
|
def async_dispatcher_send_internal(
|
||||||
|
hass: HomeAssistant, signal: SignalType[*_Ts] | str, *args: *_Ts
|
||||||
|
) -> None:
|
||||||
|
"""Send signal and data.
|
||||||
|
|
||||||
|
This method is intended to only be used by core internally
|
||||||
|
and should not be considered a stable API. We will make
|
||||||
|
breaking changes to this function in the future and it
|
||||||
|
should not be used in integrations.
|
||||||
|
|
||||||
|
This method must be run in the event loop.
|
||||||
|
"""
|
||||||
if (maybe_dispatchers := hass.data.get(DATA_DISPATCHER)) is None:
|
if (maybe_dispatchers := hass.data.get(DATA_DISPATCHER)) is None:
|
||||||
return
|
return
|
||||||
dispatchers: _DispatcherDataType[*_Ts] = maybe_dispatchers
|
dispatchers: _DispatcherDataType[*_Ts] = maybe_dispatchers
|
||||||
|
|
|
@ -85,7 +85,7 @@ from homeassistant.util.signal_type import SignalType, SignalTypeFormat
|
||||||
|
|
||||||
from . import condition, config_validation as cv, service, template
|
from . import condition, config_validation as cv, service, template
|
||||||
from .condition import ConditionCheckerType, trace_condition_function
|
from .condition import ConditionCheckerType, trace_condition_function
|
||||||
from .dispatcher import async_dispatcher_connect, async_dispatcher_send
|
from .dispatcher import async_dispatcher_connect, async_dispatcher_send_internal
|
||||||
from .event import async_call_later, async_track_template
|
from .event import async_call_later, async_track_template
|
||||||
from .script_variables import ScriptVariables
|
from .script_variables import ScriptVariables
|
||||||
from .trace import (
|
from .trace import (
|
||||||
|
@ -208,7 +208,9 @@ async def trace_action(
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
):
|
):
|
||||||
async_dispatcher_send(hass, SCRIPT_BREAKPOINT_HIT, key, run_id, path)
|
async_dispatcher_send_internal(
|
||||||
|
hass, SCRIPT_BREAKPOINT_HIT, key, run_id, path
|
||||||
|
)
|
||||||
|
|
||||||
done = hass.loop.create_future()
|
done = hass.loop.create_future()
|
||||||
|
|
||||||
|
@ -1986,7 +1988,7 @@ def debug_continue(hass: HomeAssistant, key: str, run_id: str) -> None:
|
||||||
breakpoint_clear(hass, key, run_id, NODE_ANY)
|
breakpoint_clear(hass, key, run_id, NODE_ANY)
|
||||||
|
|
||||||
signal = SCRIPT_DEBUG_CONTINUE_STOP.format(key, run_id)
|
signal = SCRIPT_DEBUG_CONTINUE_STOP.format(key, run_id)
|
||||||
async_dispatcher_send(hass, signal, "continue")
|
async_dispatcher_send_internal(hass, signal, "continue")
|
||||||
|
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
|
@ -1996,11 +1998,11 @@ def debug_step(hass: HomeAssistant, key: str, run_id: str) -> None:
|
||||||
breakpoint_set(hass, key, run_id, NODE_ANY)
|
breakpoint_set(hass, key, run_id, NODE_ANY)
|
||||||
|
|
||||||
signal = SCRIPT_DEBUG_CONTINUE_STOP.format(key, run_id)
|
signal = SCRIPT_DEBUG_CONTINUE_STOP.format(key, run_id)
|
||||||
async_dispatcher_send(hass, signal, "continue")
|
async_dispatcher_send_internal(hass, signal, "continue")
|
||||||
|
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def debug_stop(hass: HomeAssistant, key: str, run_id: str) -> None:
|
def debug_stop(hass: HomeAssistant, key: str, run_id: str) -> None:
|
||||||
"""Stop execution of a running or halted script."""
|
"""Stop execution of a running or halted script."""
|
||||||
signal = SCRIPT_DEBUG_CONTINUE_STOP.format(key, run_id)
|
signal = SCRIPT_DEBUG_CONTINUE_STOP.format(key, run_id)
|
||||||
async_dispatcher_send(hass, signal, "stop")
|
async_dispatcher_send_internal(hass, signal, "stop")
|
||||||
|
|
|
@ -243,7 +243,6 @@ async def test_dispatcher_add_dispatcher(hass: HomeAssistant) -> None:
|
||||||
|
|
||||||
async def test_thread_safety_checks(hass: HomeAssistant) -> None:
|
async def test_thread_safety_checks(hass: HomeAssistant) -> None:
|
||||||
"""Test dispatcher thread safety checks."""
|
"""Test dispatcher thread safety checks."""
|
||||||
hass.config.debug = True
|
|
||||||
calls = []
|
calls = []
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue