hass-core/homeassistant/components/logbook/websocket_api.py

"""Event parser and human readable log generator."""
from __future__ import annotations

import asyncio
from collections.abc import Callable
from datetime import datetime as dt, timedelta
import logging
from typing import Any

import voluptuous as vol

from homeassistant.components import websocket_api
from homeassistant.components.recorder import get_instance
from homeassistant.components.websocket_api import messages
from homeassistant.components.websocket_api.connection import ActiveConnection
from homeassistant.components.websocket_api.const import JSON_DUMP
from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, callback
import homeassistant.util.dt as dt_util

from .helpers import (
    async_determine_event_types,
    async_filter_entities,
    async_subscribe_events,
)
from .models import async_event_to_row
from .processor import EventProcessor
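
# Stream tuning: MAX_PENDING_LOGBOOK_EVENTS is the queue size before a slow
# subscriber is dropped, EVENT_COALESCE_TIME is how long to wait so bursts of
# events are grouped into a single websocket message, and MAX_RECORDER_WAIT is
# how long to wait for the recorder to commit before switching to live events.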
MAX_PENDING_LOGBOOK_EVENTS = 2048
EVENT_COALESCE_TIME = 0.5
MAX_RECORDER_WAIT = 10

_LOGGER = logging.getLogger(__name__)


@callback
def async_setup(hass: HomeAssistant) -> None:
    """Set up the logbook websocket API."""
    websocket_api.async_register_command(hass, ws_get_events)
    websocket_api.async_register_command(hass, ws_event_stream)


async def _async_get_ws_formatted_events(
    hass: HomeAssistant,
    msg_id: int,
    start_time: dt,
    end_time: dt,
    formatter: Callable[[int, Any], dict[str, Any]],
    event_processor: EventProcessor,
) -> tuple[str, dt | None]:
    """Async wrapper around _ws_formatted_get_events."""
    return await get_instance(hass).async_add_executor_job(
        _ws_formatted_get_events,
        msg_id,
        start_time,
        end_time,
        formatter,
        event_processor,
    )


def _ws_formatted_get_events(
    msg_id: int,
    start_day: dt,
    end_day: dt,
    formatter: Callable[[int, Any], dict[str, Any]],
    event_processor: EventProcessor,
) -> tuple[str, dt | None]:
    """Fetch events and convert them to json in the executor."""
    events = event_processor.get_events(start_day, end_day)
    last_time = None
    if events:
        last_time = dt_util.utc_from_timestamp(events[-1]["when"])
    result = formatter(msg_id, events)
    return JSON_DUMP(result), last_time


async def _async_events_consumer(
    setup_complete_future: asyncio.Future[dt],
    connection: ActiveConnection,
    msg_id: int,
    stream_queue: asyncio.Queue[Event],
    event_processor: EventProcessor,
) -> None:
    """Stream events from the queue."""
    subscriptions_setup_complete_time = await setup_complete_future
    event_processor.switch_to_live()
    while True:
        events: list[Event] = [await stream_queue.get()]
        # If the event is older than the last db
        # event we already sent it so we skip it.
        if events[0].time_fired <= subscriptions_setup_complete_time:
            continue
        # We sleep for the EVENT_COALESCE_TIME so
        # we can group events together to minimize
        # the number of websocket messages when the
        # system is overloaded with an event storm
        await asyncio.sleep(EVENT_COALESCE_TIME)
        while not stream_queue.empty():
            events.append(stream_queue.get_nowait())

        if logbook_events := event_processor.humanify(
            async_event_to_row(e) for e in events
        ):
            connection.send_message(
                JSON_DUMP(
                    messages.event_message(
                        msg_id,
                        logbook_events,
                    )
                )
            )


@websocket_api.websocket_command(
    {
        vol.Required("type"): "logbook/event_stream",
        vol.Required("start_time"): str,
        vol.Optional("entity_ids"): [str],
        vol.Optional("device_ids"): [str],
    }
)
@websocket_api.async_response
async def ws_event_stream(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
) -> None:
    """Handle logbook stream events websocket command."""
start_time_str = msg["start_time"]
utc_now = dt_util.utcnow()
if start_time := dt_util.parse_datetime(start_time_str):
start_time = dt_util.as_utc(start_time)
if not start_time or start_time > utc_now:
connection.send_error(msg["id"], "invalid_start_time", "Invalid start_time")
return
device_ids = msg.get("device_ids")
entity_ids = msg.get("entity_ids")
if entity_ids:
entity_ids = async_filter_entities(hass, entity_ids)
event_types = async_determine_event_types(hass, entity_ids, device_ids)
event_processor = EventProcessor(
hass,
event_types,
entity_ids,
device_ids,
None,
timestamp=True,
include_entity_name=False,
)
stream_queue: asyncio.Queue[Event] = asyncio.Queue(MAX_PENDING_LOGBOOK_EVENTS)
subscriptions: list[CALLBACK_TYPE] = []
setup_complete_future: asyncio.Future[dt] = asyncio.Future()
task = asyncio.create_task(
_async_events_consumer(
setup_complete_future,
connection,
msg["id"],
stream_queue,
event_processor,
)
)
def _unsub() -> None:
"""Unsubscribe from all events."""
for subscription in subscriptions:
subscription()
subscriptions.clear()
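        # Also stop the consumer task so nothing more is sent
        # to this subscription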
        if task:
            task.cancel()

    @callback
    def _queue_or_cancel(event: Event) -> None:
        """Queue an event to be processed or cancel."""
        try:
            stream_queue.put_nowait(event)
        except asyncio.QueueFull:
            _LOGGER.debug(
                "Client exceeded max pending messages of %s",
                MAX_PENDING_LOGBOOK_EVENTS,
            )
            _unsub()
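
    # Subscribe before fetching history so events fired while the backfill
    # runs land in the queue instead of being lost.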
    async_subscribe_events(
        hass, subscriptions, _queue_or_cancel, event_types, entity_ids, device_ids
    )
    subscriptions_setup_complete_time = dt_util.utcnow()
    connection.subscriptions[msg["id"]] = _unsub
    connection.send_result(msg["id"])

    # Fetch everything from history
    message, last_event_time = await _async_get_ws_formatted_events(
        hass,
        msg["id"],
        start_time,
        subscriptions_setup_complete_time,
        messages.event_message,
        event_processor,
    )
    # If there is no last_time there are no historical
    # results, but we still send an empty message so
    # consumers of the api know their request was
    # answered but there were no results
    connection.send_message(message)
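
    # The first fetch may have missed events that were fired but not yet
    # committed by the recorder; give it up to MAX_RECORDER_WAIT seconds
    # to flush before querying again.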
    try:
        await asyncio.wait_for(
            get_instance(hass).async_block_till_done(), MAX_RECORDER_WAIT
        )
    except asyncio.TimeoutError:
        _LOGGER.debug(
            "Recorder is behind more than %s seconds, starting live stream; some results may be missing",
            MAX_RECORDER_WAIT,
        )

    if setup_complete_future.cancelled():
        # Unsubscribe happened while waiting for recorder
        return

    #
    # Fetch any events from the database that have
    # not been committed since the original fetch
    # so we can switch over to using the subscriptions
    #
    # We only want events that happened after the last event
    # we had from the last database query or the maximum
    # time we allow the recorder to be behind
    #
    max_recorder_behind = subscriptions_setup_complete_time - timedelta(
        seconds=MAX_RECORDER_WAIT
    )
    second_fetch_start_time = max(
        last_event_time or max_recorder_behind, max_recorder_behind
    )
    message, final_cutoff_time = await _async_get_ws_formatted_events(
        hass,
        msg["id"],
        second_fetch_start_time,
        subscriptions_setup_complete_time,
        messages.event_message,
        event_processor,
    )
    if final_cutoff_time:  # Only sends results if we have them
        connection.send_message(message)

    if not setup_complete_future.cancelled():
        # Unsubscribe did not happen while we were formatting the events,
        # so signal the consumer task to switch over to live events
        setup_complete_future.set_result(subscriptions_setup_complete_time)


@websocket_api.websocket_command(
    {
        vol.Required("type"): "logbook/get_events",
        vol.Required("start_time"): str,
        vol.Optional("end_time"): str,
        vol.Optional("entity_ids"): [str],
        vol.Optional("device_ids"): [str],
        vol.Optional("context_id"): str,
    }
)
@websocket_api.async_response
async def ws_get_events(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
) -> None:
    """Handle logbook get events websocket command."""
start_time_str = msg["start_time"]
end_time_str = msg.get("end_time")
utc_now = dt_util.utcnow()
if start_time := dt_util.parse_datetime(start_time_str):
start_time = dt_util.as_utc(start_time)
else:
connection.send_error(msg["id"], "invalid_start_time", "Invalid start_time")
return
if not end_time_str:
end_time = utc_now
elif parsed_end_time := dt_util.parse_datetime(end_time_str):
end_time = dt_util.as_utc(parsed_end_time)
else:
connection.send_error(msg["id"], "invalid_end_time", "Invalid end_time")
return
if start_time > utc_now:
connection.send_result(msg["id"], [])
return
device_ids = msg.get("device_ids")
entity_ids = msg.get("entity_ids")
context_id = msg.get("context_id")
if entity_ids:
entity_ids = async_filter_entities(hass, entity_ids)
if not entity_ids and not device_ids:
# Everything has been filtered away
connection.send_result(msg["id"], [])
return
event_types = async_determine_event_types(hass, entity_ids, device_ids)
event_processor = EventProcessor(
hass,
event_types,
entity_ids,
device_ids,
context_id,
timestamp=True,
include_entity_name=False,
)
message, _ = await _async_get_ws_formatted_events(
hass,
msg["id"],
start_time,
end_time,
messages.result_message,
event_processor,
)
connection.send_message(message)