Run coroutines as eager tasks in async_run_hass_job (#111683)

* Run coroutines as eager tasks in async_run_hass_job

Note that this does not change async_add_hass_job

Do not merge this. For test run only

* Phase out periodic tasks

* false by default or some tests will block forever, will need to fix each one manually

* kwarg works

* kwarg works

* kwarg works

* fixes

* fix more tests

* fix more tests

* fix lifx

* opensky

* pvpc_hourly_pricing

* adjust more

* adjust more

* smarttub

* adjust more

* adjust more

* adjust more

* adjust more

* adjust

* no eager executor

* zha

* qnap_qsw

* fix more

* fix fix

* docs

* its a wrapper now

* add more coverage

* coverage

* cover all combos

* more fixes

* more fixes

* more fixes

* remaining issues are legit bugs in tests

* make tplink test more predictable

* more fixes

* feedreader

* grind out some more

* make test race safe

* limit first scope to triggers

* one more

* Start tasks eagerly in for async_at_start(ed)

A few of these can avoid being scheduled on the loop
during startup

* fix cloud

* Revert "fix cloud"

This reverts commit 5eb3ce695d.

* fix test to do what start does

* flip flag

* flip flag

* Fix here_travel_time creating many refresh requests at startup

- Each entity would try to refresh the coordinator which
  created many tasks. Move the refresh to a single
  async_at_started

- The tests fired the EVENT_HOMEASSISTANT_START event
  but the code used async_at_started which only worked
  because the tests did not set CoreState to not_running

* fix azure

* remove kw

* remove kw

* rip

* cover

* more rips

* more rips

* more rips
This commit is contained in:
J. Nick Koston 2024-03-11 14:05:08 -10:00 committed by GitHub
parent 53c3e27ed9
commit 620433a79d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 40 additions and 49 deletions

View file

@ -158,7 +158,7 @@ class AzureEventHub:
"""
logging.getLogger("azure.eventhub").setLevel(logging.WARNING)
self._listener_remover = self.hass.bus.async_listen(
MATCH_ALL, self.async_listen
MATCH_ALL, self.async_listen, run_immediately=True
)
self._schedule_next_send()

View file

@ -144,7 +144,6 @@ async def async_attach_trigger(
}
},
event.context,
eager_start=True,
)
removes = [

View file

@ -56,7 +56,6 @@ async def async_attach_trigger(
"description": "Home Assistant starting",
}
},
eager_start=True,
)
return lambda: None

View file

@ -187,7 +187,6 @@ async def async_attach_trigger(
}
},
to_s.context,
eager_start=True,
)
@callback

View file

@ -184,7 +184,6 @@ async def async_attach_trigger(
}
},
event.context,
eager_start=True,
)
if not time_delta:

View file

@ -76,7 +76,6 @@ async def async_attach_trigger(
"entity_id": entity_id,
}
},
eager_start=True,
)
@callback

View file

@ -93,7 +93,6 @@ async def async_attach_trigger(
"description": "time pattern",
}
},
eager_start=True,
)
return async_track_time_change(

View file

@ -232,7 +232,7 @@ def _async_process_callbacks(
for callback in callbacks:
try:
hass.async_run_hass_job(
callback, discovery_info, ssdp_change, eager_start=True, background=True
callback, discovery_info, ssdp_change, background=True
)
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Failed to callback info: %s", discovery_info)

View file

@ -764,7 +764,6 @@ class HomeAssistant:
self,
hassjob: HassJob[..., Coroutine[Any, Any, _R]],
*args: Any,
eager_start: bool = False,
background: bool = False,
) -> asyncio.Future[_R] | None:
...
@ -775,7 +774,6 @@ class HomeAssistant:
self,
hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R],
*args: Any,
eager_start: bool = False,
background: bool = False,
) -> asyncio.Future[_R] | None:
...
@ -785,14 +783,12 @@ class HomeAssistant:
self,
hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R],
*args: Any,
eager_start: bool = False,
background: bool = False,
) -> asyncio.Future[_R] | None:
"""Run a HassJob from within the event loop.
This method must be run in the event loop.
If eager_start is True, coroutine functions will be scheduled eagerly.
If background is True, the task will created as a background task.
hassjob: HassJob
@ -809,7 +805,7 @@ class HomeAssistant:
return None
return self.async_add_hass_job(
hassjob, *args, eager_start=eager_start, background=background
hassjob, *args, eager_start=True, background=background
)
@overload
@ -847,7 +843,7 @@ class HomeAssistant:
args: parameters for method to call.
"""
if asyncio.iscoroutine(target):
return self.async_create_task(target)
return self.async_create_task(target, eager_start=True)
# This code path is performance sensitive and uses
# if TYPE_CHECKING to avoid the overhead of constructing
@ -855,7 +851,7 @@ class HomeAssistant:
# https://github.com/home-assistant/core/pull/71960
if TYPE_CHECKING:
target = cast(Callable[..., Coroutine[Any, Any, _R] | _R], target)
return self.async_run_hass_job(HassJob(target), *args, eager_start=True)
return self.async_run_hass_job(HassJob(target), *args)
def block_till_done(self) -> None:
"""Block until all pending work is done."""
@ -1369,7 +1365,7 @@ class EventBus:
continue
if run_immediately:
try:
self._hass.async_run_hass_job(job, event, eager_start=True)
self._hass.async_run_hass_job(job, event)
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Error running job: %s", job)
else:

View file

@ -109,7 +109,7 @@ class Debouncer(Generic[_R_co]):
assert self._job is not None
try:
if task := self.hass.async_run_hass_job(self._job, eager_start=True):
if task := self.hass.async_run_hass_job(self._job):
await task
finally:
self._schedule_timer()
@ -130,7 +130,7 @@ class Debouncer(Generic[_R_co]):
return
try:
if task := self.hass.async_run_hass_job(self._job, eager_start=True):
if task := self.hass.async_run_hass_job(self._job):
await task
except Exception: # pylint: disable=broad-except
self.logger.exception("Unexpected exception from %s", self.function)

View file

@ -50,9 +50,7 @@ def async_listen(
@core.callback
def _async_discovery_event_listener(discovered: DiscoveryDict) -> None:
"""Listen for discovery events."""
hass.async_run_hass_job(
job, discovered["service"], discovered["discovered"], eager_start=True
)
hass.async_run_hass_job(job, discovered["service"], discovered["discovered"])
async_dispatcher_connect(
hass,
@ -115,9 +113,7 @@ def async_listen_platform(
"""Listen for platform discovery events."""
if not (platform := discovered["platform"]):
return
hass.async_run_hass_job(
job, platform, discovered.get("discovered"), eager_start=True
)
hass.async_run_hass_job(job, platform, discovered.get("discovered"))
return async_dispatcher_connect(
hass,

View file

@ -225,4 +225,4 @@ def async_dispatcher_send(
if job is None:
job = _generate_job(signal, target)
target_list[target] = job
hass.async_run_hass_job(job, *args, eager_start=True)
hass.async_run_hass_job(job, *args)

View file

@ -328,7 +328,7 @@ def _async_dispatch_entity_id_event(
return
for job in callbacks_list.copy():
try:
hass.async_run_hass_job(job, event, eager_start=True)
hass.async_run_hass_job(job, event)
except Exception: # pylint: disable=broad-except
_LOGGER.exception(
"Error while dispatching event for %s to %s",
@ -1599,7 +1599,7 @@ class _TrackTimeInterval:
self._track_job,
hass.loop.time() + self.seconds,
)
hass.async_run_hass_job(self._run_job, now, eager_start=True, background=True)
hass.async_run_hass_job(self._run_job, now, background=True)
@callback
def async_cancel(self) -> None:
@ -1684,7 +1684,7 @@ class SunListener:
"""Handle solar event."""
self._unsub_sun = None
self._listen_next_sun_event()
self.hass.async_run_hass_job(self.job, eager_start=True, background=True)
self.hass.async_run_hass_job(self.job, background=True)
@callback
def _handle_config_event(self, _event: Any) -> None:
@ -1770,9 +1770,7 @@ class _TrackUTCTimeChange:
# time when the timer was scheduled
utc_now = time_tracker_utcnow()
localized_now = dt_util.as_local(utc_now) if self.local else utc_now
hass.async_run_hass_job(
self.job, localized_now, eager_start=True, background=True
)
hass.async_run_hass_job(self.job, localized_now, background=True)
if TYPE_CHECKING:
assert self._pattern_time_change_listener_job is not None
self._cancel_callback = async_track_point_in_utc_time(

View file

@ -140,7 +140,6 @@ def _process_integration_platforms(
hass,
integration.domain,
platform,
eager_start=True,
)
)
]
@ -250,7 +249,7 @@ async def _async_process_integration_platforms(
continue
if future := hass.async_run_hass_job(
process_job, hass, integration.domain, platform, eager_start=True
process_job, hass, integration.domain, platform
):
futures.append(future)

View file

@ -969,9 +969,9 @@ async def _handle_entity_call(
partial(getattr(entity, func), **data), # type: ignore[arg-type]
job_type=entity.get_hassjob_type(func),
)
task = hass.async_run_hass_job(job, eager_start=True)
task = hass.async_run_hass_job(job)
else:
task = hass.async_run_hass_job(func, entity, data, eager_start=True)
task = hass.async_run_hass_job(func, entity, data)
# Guard because callback functions do not return a task when passed to
# async_run_job.
@ -1006,7 +1006,7 @@ async def _async_admin_handler(
if not user.is_admin:
raise Unauthorized(context=call.context)
result = hass.async_run_hass_job(service_job, call, eager_start=True)
result = hass.async_run_hass_job(service_job, call)
if result is not None:
await result

View file

@ -30,7 +30,7 @@ def _async_at_core_state(
"""
at_start_job = HassJob(at_start_cb)
if check_state(hass):
hass.async_run_hass_job(at_start_job, hass, eager_start=True)
hass.async_run_hass_job(at_start_job, hass)
return lambda: None
unsub: None | CALLBACK_TYPE = None
@ -38,7 +38,7 @@ def _async_at_core_state(
@callback
def _matched_event(event: Event) -> None:
"""Call the callback when Home Assistant started."""
hass.async_run_hass_job(at_start_job, hass, eager_start=True)
hass.async_run_hass_job(at_start_job, hass)
nonlocal unsub
unsub = None

View file

@ -123,9 +123,7 @@ async def test_async_run_hass_job_eager_start_coro_suspends(
async def job_that_suspends():
await asyncio.sleep(0)
task = hass.async_run_hass_job(
ha.HassJob(ha.callback(job_that_suspends)), eager_start=True
)
task = hass.async_run_hass_job(ha.HassJob(ha.callback(job_that_suspends)))
assert not task.done()
assert task in hass._tasks
await task
@ -169,7 +167,7 @@ async def test_async_add_hass_job_eager_background(hass: HomeAssistant) -> None:
await asyncio.sleep(0)
task = hass.async_add_hass_job(
ha.HassJob(ha.callback(job_that_suspends)), eager_start=True, background=True
ha.HassJob(ha.callback(job_that_suspends)), background=True
)
assert not task.done()
assert task in hass._background_tasks
@ -184,7 +182,7 @@ async def test_async_run_hass_job_eager_background(hass: HomeAssistant) -> None:
await asyncio.sleep(0)
task = hass.async_run_hass_job(
ha.HassJob(ha.callback(job_that_suspends)), eager_start=True, background=True
ha.HassJob(ha.callback(job_that_suspends)), background=True
)
assert not task.done()
assert task in hass._background_tasks
@ -200,7 +198,6 @@ async def test_async_run_hass_job_background_synchronous(hass: HomeAssistant) ->
task = hass.async_run_hass_job(
ha.HassJob(ha.callback(job_that_does_not_suspends)),
eager_start=True,
background=True,
)
assert task.done()
@ -217,7 +214,6 @@ async def test_async_run_hass_job_synchronous(hass: HomeAssistant) -> None:
task = hass.async_run_hass_job(
ha.HassJob(ha.callback(job_that_does_not_suspends)),
eager_start=True,
background=False,
)
assert task.done()
@ -393,9 +389,7 @@ async def test_async_run_eager_hass_job_calls_callback() -> None:
asyncio.get_running_loop() # ensure we are in the event loop
calls.append(1)
ha.HomeAssistant.async_run_hass_job(
hass, ha.HassJob(ha.callback(job)), eager_start=True
)
ha.HomeAssistant.async_run_hass_job(hass, ha.HassJob(ha.callback(job)))
assert len(calls) == 1
@ -406,7 +400,7 @@ async def test_async_run_eager_hass_job_calls_coro_function() -> None:
async def job():
pass
ha.HomeAssistant.async_run_hass_job(hass, ha.HassJob(job), eager_start=True)
ha.HomeAssistant.async_run_hass_job(hass, ha.HassJob(job))
assert len(hass.async_add_hass_job.mock_calls) == 1
@ -2145,6 +2139,20 @@ async def test_async_run_job_starts_tasks_eagerly(hass: HomeAssistant) -> None:
await task
async def test_async_run_job_starts_coro_eagerly(hass: HomeAssistant) -> None:
"""Test async_run_job starts coros eagerly."""
runs = []
async def _test():
runs.append(True)
task = hass.async_run_job(_test())
# No call to hass.async_block_till_done to ensure the task is run eagerly
assert len(runs) == 1
assert task.done()
await task
def test_valid_entity_id() -> None:
"""Test valid entity ID."""
for invalid in [