Replace periodic tasks with background tasks (#112726)
* Phase out periodic tasks * false by default or some tests will block forever, will need to fix each one manually * kwarg works * kwarg works * kwarg works * fixes * fix more tests * fix more tests * fix lifx * opensky * pvpc_hourly_pricing * adjust more * adjust more * smarttub * adjust more * adjust more * adjust more * adjust more * adjust * no eager executor * zha * qnap_qsw * fix more * fix fix * docs * its a wrapper now * add more coverage * coverage * cover all combos * more fixes * more fixes * more fixes * remaining issues are legit bugs in tests * make tplink test more predictable * more fixes * feedreader * grind out some more * make test race safe * one more
This commit is contained in:
parent
08416974c9
commit
65358c129a
48 changed files with 413 additions and 333 deletions
|
@ -375,7 +375,6 @@ class HomeAssistant:
|
|||
self.loop = asyncio.get_running_loop()
|
||||
self._tasks: set[asyncio.Future[Any]] = set()
|
||||
self._background_tasks: set[asyncio.Future[Any]] = set()
|
||||
self._periodic_tasks: set[asyncio.Future[Any]] = set()
|
||||
self.bus = EventBus(self)
|
||||
self.services = ServiceRegistry(self)
|
||||
self.states = StateMachine(self.bus, self.loop)
|
||||
|
@ -585,23 +584,38 @@ class HomeAssistant:
|
|||
@overload
|
||||
@callback
|
||||
def async_add_hass_job(
|
||||
self, hassjob: HassJob[..., Coroutine[Any, Any, _R]], *args: Any
|
||||
self,
|
||||
hassjob: HassJob[..., Coroutine[Any, Any, _R]],
|
||||
*args: Any,
|
||||
eager_start: bool = False,
|
||||
background: bool = False,
|
||||
) -> asyncio.Future[_R] | None:
|
||||
...
|
||||
|
||||
@overload
|
||||
@callback
|
||||
def async_add_hass_job(
|
||||
self, hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R], *args: Any
|
||||
self,
|
||||
hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R],
|
||||
*args: Any,
|
||||
eager_start: bool = False,
|
||||
background: bool = False,
|
||||
) -> asyncio.Future[_R] | None:
|
||||
...
|
||||
|
||||
@callback
|
||||
def async_add_hass_job(
|
||||
self, hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R], *args: Any
|
||||
self,
|
||||
hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R],
|
||||
*args: Any,
|
||||
eager_start: bool = False,
|
||||
background: bool = False,
|
||||
) -> asyncio.Future[_R] | None:
|
||||
"""Add a HassJob from within the event loop.
|
||||
|
||||
If eager_start is True, coroutine functions will be scheduled eagerly.
|
||||
If background is True, the task will created as a background task.
|
||||
|
||||
This method must be run in the event loop.
|
||||
hassjob: HassJob to call.
|
||||
args: parameters for method to call.
|
||||
|
@ -618,7 +632,14 @@ class HomeAssistant:
|
|||
)
|
||||
# Use loop.create_task
|
||||
# to avoid the extra function call in asyncio.create_task.
|
||||
task = self.loop.create_task(hassjob.target(*args), name=hassjob.name)
|
||||
if eager_start:
|
||||
task = create_eager_task(
|
||||
hassjob.target(*args), name=hassjob.name, loop=self.loop
|
||||
)
|
||||
if task.done():
|
||||
return task
|
||||
else:
|
||||
task = self.loop.create_task(hassjob.target(*args), name=hassjob.name)
|
||||
elif hassjob.job_type is HassJobType.Callback:
|
||||
if TYPE_CHECKING:
|
||||
hassjob.target = cast(Callable[..., _R], hassjob.target)
|
||||
|
@ -629,58 +650,9 @@ class HomeAssistant:
|
|||
hassjob.target = cast(Callable[..., _R], hassjob.target)
|
||||
task = self.loop.run_in_executor(None, hassjob.target, *args)
|
||||
|
||||
self._tasks.add(task)
|
||||
task.add_done_callback(self._tasks.remove)
|
||||
|
||||
return task
|
||||
|
||||
@overload
|
||||
@callback
|
||||
def async_run_periodic_hass_job(
|
||||
self, hassjob: HassJob[..., Coroutine[Any, Any, _R]], *args: Any
|
||||
) -> asyncio.Future[_R] | None:
|
||||
...
|
||||
|
||||
@overload
|
||||
@callback
|
||||
def async_run_periodic_hass_job(
|
||||
self, hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R], *args: Any
|
||||
) -> asyncio.Future[_R] | None:
|
||||
...
|
||||
|
||||
@callback
|
||||
def async_run_periodic_hass_job(
|
||||
self, hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R], *args: Any
|
||||
) -> asyncio.Future[_R] | None:
|
||||
"""Add a periodic HassJob from within the event loop.
|
||||
|
||||
This method must be run in the event loop.
|
||||
hassjob: HassJob to call.
|
||||
args: parameters for method to call.
|
||||
"""
|
||||
task: asyncio.Future[_R]
|
||||
# This code path is performance sensitive and uses
|
||||
# if TYPE_CHECKING to avoid the overhead of constructing
|
||||
# the type used for the cast. For history see:
|
||||
# https://github.com/home-assistant/core/pull/71960
|
||||
if hassjob.job_type is HassJobType.Coroutinefunction:
|
||||
if TYPE_CHECKING:
|
||||
hassjob.target = cast(
|
||||
Callable[..., Coroutine[Any, Any, _R]], hassjob.target
|
||||
)
|
||||
task = create_eager_task(hassjob.target(*args), name=hassjob.name)
|
||||
elif hassjob.job_type is HassJobType.Callback:
|
||||
if TYPE_CHECKING:
|
||||
hassjob.target = cast(Callable[..., _R], hassjob.target)
|
||||
hassjob.target(*args)
|
||||
return None
|
||||
else:
|
||||
if TYPE_CHECKING:
|
||||
hassjob.target = cast(Callable[..., _R], hassjob.target)
|
||||
task = self.loop.run_in_executor(None, hassjob.target, *args)
|
||||
|
||||
self._periodic_tasks.add(task)
|
||||
task.add_done_callback(self._periodic_tasks.remove)
|
||||
task_bucket = self._background_tasks if background else self._tasks
|
||||
task_bucket.add(task)
|
||||
task.add_done_callback(task_bucket.remove)
|
||||
|
||||
return task
|
||||
|
||||
|
@ -751,37 +723,6 @@ class HomeAssistant:
|
|||
task.add_done_callback(self._background_tasks.remove)
|
||||
return task
|
||||
|
||||
@callback
|
||||
def async_create_periodic_task(
|
||||
self, target: Coroutine[Any, Any, _R], name: str, eager_start: bool = False
|
||||
) -> asyncio.Task[_R]:
|
||||
"""Create a task from within the event loop.
|
||||
|
||||
This type of task is typically used for polling.
|
||||
|
||||
A periodic task is different from a normal task:
|
||||
|
||||
- Will not block startup
|
||||
- Will be automatically cancelled on shutdown
|
||||
- Calls to async_block_till_done will wait for completion by default
|
||||
|
||||
If you are using this in your integration, use the create task
|
||||
methods on the config entry instead.
|
||||
|
||||
This method must be run in the event loop.
|
||||
"""
|
||||
if eager_start:
|
||||
task = create_eager_task(target, name=name, loop=self.loop)
|
||||
if task.done():
|
||||
return task
|
||||
else:
|
||||
# Use loop.create_task
|
||||
# to avoid the extra function call in asyncio.create_task.
|
||||
task = self.loop.create_task(target, name=name)
|
||||
self._periodic_tasks.add(task)
|
||||
task.add_done_callback(self._periodic_tasks.remove)
|
||||
return task
|
||||
|
||||
@callback
|
||||
def async_add_executor_job(
|
||||
self, target: Callable[..., _T], *args: Any
|
||||
|
@ -806,25 +747,40 @@ class HomeAssistant:
|
|||
@overload
|
||||
@callback
|
||||
def async_run_hass_job(
|
||||
self, hassjob: HassJob[..., Coroutine[Any, Any, _R]], *args: Any
|
||||
self,
|
||||
hassjob: HassJob[..., Coroutine[Any, Any, _R]],
|
||||
*args: Any,
|
||||
eager_start: bool = False,
|
||||
background: bool = False,
|
||||
) -> asyncio.Future[_R] | None:
|
||||
...
|
||||
|
||||
@overload
|
||||
@callback
|
||||
def async_run_hass_job(
|
||||
self, hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R], *args: Any
|
||||
self,
|
||||
hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R],
|
||||
*args: Any,
|
||||
eager_start: bool = False,
|
||||
background: bool = False,
|
||||
) -> asyncio.Future[_R] | None:
|
||||
...
|
||||
|
||||
@callback
|
||||
def async_run_hass_job(
|
||||
self, hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R], *args: Any
|
||||
self,
|
||||
hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R],
|
||||
*args: Any,
|
||||
eager_start: bool = False,
|
||||
background: bool = False,
|
||||
) -> asyncio.Future[_R] | None:
|
||||
"""Run a HassJob from within the event loop.
|
||||
|
||||
This method must be run in the event loop.
|
||||
|
||||
If eager_start is True, coroutine functions will be scheduled eagerly.
|
||||
If background is True, the task will created as a background task.
|
||||
|
||||
hassjob: HassJob
|
||||
args: parameters for method to call.
|
||||
"""
|
||||
|
@ -838,7 +794,9 @@ class HomeAssistant:
|
|||
hassjob.target(*args)
|
||||
return None
|
||||
|
||||
return self.async_add_hass_job(hassjob, *args)
|
||||
return self.async_add_hass_job(
|
||||
hassjob, *args, eager_start=eager_start, background=background
|
||||
)
|
||||
|
||||
@overload
|
||||
@callback
|
||||
|
@ -891,7 +849,7 @@ class HomeAssistant:
|
|||
self.async_block_till_done(), self.loop
|
||||
).result()
|
||||
|
||||
async def async_block_till_done(self, wait_periodic_tasks: bool = True) -> None:
|
||||
async def async_block_till_done(self, wait_background_tasks: bool = False) -> None:
|
||||
"""Block until all pending work is done."""
|
||||
# To flush out any call_soon_threadsafe
|
||||
await asyncio.sleep(0)
|
||||
|
@ -900,8 +858,8 @@ class HomeAssistant:
|
|||
while tasks := [
|
||||
task
|
||||
for task in (
|
||||
self._tasks | self._periodic_tasks
|
||||
if wait_periodic_tasks
|
||||
self._tasks | self._background_tasks
|
||||
if wait_background_tasks
|
||||
else self._tasks
|
||||
)
|
||||
if task is not current_task and not cancelling(task)
|
||||
|
@ -1034,7 +992,7 @@ class HomeAssistant:
|
|||
self._tasks = set()
|
||||
|
||||
# Cancel all background tasks
|
||||
for task in self._background_tasks | self._periodic_tasks:
|
||||
for task in self._background_tasks:
|
||||
self._tasks.add(task)
|
||||
task.add_done_callback(self._tasks.remove)
|
||||
task.cancel("Home Assistant is stopping")
|
||||
|
@ -1046,7 +1004,7 @@ class HomeAssistant:
|
|||
self.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
|
||||
try:
|
||||
async with self.timeout.async_timeout(STOP_STAGE_SHUTDOWN_TIMEOUT):
|
||||
await self.async_block_till_done(wait_periodic_tasks=False)
|
||||
await self.async_block_till_done()
|
||||
except TimeoutError:
|
||||
_LOGGER.warning(
|
||||
"Timed out waiting for integrations to stop, the shutdown will"
|
||||
|
@ -1059,7 +1017,7 @@ class HomeAssistant:
|
|||
self.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE)
|
||||
try:
|
||||
async with self.timeout.async_timeout(FINAL_WRITE_STAGE_SHUTDOWN_TIMEOUT):
|
||||
await self.async_block_till_done(wait_periodic_tasks=False)
|
||||
await self.async_block_till_done()
|
||||
except TimeoutError:
|
||||
_LOGGER.warning(
|
||||
"Timed out waiting for final writes to complete, the shutdown will"
|
||||
|
@ -1111,7 +1069,7 @@ class HomeAssistant:
|
|||
|
||||
try:
|
||||
async with self.timeout.async_timeout(CLOSE_STAGE_SHUTDOWN_TIMEOUT):
|
||||
await self.async_block_till_done(wait_periodic_tasks=False)
|
||||
await self.async_block_till_done()
|
||||
except TimeoutError:
|
||||
_LOGGER.warning(
|
||||
"Timed out waiting for close event to be processed, the shutdown will"
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue