Fix shutdown deadlock with run_callback_threadsafe (#45807)
This commit is contained in:
parent
31a84555b9
commit
9f59515bb8
4 changed files with 122 additions and 2 deletions
|
@ -71,7 +71,11 @@ from homeassistant.exceptions import (
|
||||||
Unauthorized,
|
Unauthorized,
|
||||||
)
|
)
|
||||||
from homeassistant.util import location, network
|
from homeassistant.util import location, network
|
||||||
from homeassistant.util.async_ import fire_coroutine_threadsafe, run_callback_threadsafe
|
from homeassistant.util.async_ import (
|
||||||
|
fire_coroutine_threadsafe,
|
||||||
|
run_callback_threadsafe,
|
||||||
|
shutdown_run_callback_threadsafe,
|
||||||
|
)
|
||||||
import homeassistant.util.dt as dt_util
|
import homeassistant.util.dt as dt_util
|
||||||
from homeassistant.util.timeout import TimeoutManager
|
from homeassistant.util.timeout import TimeoutManager
|
||||||
from homeassistant.util.unit_system import IMPERIAL_SYSTEM, METRIC_SYSTEM, UnitSystem
|
from homeassistant.util.unit_system import IMPERIAL_SYSTEM, METRIC_SYSTEM, UnitSystem
|
||||||
|
@ -548,6 +552,14 @@ class HomeAssistant:
|
||||||
# stage 3
|
# stage 3
|
||||||
self.state = CoreState.not_running
|
self.state = CoreState.not_running
|
||||||
self.bus.async_fire(EVENT_HOMEASSISTANT_CLOSE)
|
self.bus.async_fire(EVENT_HOMEASSISTANT_CLOSE)
|
||||||
|
|
||||||
|
# Prevent run_callback_threadsafe from scheduling any additional
|
||||||
|
# callbacks in the event loop as callbacks created on the futures
|
||||||
|
# it returns will never run after the final `self.async_block_till_done`
|
||||||
|
# which will cause the futures to block forever when waiting for
|
||||||
|
# the `result()` which will cause a deadlock when shutting down the executor.
|
||||||
|
shutdown_run_callback_threadsafe(self.loop)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with self.timeout.async_timeout(30):
|
async with self.timeout.async_timeout(30):
|
||||||
await self.async_block_till_done()
|
await self.async_block_till_done()
|
||||||
|
|
|
@ -10,6 +10,8 @@ from typing import Any, Awaitable, Callable, Coroutine, TypeVar
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_SHUTDOWN_RUN_CALLBACK_THREADSAFE = "_shutdown_run_callback_threadsafe"
|
||||||
|
|
||||||
T = TypeVar("T")
|
T = TypeVar("T")
|
||||||
|
|
||||||
|
|
||||||
|
@ -58,6 +60,28 @@ def run_callback_threadsafe(
|
||||||
_LOGGER.warning("Exception on lost future: ", exc_info=True)
|
_LOGGER.warning("Exception on lost future: ", exc_info=True)
|
||||||
|
|
||||||
loop.call_soon_threadsafe(run_callback)
|
loop.call_soon_threadsafe(run_callback)
|
||||||
|
|
||||||
|
if hasattr(loop, _SHUTDOWN_RUN_CALLBACK_THREADSAFE):
|
||||||
|
#
|
||||||
|
# If the final `HomeAssistant.async_block_till_done` in
|
||||||
|
# `HomeAssistant.async_stop` has already been called, the callback
|
||||||
|
# will never run and, `future.result()` will block forever which
|
||||||
|
# will prevent the thread running this code from shutting down which
|
||||||
|
# will result in a deadlock when the main thread attempts to shutdown
|
||||||
|
# the executor and `.join()` the thread running this code.
|
||||||
|
#
|
||||||
|
# To prevent this deadlock we do the following on shutdown:
|
||||||
|
#
|
||||||
|
# 1. Set the _SHUTDOWN_RUN_CALLBACK_THREADSAFE attr on this function
|
||||||
|
# by calling `shutdown_run_callback_threadsafe`
|
||||||
|
# 2. Call `hass.async_block_till_done` at least once after shutdown
|
||||||
|
# to ensure all callbacks have run
|
||||||
|
# 3. Raise an exception here to ensure `future.result()` can never be
|
||||||
|
# called and hit the deadlock since once `shutdown_run_callback_threadsafe`
|
||||||
|
# we cannot promise the callback will be executed.
|
||||||
|
#
|
||||||
|
raise RuntimeError("The event loop is in the process of shutting down.")
|
||||||
|
|
||||||
return future
|
return future
|
||||||
|
|
||||||
|
|
||||||
|
@ -139,3 +163,20 @@ async def gather_with_concurrency(
|
||||||
return await gather(
|
return await gather(
|
||||||
*(sem_task(task) for task in tasks), return_exceptions=return_exceptions
|
*(sem_task(task) for task in tasks), return_exceptions=return_exceptions
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def shutdown_run_callback_threadsafe(loop: AbstractEventLoop) -> None:
|
||||||
|
"""Call when run_callback_threadsafe should prevent creating new futures.
|
||||||
|
|
||||||
|
We must finish all callbacks before the executor is shutdown
|
||||||
|
or we can end up in a deadlock state where:
|
||||||
|
|
||||||
|
`executor.result()` is waiting for its `._condition`
|
||||||
|
and the executor shutdown is trying to `.join()` the
|
||||||
|
executor thread.
|
||||||
|
|
||||||
|
This function is considered irreversible and should only ever
|
||||||
|
be called when Home Assistant is going to shutdown and
|
||||||
|
python is going to exit.
|
||||||
|
"""
|
||||||
|
setattr(loop, _SHUTDOWN_RUN_CALLBACK_THREADSAFE, True)
|
||||||
|
|
|
@ -169,6 +169,30 @@ async def test_stage_shutdown(hass):
|
||||||
assert len(test_all) == 2
|
assert len(test_all) == 2
|
||||||
|
|
||||||
|
|
||||||
|
async def test_shutdown_calls_block_till_done_after_shutdown_run_callback_threadsafe(
|
||||||
|
hass,
|
||||||
|
):
|
||||||
|
"""Ensure shutdown_run_callback_threadsafe is called before the final async_block_till_done."""
|
||||||
|
stop_calls = []
|
||||||
|
|
||||||
|
async def _record_block_till_done():
|
||||||
|
nonlocal stop_calls
|
||||||
|
stop_calls.append("async_block_till_done")
|
||||||
|
|
||||||
|
def _record_shutdown_run_callback_threadsafe(loop):
|
||||||
|
nonlocal stop_calls
|
||||||
|
stop_calls.append(("shutdown_run_callback_threadsafe", loop))
|
||||||
|
|
||||||
|
with patch.object(hass, "async_block_till_done", _record_block_till_done), patch(
|
||||||
|
"homeassistant.core.shutdown_run_callback_threadsafe",
|
||||||
|
_record_shutdown_run_callback_threadsafe,
|
||||||
|
):
|
||||||
|
await hass.async_stop()
|
||||||
|
|
||||||
|
assert stop_calls[-2] == ("shutdown_run_callback_threadsafe", hass.loop)
|
||||||
|
assert stop_calls[-1] == "async_block_till_done"
|
||||||
|
|
||||||
|
|
||||||
async def test_pending_sheduler(hass):
|
async def test_pending_sheduler(hass):
|
||||||
"""Add a coro to pending tasks."""
|
"""Add a coro to pending tasks."""
|
||||||
call_count = []
|
call_count = []
|
||||||
|
|
|
@ -50,7 +50,8 @@ def test_fire_coroutine_threadsafe_from_inside_event_loop(
|
||||||
def test_run_callback_threadsafe_from_inside_event_loop(mock_ident, _):
|
def test_run_callback_threadsafe_from_inside_event_loop(mock_ident, _):
|
||||||
"""Testing calling run_callback_threadsafe from inside an event loop."""
|
"""Testing calling run_callback_threadsafe from inside an event loop."""
|
||||||
callback = MagicMock()
|
callback = MagicMock()
|
||||||
loop = MagicMock()
|
|
||||||
|
loop = Mock(spec=["call_soon_threadsafe"])
|
||||||
|
|
||||||
loop._thread_ident = None
|
loop._thread_ident = None
|
||||||
mock_ident.return_value = 5
|
mock_ident.return_value = 5
|
||||||
|
@ -168,3 +169,45 @@ async def test_gather_with_concurrency():
|
||||||
)
|
)
|
||||||
|
|
||||||
assert results == [2, 2, -1, -1]
|
assert results == [2, 2, -1, -1]
|
||||||
|
|
||||||
|
|
||||||
|
async def test_shutdown_run_callback_threadsafe(hass):
|
||||||
|
"""Test we can shutdown run_callback_threadsafe."""
|
||||||
|
hasync.shutdown_run_callback_threadsafe(hass.loop)
|
||||||
|
callback = MagicMock()
|
||||||
|
|
||||||
|
with pytest.raises(RuntimeError):
|
||||||
|
hasync.run_callback_threadsafe(hass.loop, callback)
|
||||||
|
|
||||||
|
|
||||||
|
async def test_run_callback_threadsafe(hass):
|
||||||
|
"""Test run_callback_threadsafe runs code in the event loop."""
|
||||||
|
it_ran = False
|
||||||
|
|
||||||
|
def callback():
|
||||||
|
nonlocal it_ran
|
||||||
|
it_ran = True
|
||||||
|
|
||||||
|
assert hasync.run_callback_threadsafe(hass.loop, callback)
|
||||||
|
assert it_ran is False
|
||||||
|
|
||||||
|
# Verify that async_block_till_done will flush
|
||||||
|
# out the callback
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
assert it_ran is True
|
||||||
|
|
||||||
|
|
||||||
|
async def test_callback_is_always_scheduled(hass):
|
||||||
|
"""Test run_callback_threadsafe always calls call_soon_threadsafe before checking for shutdown."""
|
||||||
|
# We have to check the shutdown state AFTER the callback is scheduled otherwise
|
||||||
|
# the function could continue on and the caller call `future.result()` after
|
||||||
|
# the point in the main thread where callbacks are no longer run.
|
||||||
|
|
||||||
|
callback = MagicMock()
|
||||||
|
hasync.shutdown_run_callback_threadsafe(hass.loop)
|
||||||
|
|
||||||
|
with patch.object(hass.loop, "call_soon_threadsafe") as mock_call_soon_threadsafe:
|
||||||
|
with pytest.raises(RuntimeError):
|
||||||
|
hasync.run_callback_threadsafe(hass.loop, callback)
|
||||||
|
|
||||||
|
mock_call_soon_threadsafe.assert_called_once()
|
||||||
|
|
Loading…
Add table
Reference in a new issue