Limit executor imports to a single thread (#111898)

* Limit executor imports to a single thread

* test for import executor

* test for import executor

* test for import executor

* fixes

* better fix
This commit is contained in:
J. Nick Koston 2024-02-29 16:02:13 -10:00 committed by GitHub
parent 3b93c21d9d
commit 25510fc13c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 35 additions and 4 deletions

View file

@ -95,6 +95,7 @@ from .util.async_ import (
run_callback_threadsafe,
shutdown_run_callback_threadsafe,
)
from .util.executor import InterruptibleThreadPoolExecutor
from .util.json import JsonObjectType
from .util.read_only_dict import ReadOnlyDict
from .util.timeout import TimeoutManager
@ -394,6 +395,9 @@ class HomeAssistant:
self.timeout: TimeoutManager = TimeoutManager()
self._stop_future: concurrent.futures.Future[None] | None = None
self._shutdown_jobs: list[HassJobWithArgs] = []
self.import_executor = InterruptibleThreadPoolExecutor(
max_workers=1, thread_name_prefix="ImportExecutor"
)
@cached_property
def is_running(self) -> bool:
@ -678,6 +682,16 @@ class HomeAssistant:
return task
@callback
def async_add_import_executor_job(
self, target: Callable[..., _T], *args: Any
) -> asyncio.Future[_T]:
"""Add an import executor job from within the event loop."""
task = self.loop.run_in_executor(self.import_executor, target, *args)
self._tasks.add(task)
task.add_done_callback(self._tasks.remove)
return task
@overload
@callback
def async_run_hass_job(
@ -992,6 +1006,7 @@ class HomeAssistant:
self._async_log_running_tasks("close")
self.set_state(CoreState.stopped)
self.import_executor.shutdown()
if self._stopped is not None:
self._stopped.set()

View file

@ -853,7 +853,7 @@ class Integration:
# So we do it before validating config to catch these errors.
if load_executor:
try:
comp = await self.hass.async_add_executor_job(self.get_component)
comp = await self.hass.async_add_import_executor_job(self.get_component)
except ImportError as ex:
load_executor = False
_LOGGER.debug("Failed to import %s in executor", domain, exc_info=ex)
@ -924,7 +924,7 @@ class Integration:
try:
if load_executor:
try:
platform = await self.hass.async_add_executor_job(
platform = await self.hass.async_add_import_executor_job(
self._load_platform, platform_name
)
except ImportError as ex:

View file

@ -310,7 +310,7 @@ async def test_update_entity_ha_not_running(
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test update occurs only after HA is running."""
await hass.async_stop()
hass.set_state(CoreState.not_running)
client.async_send_command.return_value = {"updates": []}
@ -632,7 +632,7 @@ async def test_update_entity_delay(
"""Test update occurs on a delay after HA starts."""
client.async_send_command.reset_mock()
client.async_send_command.return_value = {"updates": []}
await hass.async_stop()
hass.set_state(CoreState.not_running)
entry = MockConfigEntry(domain="zwave_js", data={"url": "ws://test.org"})
entry.add_to_hass(hass)

View file

@ -2869,3 +2869,19 @@ def test_one_time_listener_repr(hass: HomeAssistant) -> None:
assert "OneTimeListener" in repr_str
assert "test_core" in repr_str
assert "_listener" in repr_str
async def test_async_add_import_executor_job(hass: HomeAssistant) -> None:
"""Test async_add_import_executor_job works and is limited to one thread."""
evt = threading.Event()
loop = asyncio.get_running_loop()
def executor_func() -> None:
evt.set()
return evt
future = hass.async_add_import_executor_job(executor_func)
await loop.run_in_executor(None, evt.wait)
assert await future is evt
assert hass.import_executor._max_workers == 1