Send an empty logbook response when all requested entity_ids are filtered away (#73046)
This commit is contained in:
parent
c66b000d34
commit
6b2e5858b3
2 changed files with 184 additions and 11 deletions
|
@ -67,6 +67,23 @@ async def _async_wait_for_recorder_sync(hass: HomeAssistant) -> None:
|
|||
)
|
||||
|
||||
|
||||
@callback
|
||||
def _async_send_empty_response(
|
||||
connection: ActiveConnection, msg_id: int, start_time: dt, end_time: dt | None
|
||||
) -> None:
|
||||
"""Send an empty response.
|
||||
|
||||
The current case for this is when they ask for entity_ids
|
||||
that will all be filtered away because they have UOMs or
|
||||
state_class.
|
||||
"""
|
||||
connection.send_result(msg_id)
|
||||
stream_end_time = end_time or dt_util.utcnow()
|
||||
empty_stream_message = _generate_stream_message([], start_time, stream_end_time)
|
||||
empty_response = messages.event_message(msg_id, empty_stream_message)
|
||||
connection.send_message(JSON_DUMP(empty_response))
|
||||
|
||||
|
||||
async def _async_send_historical_events(
|
||||
hass: HomeAssistant,
|
||||
connection: ActiveConnection,
|
||||
|
@ -171,6 +188,17 @@ async def _async_get_ws_stream_events(
|
|||
)
|
||||
|
||||
|
||||
def _generate_stream_message(
|
||||
events: list[dict[str, Any]], start_day: dt, end_day: dt
|
||||
) -> dict[str, Any]:
|
||||
"""Generate a logbook stream message response."""
|
||||
return {
|
||||
"events": events,
|
||||
"start_time": dt_util.utc_to_timestamp(start_day),
|
||||
"end_time": dt_util.utc_to_timestamp(end_day),
|
||||
}
|
||||
|
||||
|
||||
def _ws_stream_get_events(
|
||||
msg_id: int,
|
||||
start_day: dt,
|
||||
|
@ -184,11 +212,7 @@ def _ws_stream_get_events(
|
|||
last_time = None
|
||||
if events:
|
||||
last_time = dt_util.utc_from_timestamp(events[-1]["when"])
|
||||
message = {
|
||||
"events": events,
|
||||
"start_time": dt_util.utc_to_timestamp(start_day),
|
||||
"end_time": dt_util.utc_to_timestamp(end_day),
|
||||
}
|
||||
message = _generate_stream_message(events, start_day, end_day)
|
||||
if partial:
|
||||
# This is a hint to consumers of the api that
|
||||
# we are about to send a another block of historical
|
||||
|
@ -275,6 +299,10 @@ async def ws_event_stream(
|
|||
entity_ids = msg.get("entity_ids")
|
||||
if entity_ids:
|
||||
entity_ids = async_filter_entities(hass, entity_ids)
|
||||
if not entity_ids:
|
||||
_async_send_empty_response(connection, msg_id, start_time, end_time)
|
||||
return
|
||||
|
||||
event_types = async_determine_event_types(hass, entity_ids, device_ids)
|
||||
event_processor = EventProcessor(
|
||||
hass,
|
||||
|
|
|
@ -2102,11 +2102,17 @@ async def test_subscribe_all_entities_have_uom(hass, recorder_mock, hass_ws_clie
|
|||
]
|
||||
)
|
||||
await async_wait_recording_done(hass)
|
||||
entity_ids = ("sensor.uom", "sensor.uom_two")
|
||||
|
||||
def _cycle_entities():
|
||||
for entity_id in entity_ids:
|
||||
for state in ("1", "2", "3"):
|
||||
hass.states.async_set(
|
||||
entity_id, state, {ATTR_UNIT_OF_MEASUREMENT: "any"}
|
||||
)
|
||||
|
||||
init_count = sum(hass.bus.async_listeners().values())
|
||||
hass.states.async_set("sensor.uom", "1", {ATTR_UNIT_OF_MEASUREMENT: "any"})
|
||||
hass.states.async_set("sensor.uom", "2", {ATTR_UNIT_OF_MEASUREMENT: "any"})
|
||||
hass.states.async_set("sensor.uom", "3", {ATTR_UNIT_OF_MEASUREMENT: "any"})
|
||||
_cycle_entities()
|
||||
|
||||
await async_wait_recording_done(hass)
|
||||
websocket_client = await hass_ws_client()
|
||||
|
@ -2124,9 +2130,61 @@ async def test_subscribe_all_entities_have_uom(hass, recorder_mock, hass_ws_clie
|
|||
assert msg["type"] == TYPE_RESULT
|
||||
assert msg["success"]
|
||||
|
||||
hass.states.async_set("sensor.uom", "1", {ATTR_UNIT_OF_MEASUREMENT: "any"})
|
||||
hass.states.async_set("sensor.uom", "2", {ATTR_UNIT_OF_MEASUREMENT: "any"})
|
||||
hass.states.async_set("sensor.uom", "3", {ATTR_UNIT_OF_MEASUREMENT: "any"})
|
||||
_cycle_entities()
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["id"] == 7
|
||||
assert msg["type"] == "event"
|
||||
assert msg["event"]["events"] == []
|
||||
|
||||
await websocket_client.close()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Check our listener got unsubscribed
|
||||
assert sum(hass.bus.async_listeners().values()) == init_count
|
||||
|
||||
|
||||
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
|
||||
async def test_subscribe_all_entities_have_uom_multiple(
|
||||
hass, recorder_mock, hass_ws_client
|
||||
):
|
||||
"""Test logbook stream with specific request for multiple entities that are always filtered."""
|
||||
now = dt_util.utcnow()
|
||||
await asyncio.gather(
|
||||
*[
|
||||
async_setup_component(hass, comp, {})
|
||||
for comp in ("homeassistant", "logbook", "automation", "script")
|
||||
]
|
||||
)
|
||||
await async_wait_recording_done(hass)
|
||||
entity_ids = ("sensor.uom", "sensor.uom_two")
|
||||
|
||||
def _cycle_entities():
|
||||
for entity_id in entity_ids:
|
||||
for state in ("1", "2", "3"):
|
||||
hass.states.async_set(
|
||||
entity_id, state, {ATTR_UNIT_OF_MEASUREMENT: "any"}
|
||||
)
|
||||
|
||||
init_count = sum(hass.bus.async_listeners().values())
|
||||
_cycle_entities()
|
||||
|
||||
await async_wait_recording_done(hass)
|
||||
websocket_client = await hass_ws_client()
|
||||
await websocket_client.send_json(
|
||||
{
|
||||
"id": 7,
|
||||
"type": "logbook/event_stream",
|
||||
"start_time": now.isoformat(),
|
||||
"entity_ids": [*entity_ids],
|
||||
}
|
||||
)
|
||||
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["id"] == 7
|
||||
assert msg["type"] == TYPE_RESULT
|
||||
assert msg["success"]
|
||||
|
||||
_cycle_entities()
|
||||
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["id"] == 7
|
||||
|
@ -2138,3 +2196,90 @@ async def test_subscribe_all_entities_have_uom(hass, recorder_mock, hass_ws_clie
|
|||
|
||||
# Check our listener got unsubscribed
|
||||
assert sum(hass.bus.async_listeners().values()) == init_count
|
||||
|
||||
|
||||
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
|
||||
async def test_subscribe_entities_some_have_uom_multiple(
|
||||
hass, recorder_mock, hass_ws_client
|
||||
):
|
||||
"""Test logbook stream with uom filtered entities and non-fitlered entities."""
|
||||
now = dt_util.utcnow()
|
||||
await asyncio.gather(
|
||||
*[
|
||||
async_setup_component(hass, comp, {})
|
||||
for comp in ("homeassistant", "logbook", "automation", "script")
|
||||
]
|
||||
)
|
||||
await async_wait_recording_done(hass)
|
||||
filtered_entity_ids = ("sensor.uom", "sensor.uom_two")
|
||||
non_filtered_entity_ids = ("sensor.keep", "sensor.keep_two")
|
||||
|
||||
def _cycle_entities():
|
||||
for entity_id in filtered_entity_ids:
|
||||
for state in ("1", "2", "3"):
|
||||
hass.states.async_set(
|
||||
entity_id, state, {ATTR_UNIT_OF_MEASUREMENT: "any"}
|
||||
)
|
||||
for entity_id in non_filtered_entity_ids:
|
||||
for state in (STATE_ON, STATE_OFF):
|
||||
hass.states.async_set(entity_id, state)
|
||||
|
||||
init_count = sum(hass.bus.async_listeners().values())
|
||||
_cycle_entities()
|
||||
|
||||
await async_wait_recording_done(hass)
|
||||
websocket_client = await hass_ws_client()
|
||||
await websocket_client.send_json(
|
||||
{
|
||||
"id": 7,
|
||||
"type": "logbook/event_stream",
|
||||
"start_time": now.isoformat(),
|
||||
"entity_ids": [*filtered_entity_ids, *non_filtered_entity_ids],
|
||||
}
|
||||
)
|
||||
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["id"] == 7
|
||||
assert msg["type"] == TYPE_RESULT
|
||||
assert msg["success"]
|
||||
|
||||
_cycle_entities()
|
||||
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["id"] == 7
|
||||
assert msg["type"] == "event"
|
||||
assert msg["event"]["partial"] is True
|
||||
assert msg["event"]["events"] == [
|
||||
{"entity_id": "sensor.keep", "state": "off", "when": ANY},
|
||||
{"entity_id": "sensor.keep_two", "state": "off", "when": ANY},
|
||||
]
|
||||
|
||||
_cycle_entities()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["id"] == 7
|
||||
assert msg["type"] == "event"
|
||||
assert msg["event"]["events"] == []
|
||||
assert "partial" not in msg["event"]
|
||||
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["id"] == 7
|
||||
assert msg["type"] == "event"
|
||||
assert msg["event"]["events"] == [
|
||||
{"entity_id": "sensor.keep", "state": "on", "when": ANY},
|
||||
{"entity_id": "sensor.keep", "state": "off", "when": ANY},
|
||||
{"entity_id": "sensor.keep_two", "state": "on", "when": ANY},
|
||||
{"entity_id": "sensor.keep_two", "state": "off", "when": ANY},
|
||||
{"entity_id": "sensor.keep", "state": "on", "when": ANY},
|
||||
{"entity_id": "sensor.keep", "state": "off", "when": ANY},
|
||||
{"entity_id": "sensor.keep_two", "state": "on", "when": ANY},
|
||||
{"entity_id": "sensor.keep_two", "state": "off", "when": ANY},
|
||||
]
|
||||
assert "partial" not in msg["event"]
|
||||
|
||||
await websocket_client.close()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Check our listener got unsubscribed
|
||||
assert sum(hass.bus.async_listeners().values()) == init_count
|
||||
|
|
Loading…
Add table
Reference in a new issue