Generate large history responses in the executor (#89606)

This commit is contained in:
J. Nick Koston 2023-03-12 15:32:26 -10:00 committed by GitHub
parent 41b4c5532d
commit 977a07de13
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -189,21 +189,78 @@ def _async_send_empty_response(
"""Send an empty response when we know all results are filtered away."""
connection.send_result(msg_id)
stream_end_time = end_time or dt_util.utcnow()
_async_send_response(connection, msg_id, start_time, stream_end_time, {})
connection.send_message(
_generate_websocket_response(msg_id, start_time, stream_end_time, {})
)
@callback
def _async_send_response(
connection: ActiveConnection,
def _generate_websocket_response(
msg_id: int,
start_time: dt,
end_time: dt,
states: MutableMapping[str, list[dict[str, Any]]],
) -> None:
"""Send a response."""
empty_stream_message = _generate_stream_message(states, start_time, end_time)
empty_response = messages.event_message(msg_id, empty_stream_message)
connection.send_message(JSON_DUMP(empty_response))
) -> str:
"""Generate a websocket response."""
return JSON_DUMP(
messages.event_message(
msg_id, _generate_stream_message(states, start_time, end_time)
)
)
def _generate_historical_response(
hass: HomeAssistant,
msg_id: int,
start_time: dt,
end_time: dt,
entity_ids: list[str] | None,
filters: Filters | None,
include_start_time_state: bool,
significant_changes_only: bool,
minimal_response: bool,
no_attributes: bool,
send_empty: bool,
) -> tuple[float, dt | None, str | None]:
"""Generate a historical response."""
states = cast(
MutableMapping[str, list[dict[str, Any]]],
history.get_significant_states(
hass,
start_time,
end_time,
entity_ids,
filters,
include_start_time_state,
significant_changes_only,
minimal_response,
no_attributes,
True,
),
)
last_time_ts = 0.0
for state_list in states.values():
if (
state_list
and (state_last_time := state_list[-1][COMPRESSED_STATE_LAST_UPDATED])
> last_time_ts
):
last_time_ts = cast(float, state_last_time)
if last_time_ts == 0:
# If we did not send any states ever, we need to send an empty response
# so the websocket client knows it should render/process/consume the
# data.
if not send_empty:
return last_time_ts, None, None
last_time_dt = end_time
else:
last_time_dt = dt_util.utc_from_timestamp(last_time_ts)
return (
last_time_ts,
last_time_dt,
_generate_websocket_response(msg_id, start_time, last_time_dt, states),
)
async def _async_send_historical_states(
@ -221,43 +278,24 @@ async def _async_send_historical_states(
send_empty: bool,
) -> dt | None:
"""Fetch history significant_states and send them to the client."""
states = cast(
MutableMapping[str, list[dict[str, Any]]],
await get_instance(hass).async_add_executor_job(
history.get_significant_states,
hass,
start_time,
end_time,
entity_ids,
filters,
include_start_time_state,
significant_changes_only,
minimal_response,
no_attributes,
True,
),
instance = get_instance(hass)
last_time_ts, last_time_dt, payload = await instance.async_add_executor_job(
_generate_historical_response,
hass,
msg_id,
start_time,
end_time,
entity_ids,
filters,
include_start_time_state,
significant_changes_only,
minimal_response,
no_attributes,
send_empty,
)
last_time = 0
for state_list in states.values():
if (
state_list
and (state_last_time := state_list[-1][COMPRESSED_STATE_LAST_UPDATED])
> last_time
):
last_time = state_last_time
if last_time == 0:
# If we did not send any states ever, we need to send an empty response
# so the websocket client knows it should render/process/consume the
# data.
if not send_empty:
return None
last_time_dt = end_time
else:
last_time_dt = dt_util.utc_from_timestamp(last_time)
_async_send_response(connection, msg_id, start_time, last_time_dt, states)
return last_time_dt if last_time != 0 else None
if payload:
connection.send_message(payload)
return last_time_dt if last_time_ts != 0 else None
def _history_compressed_state(state: State, no_attributes: bool) -> dict[str, Any]: