Compare commits

...
Sign in to create a new pull request.

4 commits

Author SHA1 Message Date
Michael Hansen
db1745aa85
Update homeassistant/util/async_.py
Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
2024-08-27 14:00:44 -05:00
Michael Hansen
0e4c21d1c2
Update homeassistant/util/async_.py
Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
2024-08-27 13:11:02 -05:00
Michael Hansen
d2a078601c
Merge branch 'dev' into synesthesiam-20240827-queue-to-iterable 2024-08-27 12:08:37 -05:00
Michael Hansen
dc83187261 Add queue_to_iterable util function 2024-08-27 11:32:37 -05:00
2 changed files with 63 additions and 1 deletions

View file

@ -5,22 +5,28 @@ from __future__ import annotations
from asyncio import (
AbstractEventLoop,
Future,
Queue,
Semaphore,
Task,
TimerHandle,
gather,
get_running_loop,
timeout as async_timeout,
)
from collections.abc import Awaitable, Callable, Coroutine
from collections.abc import AsyncIterable, Awaitable, Callable, Coroutine
import concurrent.futures
import logging
import threading
from typing import Any
from typing_extensions import TypeVar
_LOGGER = logging.getLogger(__name__)
_SHUTDOWN_RUN_CALLBACK_THREADSAFE = "_shutdown_run_callback_threadsafe"
_DataT = TypeVar("_DataT", default=Any)
def create_eager_task[_T](
coro: Coroutine[Any, Any, _T],
@ -138,3 +144,19 @@ def get_scheduled_timer_handles(loop: AbstractEventLoop) -> list[TimerHandle]:
"""Return a list of scheduled TimerHandles."""
handles: list[TimerHandle] = loop._scheduled # type: ignore[attr-defined] # noqa: SLF001
return handles
async def queue_to_iterable(
queue: Queue[_DataT | None], timeout: float | None = None
) -> AsyncIterable[_DataT]:
"""Stream items from a queue until None with an optional timeout per item."""
if timeout is None:
while (item := await queue.get()) is not None:
yield item
else:
while True:
async with async_timeout(timeout):
item = await queue.get()
if item is None:
break
yield item

View file

@ -213,3 +213,43 @@ async def test_get_scheduled_timer_handles(hass: HomeAssistant) -> None:
timer_handle.cancel()
timer_handle2.cancel()
timer_handle3.cancel()
async def test_queue_to_iterable() -> None:
"""Test queue_to_iterable."""
queue: asyncio.Queue[int | None] = asyncio.Queue()
expected_items = list(range(10))
for i in expected_items:
await queue.put(i)
# Will terminate the stream
await queue.put(None)
actual_items = [item async for item in hasync.queue_to_iterable(queue)]
assert expected_items == actual_items
# Check timeout
assert queue.empty()
# Time out on first item
async with asyncio.timeout(1):
with pytest.raises(asyncio.TimeoutError): # noqa: PT012
# Should time out very quickly
async for _item in hasync.queue_to_iterable(queue, timeout=0.01):
await asyncio.sleep(1)
# Check timeout on second item
assert queue.empty()
await queue.put(12345)
# Time out on second item
async with asyncio.timeout(1):
with pytest.raises(asyncio.TimeoutError): # noqa: PT012
# Should time out very quickly
async for item in hasync.queue_to_iterable(queue, timeout=0.01):
if item != 12345:
await asyncio.sleep(1)
assert queue.empty()