Separate recorder logic for state_changed and non-state_changed events (#71204)

J. Nick Koston 2022-05-02 17:22:53 -05:00 committed by GitHub
parent ea456893f9
commit 75026f9fed
2 changed files with 97 additions and 30 deletions
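
In short, the recorder's single _process_event_into_session is split into _process_non_state_changed_event_into_session and _process_state_changed_event_into_session, and _process_one_event now dispatches on event.event_type up front instead of branching on EVENT_STATE_CHANGED inside one method. The non-state-changed path also gains an early return that stores events without data immediately, and the JSON-serialization warning now includes the exception detail.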


@@ -1123,7 +1123,10 @@ class Recorder(threading.Thread):
     def _process_one_event(self, event: Event) -> None:
         if not self.enabled:
             return
-        self._process_event_into_session(event)
+        if event.event_type == EVENT_STATE_CHANGED:
+            self._process_state_changed_event_into_session(event)
+        else:
+            self._process_non_state_changed_event_into_session(event)
         # Commit if the commit interval is zero
         if not self.commit_interval:
             self._commit_event_session_or_retry()
@@ -1164,40 +1167,44 @@
             return cast(int, data_id[0])
         return None
 
-    def _process_event_into_session(self, event: Event) -> None:
+    def _process_non_state_changed_event_into_session(self, event: Event) -> None:
+        """Process any event into the session except state changed."""
         assert self.event_session is not None
         dbevent = Events.from_event(event)
-        if event.event_type != EVENT_STATE_CHANGED and event.data:
-            try:
-                shared_data = EventData.shared_data_from_event(event)
-            except (TypeError, ValueError):
-                _LOGGER.warning("Event is not JSON serializable: %s", event)
-                return
-            # Matching attributes found in the pending commit
-            if pending_event_data := self._pending_event_data.get(shared_data):
-                dbevent.event_data_rel = pending_event_data
-            # Matching attributes id found in the cache
-            elif data_id := self._event_data_ids.get(shared_data):
-                dbevent.data_id = data_id
-            else:
-                data_hash = EventData.hash_shared_data(shared_data)
-                # Matching attributes found in the database
-                if data_id := self._find_shared_data_in_db(data_hash, shared_data):
-                    self._event_data_ids[shared_data] = dbevent.data_id = data_id
-                # No matching attributes found, save them in the DB
-                else:
-                    dbevent_data = EventData(shared_data=shared_data, hash=data_hash)
-                    dbevent.event_data_rel = self._pending_event_data[
-                        shared_data
-                    ] = dbevent_data
-                    self.event_session.add(dbevent_data)
-        if event.event_type != EVENT_STATE_CHANGED:
+        if not event.data:
             self.event_session.add(dbevent)
             return
+        try:
+            shared_data = EventData.shared_data_from_event(event)
+        except (TypeError, ValueError) as ex:
+            _LOGGER.warning("Event is not JSON serializable: %s: %s", event, ex)
+            return
+        # Matching attributes found in the pending commit
+        if pending_event_data := self._pending_event_data.get(shared_data):
+            dbevent.event_data_rel = pending_event_data
+        # Matching attributes id found in the cache
+        elif data_id := self._event_data_ids.get(shared_data):
+            dbevent.data_id = data_id
+        else:
+            data_hash = EventData.hash_shared_data(shared_data)
+            # Matching attributes found in the database
+            if data_id := self._find_shared_data_in_db(data_hash, shared_data):
+                self._event_data_ids[shared_data] = dbevent.data_id = data_id
+            # No matching attributes found, save them in the DB
+            else:
+                dbevent_data = EventData(shared_data=shared_data, hash=data_hash)
+                dbevent.event_data_rel = self._pending_event_data[
+                    shared_data
+                ] = dbevent_data
+                self.event_session.add(dbevent_data)
+        self.event_session.add(dbevent)
+
+    def _process_state_changed_event_into_session(self, event: Event) -> None:
+        """Process a state_changed event into the session."""
+        assert self.event_session is not None
         try:
             dbstate = States.from_event(event)
             shared_attrs = StateAttributes.shared_attrs_from_event(
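
Both before and after this change, non-state-changed event payloads are deduplicated in the same order: a matching object pending in the current session, then the in-memory id cache, then a hash lookup in the database, and only then a brand-new EventData row. Below is a simplified, self-contained sketch of that lookup order; the dict and callable stand-ins (resolve_shared_data_id, find_in_db, insert_row) are illustrative, not the recorder's actual types, which use an LRU cache and SQLAlchemy queries.

    from typing import Callable, Optional


    def resolve_shared_data_id(
        shared_data: str,
        pending: dict[str, object],
        id_cache: dict[str, int],
        hash_shared_data: Callable[[str], int],
        find_in_db: Callable[[int, str], Optional[int]],
        insert_row: Callable[[str, int], object],
    ) -> tuple[Optional[int], Optional[object]]:
        """Return (existing data_id, None) or (None, row object to relate/stage)."""
        # 1. Matching payload already staged in the pending commit
        if (pending_row := pending.get(shared_data)) is not None:
            return None, pending_row
        # 2. Matching payload id found in the in-memory cache
        if (data_id := id_cache.get(shared_data)) is not None:
            return data_id, None
        data_hash = hash_shared_data(shared_data)
        # 3. Matching payload found in the database; remember its id for next time
        if (data_id := find_in_db(data_hash, shared_data)) is not None:
            id_cache[shared_data] = data_id
            return data_id, None
        # 4. Nothing matched: create a new row and track it as pending
        new_row = insert_row(shared_data, data_hash)
        pending[shared_data] = new_row
        return None, new_row


    # Example: a second identical payload resolves to the already-pending row.
    pending: dict[str, object] = {}
    cache: dict[str, int] = {}
    first = resolve_shared_data_id(
        '{"de": "dupe"}', pending, cache, hash, lambda h, s: None, lambda s, h: {"shared_data": s}
    )
    second = resolve_shared_data_id(
        '{"de": "dupe"}', pending, cache, hash, lambda h, s: None, lambda s, h: {"shared_data": s}
    )
    assert first[1] is second[1]

In the diff above, returning a pending row corresponds to assigning dbevent.event_data_rel, while returning an id corresponds to assigning dbevent.data_id.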


@@ -1456,3 +1456,63 @@ async def test_database_connection_keep_alive_disabled_on_sqlite(
     )
     await async_wait_recording_done(hass)
     assert "Sending keepalive" not in caplog.text
+
+
+def test_deduplication_event_data_inside_commit_interval(hass_recorder, caplog):
+    """Test deduplication of event data inside the commit interval."""
+    hass = hass_recorder()
+
+    for _ in range(10):
+        hass.bus.fire("this_event", {"de": "dupe"})
+    wait_recording_done(hass)
+
+    for _ in range(10):
+        hass.bus.fire("this_event", {"de": "dupe"})
+    wait_recording_done(hass)
+
+    with session_scope(hass=hass) as session:
+        events = list(
+            session.query(Events)
+            .filter(Events.event_type == "this_event")
+            .outerjoin(EventData, (Events.data_id == EventData.data_id))
+        )
+        assert len(events) == 20
+        first_data_id = events[0].data_id
+        assert all(event.data_id == first_data_id for event in events)
+
+
+# Patch STATE_ATTRIBUTES_ID_CACHE_SIZE since otherwise
+# the CI can fail because the test takes too long to run
+@patch("homeassistant.components.recorder.STATE_ATTRIBUTES_ID_CACHE_SIZE", 5)
+def test_deduplication_state_attributes_inside_commit_interval(hass_recorder, caplog):
+    """Test deduplication of state attributes inside the commit interval."""
+    hass = hass_recorder()
+    entity_id = "test.recorder"
+    attributes = {"test_attr": 5, "test_attr_10": "nice"}
+
+    hass.states.set(entity_id, "on", attributes)
+    hass.states.set(entity_id, "off", attributes)
+
+    # Now exhaust the cache to ensure we go back to the db
+    for attr_id in range(5):
+        hass.states.set(entity_id, "on", {"test_attr": attr_id})
+        hass.states.set(entity_id, "off", {"test_attr": attr_id})
+    wait_recording_done(hass)
+
+    for _ in range(5):
+        hass.states.set(entity_id, "on", attributes)
+        hass.states.set(entity_id, "off", attributes)
+    wait_recording_done(hass)
+
+    with session_scope(hass=hass) as session:
+        states = list(
+            session.query(States)
+            .filter(States.entity_id == entity_id)
+            .outerjoin(
+                StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
+            )
+        )
+        assert len(states) == 22
+        first_attributes_id = states[0].attributes_id
+        last_attributes_id = states[-1].attributes_id
+        assert first_attributes_id == last_attributes_id
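
Taken together, the new tests pin down the deduplication behavior: the first fires ten identical events before and ten after a wait_recording_done and expects all 20 Events rows to share one data_id; the second shrinks STATE_ATTRIBUTES_ID_CACHE_SIZE to 5, evicts the original attributes entry with five distinct payloads, and still expects the first and last of the 22 States rows to share one attributes_id, which exercises the database-lookup fallback. To run just these tests locally, something like the following should work (the test module path is an assumption, not shown in this diff):

    pytest tests/components/recorder/test_init.py -k deduplication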