Chunk large logbook queries and add an end_time to the api so we stop sending events (#72351)

This commit is contained in:
J. Nick Koston 2022-05-23 17:40:00 -05:00 committed by GitHub
parent 52808562ab
commit 9d95b9ab05
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 483 additions and 65 deletions

View file

@ -3,6 +3,7 @@ from __future__ import annotations
import asyncio
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime as dt, timedelta
import logging
from typing import Any
@ -15,6 +16,7 @@ from homeassistant.components.websocket_api import messages
from homeassistant.components.websocket_api.connection import ActiveConnection
from homeassistant.components.websocket_api.const import JSON_DUMP
from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, callback
from homeassistant.helpers.event import async_track_point_in_utc_time
import homeassistant.util.dt as dt_util
from .helpers import (
@ -26,12 +28,26 @@ from .models import async_event_to_row
from .processor import EventProcessor
MAX_PENDING_LOGBOOK_EVENTS = 2048
EVENT_COALESCE_TIME = 0.5
EVENT_COALESCE_TIME = 0.35
MAX_RECORDER_WAIT = 10
# minimum size that we will split the query
BIG_QUERY_HOURS = 25
# how many hours to deliver in the first chunk when we split the query
BIG_QUERY_RECENT_HOURS = 24
_LOGGER = logging.getLogger(__name__)
@dataclass
class LogbookLiveStream:
"""Track a logbook live stream."""
stream_queue: asyncio.Queue[Event]
subscriptions: list[CALLBACK_TYPE]
end_time_unsub: CALLBACK_TYPE | None = None
task: asyncio.Task | None = None
@callback
def async_setup(hass: HomeAssistant) -> None:
"""Set up the logbook websocket API."""
@ -39,6 +55,94 @@ def async_setup(hass: HomeAssistant) -> None:
websocket_api.async_register_command(hass, ws_event_stream)
async def _async_wait_for_recorder_sync(hass: HomeAssistant) -> None:
"""Wait for the recorder to sync."""
try:
await asyncio.wait_for(
get_instance(hass).async_block_till_done(), MAX_RECORDER_WAIT
)
except asyncio.TimeoutError:
_LOGGER.debug(
"Recorder is behind more than %s seconds, starting live stream; Some results may be missing"
)
async def _async_send_historical_events(
hass: HomeAssistant,
connection: ActiveConnection,
msg_id: int,
start_time: dt,
end_time: dt,
formatter: Callable[[int, Any], dict[str, Any]],
event_processor: EventProcessor,
) -> dt | None:
"""Select historical data from the database and deliver it to the websocket.
If the query is considered a big query we will split the request into
two chunks so that they get the recent events first and the select
that is expected to take a long time comes in after to ensure
they are not stuck at a loading screen and can start looking at
the data right away.
This function returns the time of the most recent event we sent to the
websocket.
"""
is_big_query = (
not event_processor.entity_ids
and not event_processor.device_ids
and ((end_time - start_time) > timedelta(hours=BIG_QUERY_HOURS))
)
if not is_big_query:
message, last_event_time = await _async_get_ws_formatted_events(
hass,
msg_id,
start_time,
end_time,
formatter,
event_processor,
)
# If there is no last_time, there are no historical
# results, but we still send an empty message so
# consumers of the api know their request was
# answered but there were no results
connection.send_message(message)
return last_event_time
# This is a big query so we deliver
# the first three hours and then
# we fetch the old data
recent_query_start = end_time - timedelta(hours=BIG_QUERY_RECENT_HOURS)
recent_message, recent_query_last_event_time = await _async_get_ws_formatted_events(
hass,
msg_id,
recent_query_start,
end_time,
formatter,
event_processor,
)
if recent_query_last_event_time:
connection.send_message(recent_message)
older_message, older_query_last_event_time = await _async_get_ws_formatted_events(
hass,
msg_id,
start_time,
recent_query_start,
formatter,
event_processor,
)
# If there is no last_time, there are no historical
# results, but we still send an empty message so
# consumers of the api know their request was
# answered but there were no results
if older_query_last_event_time or not recent_query_last_event_time:
connection.send_message(older_message)
# Returns the time of the newest event
return recent_query_last_event_time or older_query_last_event_time
async def _async_get_ws_formatted_events(
hass: HomeAssistant,
msg_id: int,
@ -75,14 +179,13 @@ def _ws_formatted_get_events(
async def _async_events_consumer(
setup_complete_future: asyncio.Future[dt],
subscriptions_setup_complete_time: dt,
connection: ActiveConnection,
msg_id: int,
stream_queue: asyncio.Queue[Event],
event_processor: EventProcessor,
) -> None:
"""Stream events from the queue."""
subscriptions_setup_complete_time = await setup_complete_future
event_processor.switch_to_live()
while True:
@ -116,6 +219,7 @@ async def _async_events_consumer(
{
vol.Required("type"): "logbook/event_stream",
vol.Required("start_time"): str,
vol.Optional("end_time"): str,
vol.Optional("entity_ids"): [str],
vol.Optional("device_ids"): [str],
}
@ -126,21 +230,32 @@ async def ws_event_stream(
) -> None:
"""Handle logbook stream events websocket command."""
start_time_str = msg["start_time"]
msg_id: int = msg["id"]
utc_now = dt_util.utcnow()
if start_time := dt_util.parse_datetime(start_time_str):
start_time = dt_util.as_utc(start_time)
if not start_time or start_time > utc_now:
connection.send_error(msg["id"], "invalid_start_time", "Invalid start_time")
connection.send_error(msg_id, "invalid_start_time", "Invalid start_time")
return
end_time_str = msg.get("end_time")
end_time: dt | None = None
if end_time_str:
if not (end_time := dt_util.parse_datetime(end_time_str)):
connection.send_error(msg_id, "invalid_end_time", "Invalid end_time")
return
end_time = dt_util.as_utc(end_time)
if end_time < start_time:
connection.send_error(msg_id, "invalid_end_time", "Invalid end_time")
return
device_ids = msg.get("device_ids")
entity_ids = msg.get("entity_ids")
if entity_ids:
entity_ids = async_filter_entities(hass, entity_ids)
event_types = async_determine_event_types(hass, entity_ids, device_ids)
event_processor = EventProcessor(
hass,
event_types,
@ -151,26 +266,43 @@ async def ws_event_stream(
include_entity_name=False,
)
stream_queue: asyncio.Queue[Event] = asyncio.Queue(MAX_PENDING_LOGBOOK_EVENTS)
subscriptions: list[CALLBACK_TYPE] = []
setup_complete_future: asyncio.Future[dt] = asyncio.Future()
task = asyncio.create_task(
_async_events_consumer(
setup_complete_future,
if end_time and end_time <= utc_now:
# Not live stream but we it might be a big query
connection.subscriptions[msg_id] = callback(lambda: None)
connection.send_result(msg_id)
# Fetch everything from history
await _async_send_historical_events(
hass,
connection,
msg["id"],
stream_queue,
msg_id,
start_time,
end_time,
messages.event_message,
event_processor,
)
return
subscriptions: list[CALLBACK_TYPE] = []
stream_queue: asyncio.Queue[Event] = asyncio.Queue(MAX_PENDING_LOGBOOK_EVENTS)
live_stream = LogbookLiveStream(
subscriptions=subscriptions, stream_queue=stream_queue
)
def _unsub() -> None:
@callback
def _unsub(*time: Any) -> None:
"""Unsubscribe from all events."""
for subscription in subscriptions:
subscription()
subscriptions.clear()
if task:
task.cancel()
if live_stream.task:
live_stream.task.cancel()
if live_stream.end_time_unsub:
live_stream.end_time_unsub()
if end_time:
live_stream.end_time_unsub = async_track_point_in_utc_time(
hass, _unsub, end_time
)
@callback
def _queue_or_cancel(event: Event) -> None:
@ -188,33 +320,21 @@ async def ws_event_stream(
hass, subscriptions, _queue_or_cancel, event_types, entity_ids, device_ids
)
subscriptions_setup_complete_time = dt_util.utcnow()
connection.subscriptions[msg["id"]] = _unsub
connection.send_result(msg["id"])
connection.subscriptions[msg_id] = _unsub
connection.send_result(msg_id)
# Fetch everything from history
message, last_event_time = await _async_get_ws_formatted_events(
last_event_time = await _async_send_historical_events(
hass,
msg["id"],
connection,
msg_id,
start_time,
subscriptions_setup_complete_time,
messages.event_message,
event_processor,
)
# If there is no last_time there are no historical
# results, but we still send an empty message so
# consumers of the api know their request was
# answered but there were no results
connection.send_message(message)
try:
await asyncio.wait_for(
get_instance(hass).async_block_till_done(), MAX_RECORDER_WAIT
)
except asyncio.TimeoutError:
_LOGGER.debug(
"Recorder is behind more than %s seconds, starting live stream; Some results may be missing"
)
if setup_complete_future.cancelled():
await _async_wait_for_recorder_sync(hass)
if not subscriptions:
# Unsubscribe happened while waiting for recorder
return
@ -235,7 +355,7 @@ async def ws_event_stream(
)
message, final_cutoff_time = await _async_get_ws_formatted_events(
hass,
msg["id"],
msg_id,
second_fetch_start_time,
subscriptions_setup_complete_time,
messages.event_message,
@ -244,9 +364,19 @@ async def ws_event_stream(
if final_cutoff_time: # Only sends results if we have them
connection.send_message(message)
if not setup_complete_future.cancelled():
if not subscriptions:
# Unsubscribe happened while waiting for formatted events
setup_complete_future.set_result(subscriptions_setup_complete_time)
return
live_stream.task = asyncio.create_task(
_async_events_consumer(
subscriptions_setup_complete_time,
connection,
msg_id,
stream_queue,
event_processor,
)
)
@websocket_api.websocket_command(