diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 35a664cac65..1d81495c3a5 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -1123,7 +1123,10 @@ class Recorder(threading.Thread): def _process_one_event(self, event: Event) -> None: if not self.enabled: return - self._process_event_into_session(event) + if event.event_type == EVENT_STATE_CHANGED: + self._process_state_changed_event_into_session(event) + else: + self._process_non_state_changed_event_into_session(event) # Commit if the commit interval is zero if not self.commit_interval: self._commit_event_session_or_retry() @@ -1164,40 +1167,44 @@ class Recorder(threading.Thread): return cast(int, data_id[0]) return None - def _process_event_into_session(self, event: Event) -> None: + def _process_non_state_changed_event_into_session(self, event: Event) -> None: + """Process any event into the session except state changed.""" assert self.event_session is not None dbevent = Events.from_event(event) - - if event.event_type != EVENT_STATE_CHANGED and event.data: - try: - shared_data = EventData.shared_data_from_event(event) - except (TypeError, ValueError): - _LOGGER.warning("Event is not JSON serializable: %s", event) - return - - # Matching attributes found in the pending commit - if pending_event_data := self._pending_event_data.get(shared_data): - dbevent.event_data_rel = pending_event_data - # Matching attributes id found in the cache - elif data_id := self._event_data_ids.get(shared_data): - dbevent.data_id = data_id - else: - data_hash = EventData.hash_shared_data(shared_data) - # Matching attributes found in the database - if data_id := self._find_shared_data_in_db(data_hash, shared_data): - self._event_data_ids[shared_data] = dbevent.data_id = data_id - # No matching attributes found, save them in the DB - else: - dbevent_data = EventData(shared_data=shared_data, hash=data_hash) - 
dbevent.event_data_rel = self._pending_event_data[ - shared_data - ] = dbevent_data - self.event_session.add(dbevent_data) - - if event.event_type != EVENT_STATE_CHANGED: + if not event.data: self.event_session.add(dbevent) return + try: + shared_data = EventData.shared_data_from_event(event) + except (TypeError, ValueError) as ex: + _LOGGER.warning("Event is not JSON serializable: %s: %s", event, ex) + return + + # Matching attributes found in the pending commit + if pending_event_data := self._pending_event_data.get(shared_data): + dbevent.event_data_rel = pending_event_data + # Matching attributes id found in the cache + elif data_id := self._event_data_ids.get(shared_data): + dbevent.data_id = data_id + else: + data_hash = EventData.hash_shared_data(shared_data) + # Matching attributes found in the database + if data_id := self._find_shared_data_in_db(data_hash, shared_data): + self._event_data_ids[shared_data] = dbevent.data_id = data_id + # No matching attributes found, save them in the DB + else: + dbevent_data = EventData(shared_data=shared_data, hash=data_hash) + dbevent.event_data_rel = self._pending_event_data[ + shared_data + ] = dbevent_data + self.event_session.add(dbevent_data) + + self.event_session.add(dbevent) + + def _process_state_changed_event_into_session(self, event: Event) -> None: + """Process a state_changed event into the session.""" + assert self.event_session is not None try: dbstate = States.from_event(event) shared_attrs = StateAttributes.shared_attrs_from_event( diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index 0b03283547e..6d4f2e1106a 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -1456,3 +1456,63 @@ async def test_database_connection_keep_alive_disabled_on_sqlite( ) await async_wait_recording_done(hass) assert "Sending keepalive" not in caplog.text + + +def test_deduplication_event_data_inside_commit_interval(hass_recorder, 
caplog): + """Test deduplication of event data inside the commit interval.""" + hass = hass_recorder() + + for _ in range(10): + hass.bus.fire("this_event", {"de": "dupe"}) + wait_recording_done(hass) + for _ in range(10): + hass.bus.fire("this_event", {"de": "dupe"}) + wait_recording_done(hass) + + with session_scope(hass=hass) as session: + events = list( + session.query(Events) + .filter(Events.event_type == "this_event") + .outerjoin(EventData, (Events.data_id == EventData.data_id)) + ) + assert len(events) == 20 + first_data_id = events[0].data_id + assert all(event.data_id == first_data_id for event in events) + + +# Patch STATE_ATTRIBUTES_ID_CACHE_SIZE since otherwise +# the CI can fail because the test takes too long to run +@patch("homeassistant.components.recorder.STATE_ATTRIBUTES_ID_CACHE_SIZE", 5) +def test_deduplication_state_attributes_inside_commit_interval(hass_recorder, caplog): + """Test deduplication of state attributes inside the commit interval.""" + hass = hass_recorder() + + entity_id = "test.recorder" + attributes = {"test_attr": 5, "test_attr_10": "nice"} + + hass.states.set(entity_id, "on", attributes) + hass.states.set(entity_id, "off", attributes) + + # Now exhaust the cache to ensure we go back to the db + for attr_id in range(5): + hass.states.set(entity_id, "on", {"test_attr": attr_id}) + hass.states.set(entity_id, "off", {"test_attr": attr_id}) + + wait_recording_done(hass) + for _ in range(5): + hass.states.set(entity_id, "on", attributes) + hass.states.set(entity_id, "off", attributes) + wait_recording_done(hass) + + with session_scope(hass=hass) as session: + states = list( + session.query(States) + .filter(States.entity_id == entity_id) + .outerjoin( + StateAttributes, (States.attributes_id == StateAttributes.attributes_id) + ) + ) + assert len(states) == 22 + first_attributes_id = states[0].attributes_id + last_attributes_id = states[-1].attributes_id + assert first_attributes_id == last_attributes_id