Start logbook stream faster (#77921)

Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io>
This commit is contained in:
J. Nick Koston 2022-09-09 17:16:02 -05:00 committed by GitHub
parent 8cc0b41daf
commit fcb6888f87
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 106 additions and 93 deletions

View file

@ -31,7 +31,6 @@ from .processor import EventProcessor
MAX_PENDING_LOGBOOK_EVENTS = 2048
EVENT_COALESCE_TIME = 0.35
MAX_RECORDER_WAIT = 10
# minimum size that we will split the query
BIG_QUERY_HOURS = 25
# how many hours to deliver in the first chunk when we split the query
@ -48,6 +47,7 @@ class LogbookLiveStream:
subscriptions: list[CALLBACK_TYPE]
end_time_unsub: CALLBACK_TYPE | None = None
task: asyncio.Task | None = None
wait_sync_task: asyncio.Task | None = None
@callback
@ -57,18 +57,6 @@ def async_setup(hass: HomeAssistant) -> None:
websocket_api.async_register_command(hass, ws_event_stream)
async def _async_wait_for_recorder_sync(hass: HomeAssistant) -> None:
"""Wait for the recorder to sync."""
try:
await asyncio.wait_for(
get_instance(hass).async_block_till_done(), MAX_RECORDER_WAIT
)
except asyncio.TimeoutError:
_LOGGER.debug(
"Recorder is behind more than %s seconds, starting live stream; Some results may be missing"
)
@callback
def _async_send_empty_response(
connection: ActiveConnection, msg_id: int, start_time: dt, end_time: dt | None
@ -347,8 +335,11 @@ async def ws_event_stream(
subscriptions.clear()
if live_stream.task:
live_stream.task.cancel()
if live_stream.wait_sync_task:
live_stream.wait_sync_task.cancel()
if live_stream.end_time_unsub:
live_stream.end_time_unsub()
live_stream.end_time_unsub = None
if end_time:
live_stream.end_time_unsub = async_track_point_in_utc_time(
@ -395,43 +386,6 @@ async def ws_event_stream(
partial=True,
)
await _async_wait_for_recorder_sync(hass)
if msg_id not in connection.subscriptions:
# Unsubscribe happened while waiting for recorder
return
#
# Fetch any events from the database that have
# not been committed since the original fetch
# so we can switch over to using the subscriptions
#
# We only want events that happened after the last event
# we had from the last database query or the maximum
# time we allow the recorder to be behind
#
max_recorder_behind = subscriptions_setup_complete_time - timedelta(
seconds=MAX_RECORDER_WAIT
)
second_fetch_start_time = max(
last_event_time or max_recorder_behind, max_recorder_behind
)
await _async_send_historical_events(
hass,
connection,
msg_id,
second_fetch_start_time,
subscriptions_setup_complete_time,
messages.event_message,
event_processor,
partial=False,
)
if not subscriptions:
# Unsubscribe happened while waiting for formatted events
# or there are no supported entities (all UOM or state class)
# or devices
return
live_stream.task = asyncio.create_task(
_async_events_consumer(
subscriptions_setup_complete_time,
@ -442,6 +396,34 @@ async def ws_event_stream(
)
)
if msg_id not in connection.subscriptions:
# Unsubscribe happened while sending historical events
return
live_stream.wait_sync_task = asyncio.create_task(
get_instance(hass).async_block_till_done()
)
await live_stream.wait_sync_task
#
# Fetch any events from the database that have
# not been committed since the original fetch
# so we can switch over to using the subscriptions
#
# We only want events that happened after the last event
# we had from the last database query
#
await _async_send_historical_events(
hass,
connection,
msg_id,
last_event_time or start_time,
subscriptions_setup_complete_time,
messages.event_message,
event_processor,
partial=False,
)
def _ws_formatted_get_events(
msg_id: int,

View file

@ -11,6 +11,7 @@ from homeassistant import core
from homeassistant.components import logbook, recorder
from homeassistant.components.automation import ATTR_SOURCE, EVENT_AUTOMATION_TRIGGERED
from homeassistant.components.logbook import websocket_api
from homeassistant.components.recorder.util import get_instance
from homeassistant.components.script import EVENT_SCRIPT_STARTED
from homeassistant.components.websocket_api.const import TYPE_RESULT
from homeassistant.const import (
@ -566,6 +567,15 @@ async def test_subscribe_unsubscribe_logbook_stream_excluded_entities(
assert msg["event"]["end_time"] > msg["event"]["start_time"]
assert msg["event"]["partial"] is True
await get_instance(hass).async_block_till_done()
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == []
hass.states.async_set("light.exc", STATE_ON)
hass.states.async_set("light.exc", STATE_OFF)
hass.states.async_set("switch.any", STATE_ON)
@ -588,12 +598,8 @@ async def test_subscribe_unsubscribe_logbook_stream_excluded_entities(
await hass.async_block_till_done()
hass.states.async_set("light.zulu", "on", {"effect": "help", "color": "blue"})
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == []
await get_instance(hass).async_block_till_done()
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
@ -747,6 +753,13 @@ async def test_subscribe_unsubscribe_logbook_stream_included_entities(
assert msg["event"]["end_time"] > msg["event"]["start_time"]
assert msg["event"]["partial"] is True
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == []
for entity_id in test_entities:
hass.states.async_set(entity_id, STATE_ON)
hass.states.async_set(entity_id, STATE_OFF)
@ -756,12 +769,7 @@ async def test_subscribe_unsubscribe_logbook_stream_included_entities(
await hass.async_block_till_done()
hass.states.async_set("light.zulu", "on", {"effect": "help", "color": "blue"})
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == []
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
@ -958,6 +966,14 @@ async def test_logbook_stream_excluded_entities_inherits_filters_from_recorder(
assert msg["event"]["end_time"] > msg["event"]["start_time"]
assert msg["event"]["partial"] is True
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == []
hass.states.async_set("light.exc", STATE_ON)
hass.states.async_set("light.exc", STATE_OFF)
hass.states.async_set("switch.any", STATE_ON)
@ -982,12 +998,7 @@ async def test_logbook_stream_excluded_entities_inherits_filters_from_recorder(
await hass.async_block_till_done()
hass.states.async_set("light.zulu", "on", {"effect": "help", "color": "blue"})
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == []
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
@ -1121,6 +1132,14 @@ async def test_subscribe_unsubscribe_logbook_stream(
assert msg["event"]["end_time"] > msg["event"]["start_time"]
assert msg["event"]["partial"] is True
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == []
hass.states.async_set("light.alpha", "on")
hass.states.async_set("light.alpha", "off")
alpha_off_state: State = hass.states.get("light.alpha")
@ -1138,12 +1157,6 @@ async def test_subscribe_unsubscribe_logbook_stream(
hass.states.async_set("light.zulu", "on", {"effect": "help", "color": "blue"})
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == []
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
@ -1427,12 +1440,8 @@ async def test_subscribe_unsubscribe_logbook_stream_entities(
}
]
hass.states.async_set("light.alpha", STATE_ON)
hass.states.async_set("light.alpha", STATE_OFF)
hass.states.async_set("light.small", STATE_OFF, {"effect": "help", "color": "blue"})
await get_instance(hass).async_block_till_done()
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
@ -1441,6 +1450,13 @@ async def test_subscribe_unsubscribe_logbook_stream_entities(
assert "partial" not in msg["event"]
assert msg["event"]["events"] == []
hass.states.async_set("light.alpha", STATE_ON)
hass.states.async_set("light.alpha", STATE_OFF)
hass.states.async_set("light.small", STATE_OFF, {"effect": "help", "color": "blue"})
await get_instance(hass).async_block_till_done()
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
@ -1455,6 +1471,7 @@ async def test_subscribe_unsubscribe_logbook_stream_entities(
hass.states.async_remove("light.alpha")
hass.states.async_remove("light.small")
await get_instance(hass).async_block_till_done()
await hass.async_block_till_done()
await websocket_client.send_json(
@ -1520,10 +1537,7 @@ async def test_subscribe_unsubscribe_logbook_stream_entities_with_end_time(
}
]
hass.states.async_set("light.alpha", STATE_ON)
hass.states.async_set("light.alpha", STATE_OFF)
hass.states.async_set("light.small", STATE_OFF, {"effect": "help", "color": "blue"})
await get_instance(hass).async_block_till_done()
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
@ -1532,6 +1546,12 @@ async def test_subscribe_unsubscribe_logbook_stream_entities_with_end_time(
assert "partial" not in msg["event"]
assert msg["event"]["events"] == []
hass.states.async_set("light.alpha", STATE_ON)
hass.states.async_set("light.alpha", STATE_OFF)
hass.states.async_set("light.small", STATE_OFF, {"effect": "help", "color": "blue"})
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
@ -2176,7 +2196,6 @@ async def test_stream_consumer_stop_processing(hass, recorder_mock, hass_ws_clie
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
@patch("homeassistant.components.logbook.websocket_api.MAX_RECORDER_WAIT", 0.15)
async def test_recorder_is_far_behind(hass, recorder_mock, hass_ws_client, caplog):
"""Test we still start live streaming if the recorder is far behind."""
now = dt_util.utcnow()
@ -2250,8 +2269,6 @@ async def test_recorder_is_far_behind(hass, recorder_mock, hass_ws_client, caplo
assert msg["type"] == TYPE_RESULT
assert msg["success"]
assert "Recorder is behind" in caplog.text
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
async def test_subscribe_all_entities_are_continuous(
@ -2409,16 +2426,28 @@ async def test_subscribe_entities_some_have_uom_multiple(
assert msg["type"] == TYPE_RESULT
assert msg["success"]
_cycle_entities()
await get_instance(hass).async_block_till_done()
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["partial"] is True
assert msg["event"]["events"] == [
{"entity_id": "sensor.keep", "state": "off", "when": ANY},
{"entity_id": "sensor.keep_two", "state": "off", "when": ANY},
]
assert msg["event"]["partial"] is True
await get_instance(hass).async_block_till_done()
await hass.async_block_till_done()
_cycle_entities()
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]
assert msg["event"]["events"] == []
_cycle_entities()
await hass.async_block_till_done()
@ -2426,7 +2455,12 @@ async def test_subscribe_entities_some_have_uom_multiple(
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == []
assert msg["event"]["events"] == [
{"entity_id": "sensor.keep", "state": "on", "when": ANY},
{"entity_id": "sensor.keep", "state": "off", "when": ANY},
{"entity_id": "sensor.keep_two", "state": "on", "when": ANY},
{"entity_id": "sensor.keep_two", "state": "off", "when": ANY},
]
assert "partial" not in msg["event"]
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
@ -2437,11 +2471,8 @@ async def test_subscribe_entities_some_have_uom_multiple(
{"entity_id": "sensor.keep", "state": "off", "when": ANY},
{"entity_id": "sensor.keep_two", "state": "on", "when": ANY},
{"entity_id": "sensor.keep_two", "state": "off", "when": ANY},
{"entity_id": "sensor.keep", "state": "on", "when": ANY},
{"entity_id": "sensor.keep", "state": "off", "when": ANY},
{"entity_id": "sensor.keep_two", "state": "on", "when": ANY},
{"entity_id": "sensor.keep_two", "state": "off", "when": ANY},
]
assert "partial" not in msg["event"]
await websocket_client.close()