"""The image integration.""" from __future__ import annotations import asyncio import collections from contextlib import suppress from dataclasses import dataclass from datetime import datetime, timedelta import logging import os from random import SystemRandom from typing import Final, final from aiohttp import hdrs, web import httpx from propcache import cached_property import voluptuous as vol from homeassistant.components.http import KEY_AUTHENTICATED, KEY_HASS, HomeAssistantView from homeassistant.config_entries import ConfigEntry from homeassistant.const import CONTENT_TYPE_MULTIPART, EVENT_HOMEASSISTANT_STOP from homeassistant.core import ( Event, EventStateChangedData, HomeAssistant, ServiceCall, callback, ) from homeassistant.exceptions import HomeAssistantError import homeassistant.helpers.config_validation as cv from homeassistant.helpers.entity import Entity, EntityDescription from homeassistant.helpers.entity_component import EntityComponent from homeassistant.helpers.event import ( async_track_state_change_event, async_track_time_interval, ) from homeassistant.helpers.httpx_client import get_async_client from homeassistant.helpers.typing import ( UNDEFINED, ConfigType, UndefinedType, VolDictType, ) from .const import DATA_COMPONENT, DOMAIN, IMAGE_TIMEOUT _LOGGER = logging.getLogger(__name__) SERVICE_SNAPSHOT: Final = "snapshot" ENTITY_ID_FORMAT: Final = DOMAIN + ".{}" PLATFORM_SCHEMA = cv.PLATFORM_SCHEMA PLATFORM_SCHEMA_BASE = cv.PLATFORM_SCHEMA_BASE SCAN_INTERVAL: Final = timedelta(seconds=30) ATTR_FILENAME: Final = "filename" DEFAULT_CONTENT_TYPE: Final = "image/jpeg" ENTITY_IMAGE_URL: Final = "/api/image_proxy/{0}?token={1}" TOKEN_CHANGE_INTERVAL: Final = timedelta(minutes=5) _RND: Final = SystemRandom() GET_IMAGE_TIMEOUT: Final = 10 FRAME_BOUNDARY = "frame-boundary" FRAME_SEPARATOR = bytes(f"\r\n--{FRAME_BOUNDARY}\r\n", "utf-8") LAST_FRAME_MARKER = bytes(f"\r\n--{FRAME_BOUNDARY}--\r\n", "utf-8") IMAGE_SERVICE_SNAPSHOT: VolDictType = {vol.Required(ATTR_FILENAME): cv.string} class ImageEntityDescription(EntityDescription, frozen_or_thawed=True): """A class that describes image entities.""" @dataclass class Image: """Represent an image.""" content_type: str content: bytes class ImageContentTypeError(HomeAssistantError): """Error with the content type while loading an image.""" def valid_image_content_type(content_type: str | None) -> str: """Validate the assigned content type is one of an image.""" if content_type is None or content_type.split("/", 1)[0].lower() != "image": raise ImageContentTypeError return content_type async def _async_get_image(image_entity: ImageEntity, timeout: int) -> Image: """Fetch image from an image entity.""" with suppress(asyncio.CancelledError, TimeoutError, ImageContentTypeError): async with asyncio.timeout(timeout): if image_bytes := await image_entity.async_image(): content_type = valid_image_content_type(image_entity.content_type) return Image(content_type, image_bytes) raise HomeAssistantError("Unable to get image") async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up the image component.""" component = hass.data[DATA_COMPONENT] = EntityComponent[ImageEntity]( _LOGGER, DOMAIN, hass, SCAN_INTERVAL ) hass.http.register_view(ImageView(component)) hass.http.register_view(ImageStreamView(component)) await component.async_setup(config) @callback def update_tokens(time: datetime) -> None: """Update tokens of the entities.""" for entity in component.entities: entity.async_update_token() entity.async_write_ha_state() unsub = async_track_time_interval( hass, update_tokens, TOKEN_CHANGE_INTERVAL, name="Image update tokens" ) @callback def unsub_track_time_interval(_event: Event) -> None: """Unsubscribe track time interval timer.""" unsub() hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, unsub_track_time_interval) component.async_register_entity_service( SERVICE_SNAPSHOT, IMAGE_SERVICE_SNAPSHOT, async_handle_snapshot_service ) return True async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Set up a config entry.""" return await hass.data[DATA_COMPONENT].async_setup_entry(entry) async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Unload a config entry.""" return await hass.data[DATA_COMPONENT].async_unload_entry(entry) CACHED_PROPERTIES_WITH_ATTR_ = { "content_type", "image_last_updated", "image_url", } class ImageEntity(Entity, cached_properties=CACHED_PROPERTIES_WITH_ATTR_): """The base class for image entities.""" _entity_component_unrecorded_attributes = frozenset( {"access_token", "entity_picture"} ) # Entity Properties _attr_content_type: str = DEFAULT_CONTENT_TYPE _attr_image_last_updated: datetime | None = None _attr_image_url: str | None | UndefinedType = UNDEFINED _attr_should_poll: bool = False # No need to poll image entities _attr_state: None = None # State is determined by last_updated _cached_image: Image | None = None def __init__(self, hass: HomeAssistant, verify_ssl: bool = False) -> None: """Initialize an image entity.""" self._client = get_async_client(hass, verify_ssl=verify_ssl) self.access_tokens: collections.deque = collections.deque([], 2) self.async_update_token() @cached_property def content_type(self) -> str: """Image content type.""" return self._attr_content_type @property def entity_picture(self) -> str | None: """Return a link to the image as entity picture.""" if self._attr_entity_picture is not None: return self._attr_entity_picture return ENTITY_IMAGE_URL.format(self.entity_id, self.access_tokens[-1]) @cached_property def image_last_updated(self) -> datetime | None: """Time the image was last updated.""" return self._attr_image_last_updated @cached_property def image_url(self) -> str | None | UndefinedType: """Return URL of image.""" return self._attr_image_url def image(self) -> bytes | None: """Return bytes of image.""" raise NotImplementedError async def _fetch_url(self, url: str) -> httpx.Response | None: """Fetch a URL.""" try: response = await self._client.get( url, timeout=GET_IMAGE_TIMEOUT, follow_redirects=True ) response.raise_for_status() except httpx.TimeoutException: _LOGGER.error("%s: Timeout getting image from %s", self.entity_id, url) return None except (httpx.RequestError, httpx.HTTPStatusError) as err: _LOGGER.error( "%s: Error getting new image from %s: %s", self.entity_id, url, err, ) return None return response async def _async_load_image_from_url(self, url: str) -> Image | None: """Load an image by url.""" if response := await self._fetch_url(url): content_type = response.headers.get("content-type") try: return Image( content=response.content, content_type=valid_image_content_type(content_type), ) except ImageContentTypeError: _LOGGER.error( "%s: Image from %s has invalid content type: %s", self.entity_id, url, content_type, ) return None return None async def async_image(self) -> bytes | None: """Return bytes of image.""" if self._cached_image: return self._cached_image.content if (url := self.image_url) is not UNDEFINED: if not url or (image := await self._async_load_image_from_url(url)) is None: return None self._cached_image = image self._attr_content_type = image.content_type return image.content return await self.hass.async_add_executor_job(self.image) @property @final def state(self) -> str | None: """Return the state.""" if self.image_last_updated is None: return None return self.image_last_updated.isoformat() @final @property def state_attributes(self) -> dict[str, str | None]: """Return the state attributes.""" return {"access_token": self.access_tokens[-1]} @callback def async_update_token(self) -> None: """Update the used token.""" self.access_tokens.append(hex(_RND.getrandbits(256))[2:]) class ImageView(HomeAssistantView): """View to serve an image.""" name = "api:image:image" requires_auth = False url = "/api/image_proxy/{entity_id}" def __init__(self, component: EntityComponent[ImageEntity]) -> None: """Initialize an image view.""" self.component = component async def get(self, request: web.Request, entity_id: str) -> web.StreamResponse: """Start a GET request.""" if (image_entity := self.component.get_entity(entity_id)) is None: raise web.HTTPNotFound authenticated = ( request[KEY_AUTHENTICATED] or request.query.get("token") in image_entity.access_tokens ) if not authenticated: # Attempt with invalid bearer token, raise unauthorized # so ban middleware can handle it. if hdrs.AUTHORIZATION in request.headers: raise web.HTTPUnauthorized # Invalid sigAuth or image entity access token raise web.HTTPForbidden return await self.handle(request, image_entity) async def handle( self, request: web.Request, image_entity: ImageEntity ) -> web.StreamResponse: """Serve image.""" try: image = await _async_get_image(image_entity, IMAGE_TIMEOUT) except (HomeAssistantError, ValueError) as ex: raise web.HTTPInternalServerError from ex return web.Response(body=image.content, content_type=image.content_type) async def async_get_still_stream( request: web.Request, image_entity: ImageEntity, ) -> web.StreamResponse: """Generate an HTTP multipart stream from the Image.""" response = web.StreamResponse() response.content_type = CONTENT_TYPE_MULTIPART.format(FRAME_BOUNDARY) await response.prepare(request) async def _write_frame() -> bool: img_bytes = await image_entity.async_image() if img_bytes is None: await response.write(LAST_FRAME_MARKER) return False frame = bytearray(FRAME_SEPARATOR) header = bytes( f"Content-Type: {image_entity.content_type}\r\n" f"Content-Length: {len(img_bytes)}\r\n\r\n", "utf-8", ) frame.extend(header) frame.extend(img_bytes) # Chrome shows the n-1 frame so send the frame twice # https://issues.chromium.org/issues/41199053 # https://issues.chromium.org/issues/40791855 # While this results in additional bandwidth usage, # given the low frequency of image updates, it is acceptable. frame.extend(frame) await response.write(frame) return True event = asyncio.Event() timed_out = False @callback def _async_image_state_update(_event: Event[EventStateChangedData]) -> None: """Write image to stream.""" event.set() @callback def _async_timeout_reached() -> None: """Handle timeout.""" nonlocal timed_out timed_out = True event.set() hass = request.app[KEY_HASS] loop = hass.loop remove = async_track_state_change_event( hass, image_entity.entity_id, _async_image_state_update, ) timeout_handle = None try: while True: if not await _write_frame(): return response # Ensure that an image is sent at least every 55 seconds # Otherwise some devices go blank timeout_handle = loop.call_later(55, _async_timeout_reached) await event.wait() event.clear() if not timed_out: timeout_handle.cancel() timed_out = False finally: if timeout_handle: timeout_handle.cancel() remove() class ImageStreamView(ImageView): """Image View to serve an multipart stream.""" url = "/api/image_proxy_stream/{entity_id}" name = "api:image:stream" async def handle( self, request: web.Request, image_entity: ImageEntity ) -> web.StreamResponse: """Serve image stream.""" return await async_get_still_stream(request, image_entity) async def async_handle_snapshot_service( image: ImageEntity, service_call: ServiceCall ) -> None: """Handle snapshot services calls.""" hass = image.hass snapshot_file: str = service_call.data[ATTR_FILENAME] # check if we allow to access to that file if not hass.config.is_allowed_path(snapshot_file): raise HomeAssistantError( f"Cannot write `{snapshot_file}`, no access to path; `allowlist_external_dirs` may need to be adjusted in `configuration.yaml`" ) async with asyncio.timeout(IMAGE_TIMEOUT): image_data = await image.async_image() if image_data is None: return def _write_image(to_file: str, image_data: bytes) -> None: """Executor helper to write image.""" os.makedirs(os.path.dirname(to_file), exist_ok=True) with open(to_file, "wb") as img_file: img_file.write(image_data) try: await hass.async_add_executor_job(_write_image, snapshot_file, image_data) except OSError as err: raise HomeAssistantError("Can't write image to file") from err