Await in sequence when we know we will be blocked by the entity platform semaphore (#93649)

* Defer creating tasks we know are going to wait on the entity platform semaphore

When looking at whats going on with aiomonitor-ng, I noticed
we end up creating a lot of tasks that block waiting for the
executor because of the entity platform parallel_updates semaphore.

When we know the tasks are going to block we now await them
in sequence to avoid feeding the loop a herd of tasks that
will block on the semaphore

* change during iteration fix

* change during iteration fix

* cleanup

* cleanup

* fix vizio test
This commit is contained in:
J. Nick Koston 2023-05-27 18:58:02 -05:00 committed by GitHub
parent a56b5994e5
commit b966ff7a3d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 6 deletions

View file

@ -136,6 +136,7 @@ class EntityPlatform:
self._process_updates: asyncio.Lock | None = None
self.parallel_updates: asyncio.Semaphore | None = None
self._update_in_parallel: bool = True
# Platform is None for the EntityComponent "catch-all" EntityPlatform
# which powers entity_component.add_entities
@ -186,6 +187,7 @@ class EntityPlatform:
if parallel_updates is not None:
self.parallel_updates = asyncio.Semaphore(parallel_updates)
self._update_in_parallel = parallel_updates != 1
return self.parallel_updates
@ -844,13 +846,20 @@ class EntityPlatform:
return
async with self._process_updates:
tasks: list[Coroutine[Any, Any, None]] = []
for entity in self.entities.values():
if not entity.should_poll:
continue
tasks.append(entity.async_update_ha_state(True))
if self._update_in_parallel or len(self.entities) <= 1:
# If we know are going to update sequentially, we want to update
# to avoid scheduling the coroutines as tasks that will we know
# are going to wait on the semaphore lock.
for entity in list(self.entities.values()):
if entity.should_poll and entity.hass:
await entity.async_update_ha_state(True)
return
if tasks:
if tasks := [
entity.async_update_ha_state(True)
for entity in self.entities.values()
if entity.should_poll
]:
await asyncio.gather(*tasks)

View file

@ -746,6 +746,8 @@ async def test_apps_update(
"homeassistant.components.vizio.gen_apps_list_from_url",
return_value=APP_LIST,
):
async_fire_time_changed(hass, dt_util.now() + timedelta(days=2))
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.now() + timedelta(days=2))
await hass.async_block_till_done()
# Check source list, remove TV inputs, and verify that the integration is