Refactor ESPHome camera to avoid creating tasks (#95818)
This commit is contained in:
parent
3d064b7d6b
commit
e39f023e3f
2 changed files with 36 additions and 29 deletions
|
@ -2,6 +2,8 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
from collections.abc import Callable, Coroutine
|
||||||
|
from functools import partial
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from aioesphomeapi import CameraInfo, CameraState
|
from aioesphomeapi import CameraInfo, CameraState
|
||||||
|
@ -40,48 +42,56 @@ class EsphomeCamera(Camera, EsphomeEntity[CameraInfo, CameraState]):
|
||||||
"""Initialize."""
|
"""Initialize."""
|
||||||
Camera.__init__(self)
|
Camera.__init__(self)
|
||||||
EsphomeEntity.__init__(self, *args, **kwargs)
|
EsphomeEntity.__init__(self, *args, **kwargs)
|
||||||
self._image_cond = asyncio.Condition()
|
self._loop = asyncio.get_running_loop()
|
||||||
|
self._image_futures: list[asyncio.Future[bool | None]] = []
|
||||||
|
|
||||||
|
@callback
|
||||||
|
def _set_futures(self, result: bool) -> None:
|
||||||
|
"""Set futures to done."""
|
||||||
|
for future in self._image_futures:
|
||||||
|
if not future.done():
|
||||||
|
future.set_result(result)
|
||||||
|
self._image_futures.clear()
|
||||||
|
|
||||||
|
@callback
|
||||||
|
def _on_device_update(self) -> None:
|
||||||
|
"""Handle device going available or unavailable."""
|
||||||
|
super()._on_device_update()
|
||||||
|
if not self.available:
|
||||||
|
self._set_futures(False)
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def _on_state_update(self) -> None:
|
def _on_state_update(self) -> None:
|
||||||
"""Notify listeners of new image when update arrives."""
|
"""Notify listeners of new image when update arrives."""
|
||||||
super()._on_state_update()
|
super()._on_state_update()
|
||||||
self.hass.async_create_task(self._on_state_update_coro())
|
self._set_futures(True)
|
||||||
|
|
||||||
async def _on_state_update_coro(self) -> None:
|
|
||||||
async with self._image_cond:
|
|
||||||
self._image_cond.notify_all()
|
|
||||||
|
|
||||||
async def async_camera_image(
|
async def async_camera_image(
|
||||||
self, width: int | None = None, height: int | None = None
|
self, width: int | None = None, height: int | None = None
|
||||||
) -> bytes | None:
|
) -> bytes | None:
|
||||||
"""Return single camera image bytes."""
|
"""Return single camera image bytes."""
|
||||||
if not self.available:
|
return await self._async_request_image(self._client.request_single_image)
|
||||||
return None
|
|
||||||
await self._client.request_single_image()
|
|
||||||
async with self._image_cond:
|
|
||||||
await self._image_cond.wait()
|
|
||||||
if not self.available:
|
|
||||||
# Availability can change while waiting for 'self._image.cond'
|
|
||||||
return None # type: ignore[unreachable]
|
|
||||||
return self._state.data[:]
|
|
||||||
|
|
||||||
async def _async_camera_stream_image(self) -> bytes | None:
|
async def _async_request_image(
|
||||||
"""Return a single camera image in a stream."""
|
self, request_method: Callable[[], Coroutine[Any, Any, None]]
|
||||||
|
) -> bytes | None:
|
||||||
|
"""Wait for an image to be available and return it."""
|
||||||
if not self.available:
|
if not self.available:
|
||||||
return None
|
return None
|
||||||
await self._client.request_image_stream()
|
image_future = self._loop.create_future()
|
||||||
async with self._image_cond:
|
self._image_futures.append(image_future)
|
||||||
await self._image_cond.wait()
|
await request_method()
|
||||||
if not self.available:
|
if not await image_future:
|
||||||
# Availability can change while waiting for 'self._image.cond'
|
return None
|
||||||
return None # type: ignore[unreachable]
|
return self._state.data
|
||||||
return self._state.data[:]
|
|
||||||
|
|
||||||
async def handle_async_mjpeg_stream(
|
async def handle_async_mjpeg_stream(
|
||||||
self, request: web.Request
|
self, request: web.Request
|
||||||
) -> web.StreamResponse:
|
) -> web.StreamResponse:
|
||||||
"""Serve an HTTP MJPEG stream from the camera."""
|
"""Serve an HTTP MJPEG stream from the camera."""
|
||||||
return await camera.async_get_still_stream(
|
stream_request = partial(
|
||||||
request, self._async_camera_stream_image, camera.DEFAULT_CONTENT_TYPE, 0.0
|
self._async_request_image, self._client.request_image_stream
|
||||||
|
)
|
||||||
|
return await camera.async_get_still_stream(
|
||||||
|
request, stream_request, camera.DEFAULT_CONTENT_TYPE, 0.0
|
||||||
)
|
)
|
||||||
|
|
|
@ -149,9 +149,6 @@ async def test_camera_single_image_unavailable_during_request(
|
||||||
|
|
||||||
async def _mock_camera_image():
|
async def _mock_camera_image():
|
||||||
await mock_device.mock_disconnect(False)
|
await mock_device.mock_disconnect(False)
|
||||||
# Currently there is a bug where the camera will block
|
|
||||||
# forever if we don't send a response
|
|
||||||
mock_device.set_state(CameraState(key=1, data=SMALLEST_VALID_JPEG_BYTES))
|
|
||||||
|
|
||||||
mock_client.request_single_image = _mock_camera_image
|
mock_client.request_single_image = _mock_camera_image
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue