Rename gather_with_concurrency
to gather_with_limited_concurrency
(#102241)
* Rename gather_with_concurrency to gather_with_limited_concurrency * Update test
This commit is contained in:
parent
4498c2e8c4
commit
dff18b4a16
7 changed files with 17 additions and 15 deletions
|
@ -7,7 +7,7 @@ from typing import Any, cast
|
||||||
from aiohttp import ClientResponseError
|
from aiohttp import ClientResponseError
|
||||||
from bond_async import Action, Bond, BondType
|
from bond_async import Action, Bond, BondType
|
||||||
|
|
||||||
from homeassistant.util.async_ import gather_with_concurrency
|
from homeassistant.util.async_ import gather_with_limited_concurrency
|
||||||
|
|
||||||
from .const import BRIDGE_MAKE
|
from .const import BRIDGE_MAKE
|
||||||
|
|
||||||
|
@ -163,7 +163,7 @@ class BondHub:
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
responses = await gather_with_concurrency(MAX_REQUESTS, *tasks)
|
responses = await gather_with_limited_concurrency(MAX_REQUESTS, *tasks)
|
||||||
response_idx = 0
|
response_idx = 0
|
||||||
for device_id in setup_device_ids:
|
for device_id in setup_device_ids:
|
||||||
self._devices.append(
|
self._devices.append(
|
||||||
|
|
|
@ -22,7 +22,7 @@ import homeassistant.helpers.config_validation as cv
|
||||||
from homeassistant.helpers.event import async_track_point_in_utc_time
|
from homeassistant.helpers.event import async_track_point_in_utc_time
|
||||||
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
||||||
from homeassistant.util import dt as dt_util
|
from homeassistant.util import dt as dt_util
|
||||||
from homeassistant.util.async_ import gather_with_concurrency
|
from homeassistant.util.async_ import gather_with_limited_concurrency
|
||||||
from homeassistant.util.process import kill_subprocess
|
from homeassistant.util.process import kill_subprocess
|
||||||
|
|
||||||
from .const import DOMAIN, ICMP_TIMEOUT, PING_ATTEMPTS_COUNT, PING_PRIVS, PING_TIMEOUT
|
from .const import DOMAIN, ICMP_TIMEOUT, PING_ATTEMPTS_COUNT, PING_PRIVS, PING_TIMEOUT
|
||||||
|
@ -117,7 +117,7 @@ async def async_setup_scanner(
|
||||||
|
|
||||||
async def async_update(now: datetime) -> None:
|
async def async_update(now: datetime) -> None:
|
||||||
"""Update all the hosts on every interval time."""
|
"""Update all the hosts on every interval time."""
|
||||||
results = await gather_with_concurrency(
|
results = await gather_with_limited_concurrency(
|
||||||
CONCURRENT_PING_LIMIT,
|
CONCURRENT_PING_LIMIT,
|
||||||
*(hass.async_add_executor_job(host.update) for host in hosts),
|
*(hass.async_add_executor_job(host.update) for host in hosts),
|
||||||
)
|
)
|
||||||
|
|
|
@ -16,7 +16,7 @@ from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
|
||||||
from homeassistant.helpers import aiohttp_client
|
from homeassistant.helpers import aiohttp_client
|
||||||
from homeassistant.helpers.entity_registry import RegistryEntry, async_migrate_entries
|
from homeassistant.helpers.entity_registry import RegistryEntry, async_migrate_entries
|
||||||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
||||||
from homeassistant.util.async_ import gather_with_concurrency
|
from homeassistant.util.async_ import gather_with_limited_concurrency
|
||||||
|
|
||||||
from .const import DOMAIN, LOGGER
|
from .const import DOMAIN, LOGGER
|
||||||
|
|
||||||
|
@ -106,7 +106,9 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||||
)
|
)
|
||||||
coordinator_init_tasks.append(coordinator.async_refresh())
|
coordinator_init_tasks.append(coordinator.async_refresh())
|
||||||
|
|
||||||
await gather_with_concurrency(DEFAULT_INIT_TASK_LIMIT, *coordinator_init_tasks)
|
await gather_with_limited_concurrency(
|
||||||
|
DEFAULT_INIT_TASK_LIMIT, *coordinator_init_tasks
|
||||||
|
)
|
||||||
hass.data.setdefault(DOMAIN, {})
|
hass.data.setdefault(DOMAIN, {})
|
||||||
hass.data[DOMAIN][entry.entry_id] = TileData(coordinators=coordinators, tiles=tiles)
|
hass.data[DOMAIN][entry.entry_id] = TileData(coordinators=coordinators, tiles=tiles)
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,7 @@ from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, callback
|
||||||
from homeassistant.helpers import config_validation as cv
|
from homeassistant.helpers import config_validation as cv
|
||||||
from homeassistant.helpers.event import async_call_later
|
from homeassistant.helpers.event import async_call_later
|
||||||
from homeassistant.helpers.typing import ConfigType
|
from homeassistant.helpers.typing import ConfigType
|
||||||
from homeassistant.util.async_ import gather_with_concurrency
|
from homeassistant.util.async_ import gather_with_limited_concurrency
|
||||||
|
|
||||||
from .const import DOMAIN
|
from .const import DOMAIN
|
||||||
from .models import WemoConfigEntryData, WemoData, async_wemo_data
|
from .models import WemoConfigEntryData, WemoData, async_wemo_data
|
||||||
|
@ -217,7 +217,7 @@ class WemoDispatcher:
|
||||||
"""Consider a platform as loaded and dispatch any backlog of discovered devices."""
|
"""Consider a platform as loaded and dispatch any backlog of discovered devices."""
|
||||||
self._dispatch_callbacks[platform] = dispatch
|
self._dispatch_callbacks[platform] = dispatch
|
||||||
|
|
||||||
await gather_with_concurrency(
|
await gather_with_limited_concurrency(
|
||||||
MAX_CONCURRENCY,
|
MAX_CONCURRENCY,
|
||||||
*(
|
*(
|
||||||
dispatch(coordinator)
|
dispatch(coordinator)
|
||||||
|
@ -289,7 +289,7 @@ class WemoDiscovery:
|
||||||
if not self._static_config:
|
if not self._static_config:
|
||||||
return
|
return
|
||||||
_LOGGER.debug("Adding statically configured WeMo devices")
|
_LOGGER.debug("Adding statically configured WeMo devices")
|
||||||
for device in await gather_with_concurrency(
|
for device in await gather_with_limited_concurrency(
|
||||||
MAX_CONCURRENCY,
|
MAX_CONCURRENCY,
|
||||||
*(
|
*(
|
||||||
self._hass.async_add_executor_job(validate_static_config, host, port)
|
self._hass.async_add_executor_job(validate_static_config, host, port)
|
||||||
|
|
|
@ -8,7 +8,7 @@ from homeassistant.const import EVENT_HOMEASSISTANT_STARTED
|
||||||
from homeassistant.core import CoreState, Event, HomeAssistant, callback
|
from homeassistant.core import CoreState, Event, HomeAssistant, callback
|
||||||
from homeassistant.data_entry_flow import FlowResult
|
from homeassistant.data_entry_flow import FlowResult
|
||||||
from homeassistant.loader import bind_hass
|
from homeassistant.loader import bind_hass
|
||||||
from homeassistant.util.async_ import gather_with_concurrency
|
from homeassistant.util.async_ import gather_with_limited_concurrency
|
||||||
|
|
||||||
FLOW_INIT_LIMIT = 2
|
FLOW_INIT_LIMIT = 2
|
||||||
DISCOVERY_FLOW_DISPATCHER = "discovery_flow_dispatcher"
|
DISCOVERY_FLOW_DISPATCHER = "discovery_flow_dispatcher"
|
||||||
|
@ -93,7 +93,7 @@ class FlowDispatcher:
|
||||||
for flow_key, flows in pending_flows.items()
|
for flow_key, flows in pending_flows.items()
|
||||||
for flow_values in flows
|
for flow_values in flows
|
||||||
]
|
]
|
||||||
await gather_with_concurrency(
|
await gather_with_limited_concurrency(
|
||||||
FLOW_INIT_LIMIT,
|
FLOW_INIT_LIMIT,
|
||||||
*[init_coro for init_coro in init_coros if init_coro is not None],
|
*[init_coro for init_coro in init_coros if init_coro is not None],
|
||||||
)
|
)
|
||||||
|
|
|
@ -171,7 +171,7 @@ def protect_loop(func: Callable[_P, _R], strict: bool = True) -> Callable[_P, _R
|
||||||
return protected_loop_func
|
return protected_loop_func
|
||||||
|
|
||||||
|
|
||||||
async def gather_with_concurrency(
|
async def gather_with_limited_concurrency(
|
||||||
limit: int, *tasks: Any, return_exceptions: bool = False
|
limit: int, *tasks: Any, return_exceptions: bool = False
|
||||||
) -> Any:
|
) -> Any:
|
||||||
"""Wrap asyncio.gather to limit the number of concurrent tasks.
|
"""Wrap asyncio.gather to limit the number of concurrent tasks.
|
||||||
|
|
|
@ -183,8 +183,8 @@ async def test_protect_loop_debugger_sleep(caplog: pytest.LogCaptureFixture) ->
|
||||||
assert "Detected blocking call inside the event loop" not in caplog.text
|
assert "Detected blocking call inside the event loop" not in caplog.text
|
||||||
|
|
||||||
|
|
||||||
async def test_gather_with_concurrency() -> None:
|
async def test_gather_with_limited_concurrency() -> None:
|
||||||
"""Test gather_with_concurrency limits the number of running tasks."""
|
"""Test gather_with_limited_concurrency limits the number of running tasks."""
|
||||||
|
|
||||||
runs = 0
|
runs = 0
|
||||||
now_time = time.time()
|
now_time = time.time()
|
||||||
|
@ -198,7 +198,7 @@ async def test_gather_with_concurrency() -> None:
|
||||||
await asyncio.sleep(0.1)
|
await asyncio.sleep(0.1)
|
||||||
return runs
|
return runs
|
||||||
|
|
||||||
results = await hasync.gather_with_concurrency(
|
results = await hasync.gather_with_limited_concurrency(
|
||||||
2, *(_increment_runs_if_in_time() for i in range(4))
|
2, *(_increment_runs_if_in_time() for i in range(4))
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue