Revert to using call_soon for event triggers and state changed event trackers (#122735)

This commit is contained in:
J. Nick Koston 2024-07-29 04:45:39 -05:00 committed by GitHub
parent 869ec3f670
commit 1879db9f8f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 233 additions and 6 deletions

View file

@ -154,7 +154,8 @@ async def async_attach_trigger(
# If event doesn't match, skip event # If event doesn't match, skip event
return return
hass.async_run_hass_job( hass.loop.call_soon(
hass.async_run_hass_job,
job, job,
{ {
"trigger": { "trigger": {

View file

@ -328,6 +328,16 @@ def async_track_state_change_event(
return _async_track_state_change_event(hass, entity_ids, action, job_type) return _async_track_state_change_event(hass, entity_ids, action, job_type)
@callback
def _async_dispatch_entity_id_event_soon(
hass: HomeAssistant,
callbacks: dict[str, list[HassJob[[Event[_StateEventDataT]], Any]]],
event: Event[_StateEventDataT],
) -> None:
"""Dispatch to listeners soon to ensure one event loop runs before dispatch."""
hass.loop.call_soon(_async_dispatch_entity_id_event, hass, callbacks, event)
@callback @callback
def _async_dispatch_entity_id_event( def _async_dispatch_entity_id_event(
hass: HomeAssistant, hass: HomeAssistant,
@ -361,7 +371,7 @@ def _async_state_filter(
_KEYED_TRACK_STATE_CHANGE = _KeyedEventTracker( _KEYED_TRACK_STATE_CHANGE = _KeyedEventTracker(
key=_TRACK_STATE_CHANGE_DATA, key=_TRACK_STATE_CHANGE_DATA,
event_type=EVENT_STATE_CHANGED, event_type=EVENT_STATE_CHANGED,
dispatcher_callable=_async_dispatch_entity_id_event, dispatcher_callable=_async_dispatch_entity_id_event_soon,
filter_callable=_async_state_filter, filter_callable=_async_state_filter,
) )

View file

@ -3229,6 +3229,7 @@ async def test_two_automations_call_restart_script_same_time(
hass.states.async_set("binary_sensor.presence", "on") hass.states.async_set("binary_sensor.presence", "on")
await hass.async_block_till_done() await hass.async_block_till_done()
await hass.async_block_till_done()
assert len(events) == 2 assert len(events) == 2
cancel() cancel()

View file

@ -343,6 +343,7 @@ async def test_functional_device_trigger(
assert len(hass.states.async_entity_ids(AUTOMATION_DOMAIN)) == 1 assert len(hass.states.async_entity_ids(AUTOMATION_DOMAIN)) == 1
await sensor_ws_data({"state": {"buttonevent": 1002}}) await sensor_ws_data({"state": {"buttonevent": 1002}})
await hass.async_block_till_done()
assert len(service_calls) == 1 assert len(service_calls) == 1
assert service_calls[0].data["some"] == "test_trigger_button_press" assert service_calls[0].data["some"] == "test_trigger_button_press"

View file

@ -2,7 +2,7 @@
import asyncio import asyncio
from datetime import timedelta from datetime import timedelta
from unittest.mock import patch from unittest.mock import ANY, patch
from freezegun import freeze_time from freezegun import freeze_time
import pytest import pytest
@ -10,8 +10,9 @@ import pytest
from homeassistant.components import history from homeassistant.components import history
from homeassistant.components.history import websocket_api from homeassistant.components.history import websocket_api
from homeassistant.components.recorder import Recorder from homeassistant.components.recorder import Recorder
from homeassistant.const import EVENT_HOMEASSISTANT_FINAL_WRITE from homeassistant.const import EVENT_HOMEASSISTANT_FINAL_WRITE, STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.setup import async_setup_component from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
@ -2072,3 +2073,84 @@ async def test_history_stream_historical_only_with_start_time_state_past(
"id": 1, "id": 1,
"type": "event", "type": "event",
} }
async def test_history_stream_live_chained_events(
hass: HomeAssistant, recorder_mock: Recorder, hass_ws_client: WebSocketGenerator
) -> None:
"""Test history stream with history with a chained event."""
now = dt_util.utcnow()
await async_setup_component(hass, "history", {})
await async_wait_recording_done(hass)
hass.states.async_set("binary_sensor.is_light", STATE_OFF)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "history/stream",
"entity_ids": ["binary_sensor.is_light"],
"start_time": now.isoformat(),
"include_start_time_state": True,
"significant_changes_only": False,
"no_attributes": False,
"minimal_response": True,
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 1
assert response["type"] == "result"
response = await client.receive_json()
assert response == {
"event": {
"end_time": ANY,
"start_time": ANY,
"states": {
"binary_sensor.is_light": [
{
"a": {},
"lu": ANY,
"s": STATE_OFF,
},
],
},
},
"id": 1,
"type": "event",
}
await async_recorder_block_till_done(hass)
@callback
def auto_off_listener(event):
hass.states.async_set("binary_sensor.is_light", STATE_OFF)
async_track_state_change_event(hass, ["binary_sensor.is_light"], auto_off_listener)
hass.states.async_set("binary_sensor.is_light", STATE_ON)
response = await client.receive_json()
assert response == {
"event": {
"states": {
"binary_sensor.is_light": [
{
"lu": ANY,
"s": STATE_ON,
"a": {},
},
{
"lu": ANY,
"s": STATE_OFF,
"a": {},
},
],
},
},
"id": 1,
"type": "event",
}

View file

@ -3,6 +3,7 @@
import asyncio import asyncio
from collections.abc import Callable from collections.abc import Callable
from datetime import timedelta from datetime import timedelta
from typing import Any
from unittest.mock import ANY, patch from unittest.mock import ANY, patch
from freezegun import freeze_time from freezegun import freeze_time
@ -31,9 +32,10 @@ from homeassistant.const import (
STATE_OFF, STATE_OFF,
STATE_ON, STATE_ON,
) )
from homeassistant.core import Event, HomeAssistant, State from homeassistant.core import Event, HomeAssistant, State, callback
from homeassistant.helpers import device_registry as dr, entity_registry as er from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.entityfilter import CONF_ENTITY_GLOBS from homeassistant.helpers.entityfilter import CONF_ENTITY_GLOBS
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.setup import async_setup_component from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
@ -2965,3 +2967,79 @@ async def test_subscribe_all_entities_are_continuous_with_device(
assert listeners_without_writes( assert listeners_without_writes(
hass.bus.async_listeners() hass.bus.async_listeners()
) == listeners_without_writes(init_listeners) ) == listeners_without_writes(init_listeners)
@pytest.mark.parametrize("params", [{"entity_ids": ["binary_sensor.is_light"]}, {}])
async def test_live_stream_with_changed_state_change(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
params: dict[str, Any],
) -> None:
"""Test the live logbook stream with chained events."""
config = {recorder.CONF_COMMIT_INTERVAL: 0.5}
await async_setup_recorder_instance(hass, config)
now = dt_util.utcnow()
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook")
]
)
hass.states.async_set("binary_sensor.is_light", "ignored")
hass.states.async_set("binary_sensor.is_light", "init")
await async_wait_recording_done(hass)
@callback
def auto_off_listener(event):
hass.states.async_set("binary_sensor.is_light", STATE_OFF)
async_track_state_change_event(hass, ["binary_sensor.is_light"], auto_off_listener)
websocket_client = await hass_ws_client()
init_listeners = hass.bus.async_listeners()
await websocket_client.send_json(
{
"id": 7,
"type": "logbook/event_stream",
"start_time": now.isoformat(),
**params,
}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
await hass.async_block_till_done()
hass.states.async_set("binary_sensor.is_light", STATE_ON)
recieved_rows = []
while len(recieved_rows) < 3:
msg = await asyncio.wait_for(websocket_client.receive_json(), 2.5)
assert msg["id"] == 7
assert msg["type"] == "event"
recieved_rows.extend(msg["event"]["events"])
# Make sure we get rows back in order
assert recieved_rows == [
{"entity_id": "binary_sensor.is_light", "state": "init", "when": ANY},
{"entity_id": "binary_sensor.is_light", "state": "on", "when": ANY},
{"entity_id": "binary_sensor.is_light", "state": "off", "when": ANY},
]
await websocket_client.send_json(
{"id": 8, "type": "unsubscribe_events", "subscription": 7}
)
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 8
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# Check our listener got unsubscribed
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)

View file

@ -1280,6 +1280,7 @@ async def test_master_state_with_template(hass: HomeAssistant) -> None:
context = Context() context = Context()
hass.states.async_set("input_boolean.test", STATE_ON, context=context) hass.states.async_set("input_boolean.test", STATE_ON, context=context)
await hass.async_block_till_done() await hass.async_block_till_done()
await hass.async_block_till_done()
assert hass.states.get("media_player.tv").state == STATE_OFF assert hass.states.get("media_player.tv").state == STATE_OFF
assert events[0].context == context assert events[0].context == context

View file

@ -24,6 +24,7 @@ from homeassistant.core import Context, HomeAssistant, State, SupportsResponse,
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
from homeassistant.helpers import device_registry as dr from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.dispatcher import async_dispatcher_send from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.loader import async_get_integration from homeassistant.loader import async_get_integration
from homeassistant.setup import async_setup_component from homeassistant.setup import async_setup_component
from homeassistant.util.json import json_loads from homeassistant.util.json import json_loads
@ -2814,3 +2815,54 @@ async def test_integration_descriptions(
assert response["success"] assert response["success"]
assert response["result"] assert response["result"]
async def test_subscribe_entities_chained_state_change(
hass: HomeAssistant,
websocket_client: MockHAClientWebSocket,
hass_admin_user: MockUser,
) -> None:
"""Test chaining state changed events.
Ensure the websocket sends the off state after
the on state.
"""
@callback
def auto_off_listener(event):
hass.states.async_set("light.permitted", "off")
async_track_state_change_event(hass, ["light.permitted"], auto_off_listener)
await websocket_client.send_json({"id": 7, "type": "subscribe_entities"})
data = await websocket_client.receive_str()
msg = json_loads(data)
assert msg["id"] == 7
assert msg["type"] == const.TYPE_RESULT
assert msg["success"]
data = await websocket_client.receive_str()
msg = json_loads(data)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == {"a": {}}
hass.states.async_set("light.permitted", "on")
data = await websocket_client.receive_str()
msg = json_loads(data)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == {
"a": {"light.permitted": {"a": {}, "c": ANY, "lc": ANY, "s": "on"}}
}
data = await websocket_client.receive_str()
msg = json_loads(data)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == {
"c": {"light.permitted": {"+": {"c": ANY, "lc": ANY, "s": "off"}}}
}
await websocket_client.close()
await hass.async_block_till_done()

View file

@ -306,6 +306,7 @@ async def test_device_offline_fires(
assert zha_device.available is True assert zha_device.available is True
zha_device.available = False zha_device.available = False
zha_device.emit_zha_event({"device_event_type": "device_offline"}) zha_device.emit_zha_event({"device_event_type": "device_offline"})
await hass.async_block_till_done()
assert len(service_calls) == 1 assert len(service_calls) == 1
assert service_calls[0].data["message"] == "service called" assert service_calls[0].data["message"] == "service called"