diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py
index 27a999e3f2c..3bf95b4303a 100644
--- a/homeassistant/components/recorder/__init__.py
+++ b/homeassistant/components/recorder/__init__.py
@@ -239,7 +239,7 @@ class Recorder(threading.Thread):
 
         self._timechanges_seen = 0
         self._keepalive_count = 0
-        self._old_state_ids = {}
+        self._old_states = {}
         self.event_session = None
         self.get_session = None
         self._completed_database_setup = False
@@ -385,7 +385,6 @@ class Recorder(threading.Thread):
                 if event.event_type == EVENT_STATE_CHANGED:
                     dbevent.event_data = "{}"
                 self.event_session.add(dbevent)
-                self.event_session.flush()
             except (TypeError, ValueError):
                 _LOGGER.warning("Event is not JSON serializable: %s", event)
             except Exception as err:  # pylint: disable=broad-except
@@ -396,16 +395,14 @@ class Recorder(threading.Thread):
                 try:
                     dbstate = States.from_event(event)
                     has_new_state = event.data.get("new_state")
-                    dbstate.old_state_id = self._old_state_ids.get(dbstate.entity_id)
+                    if dbstate.entity_id in self._old_states:
+                        dbstate.old_state = self._old_states.pop(dbstate.entity_id)
                     if not has_new_state:
                         dbstate.state = None
-                    dbstate.event_id = dbevent.event_id
+                    dbstate.event = dbevent
                     self.event_session.add(dbstate)
-                    self.event_session.flush()
                     if has_new_state:
-                        self._old_state_ids[dbstate.entity_id] = dbstate.state_id
-                    elif dbstate.entity_id in self._old_state_ids:
-                        del self._old_state_ids[dbstate.entity_id]
+                        self._old_states[dbstate.entity_id] = dbstate
                 except (TypeError, ValueError):
                     _LOGGER.warning(
                         "State is not JSON serializable: %s",
diff --git a/homeassistant/components/recorder/models.py b/homeassistant/components/recorder/models.py
index ea1490ef053..642407cf0c6 100644
--- a/homeassistant/components/recorder/models.py
+++ b/homeassistant/components/recorder/models.py
@@ -14,6 +14,7 @@ from sqlalchemy import (
     distinct,
 )
 from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relationship
 from sqlalchemy.orm.session import Session
 
 from homeassistant.core import Context, Event, EventOrigin, State, split_entity_id
@@ -105,7 +106,9 @@ class States(Base):  # type: ignore
     last_changed = Column(DateTime(timezone=True), default=dt_util.utcnow)
     last_updated = Column(DateTime(timezone=True), default=dt_util.utcnow, index=True)
     created = Column(DateTime(timezone=True), default=dt_util.utcnow)
-    old_state_id = Column(Integer)
+    old_state_id = Column(Integer, ForeignKey("states.state_id"))
+    event = relationship("Events", uselist=False)
+    old_state = relationship("States", remote_side=[state_id])
 
     __table_args__ = (
         # Used for fetching the state of entities at a specific time
diff --git a/tests/components/logbook/test_init.py b/tests/components/logbook/test_init.py
index dff2c686afa..ad4d16798a6 100644
--- a/tests/components/logbook/test_init.py
+++ b/tests/components/logbook/test_init.py
@@ -92,6 +92,7 @@ class TestComponentLogbook(unittest.TestCase):
         # Logbook entry service call results in firing an event.
         # Our service call will unblock when the event listeners have been
         # scheduled. This means that they may not have been processed yet.
+        trigger_db_commit(self.hass)
         self.hass.block_till_done()
         self.hass.data[recorder.DATA_INSTANCE].block_till_done()
 
@@ -935,7 +936,7 @@ async def test_logbook_view(hass, hass_client):
     """Test the logbook view."""
     await hass.async_add_executor_job(init_recorder_component, hass)
     await async_setup_component(hass, "logbook", {})
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
     client = await hass_client()
     response = await client.get(f"/api/logbook/{dt_util.utcnow().isoformat()}")
     assert response.status == 200
@@ -945,7 +946,7 @@ async def test_logbook_view_period_entity(hass, hass_client):
     """Test the logbook view with period and entity."""
     await hass.async_add_executor_job(init_recorder_component, hass)
     await async_setup_component(hass, "logbook", {})
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     entity_id_test = "switch.test"
     hass.states.async_set(entity_id_test, STATE_OFF)
@@ -953,9 +954,9 @@ async def test_logbook_view_period_entity(hass, hass_client):
     entity_id_second = "switch.second"
     hass.states.async_set(entity_id_second, STATE_OFF)
     hass.states.async_set(entity_id_second, STATE_ON)
-    await hass.async_add_job(trigger_db_commit, hass)
+    await hass.async_add_executor_job(trigger_db_commit, hass)
     await hass.async_block_till_done()
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     client = await hass_client()
 
@@ -1051,6 +1052,8 @@ async def test_logbook_describe_event(hass, hass_client):
     ):
         hass.bus.async_fire("some_event")
         await hass.async_block_till_done()
+        await hass.async_add_executor_job(trigger_db_commit, hass)
+        await hass.async_block_till_done()
         await hass.async_add_executor_job(
             hass.data[recorder.DATA_INSTANCE].block_till_done
         )
@@ -1119,6 +1122,8 @@ async def test_exclude_described_event(hass, hass_client):
             "some_event", {logbook.ATTR_NAME: name, logbook.ATTR_ENTITY_ID: entity_id3}
         )
         await hass.async_block_till_done()
+        await hass.async_add_executor_job(trigger_db_commit, hass)
+        await hass.async_block_till_done()
         await hass.async_add_executor_job(
             hass.data[recorder.DATA_INSTANCE].block_till_done
         )
@@ -1138,7 +1143,7 @@ async def test_logbook_view_end_time_entity(hass, hass_client):
     """Test the logbook view with end_time and entity."""
     await hass.async_add_executor_job(init_recorder_component, hass)
     await async_setup_component(hass, "logbook", {})
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     entity_id_test = "switch.test"
     hass.states.async_set(entity_id_test, STATE_OFF)
@@ -1146,9 +1151,9 @@ async def test_logbook_view_end_time_entity(hass, hass_client):
     entity_id_second = "switch.second"
     hass.states.async_set(entity_id_second, STATE_OFF)
     hass.states.async_set(entity_id_second, STATE_ON)
-    await hass.async_add_job(trigger_db_commit, hass)
+    await hass.async_add_executor_job(trigger_db_commit, hass)
     await hass.async_block_till_done()
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     client = await hass_client()
 
@@ -1199,7 +1204,7 @@ async def test_logbook_entity_filter_with_automations(hass, hass_client):
     await async_setup_component(hass, "automation", {})
     await async_setup_component(hass, "script", {})
 
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     entity_id_test = "alarm_control_panel.area_001"
     hass.states.async_set(entity_id_test, STATE_OFF)
@@ -1218,9 +1223,9 @@ async def test_logbook_entity_filter_with_automations(hass, hass_client):
     )
     hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
 
-    await hass.async_add_job(trigger_db_commit, hass)
+    await hass.async_add_executor_job(trigger_db_commit, hass)
     await hass.async_block_till_done()
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     client = await hass_client()
 
@@ -1271,7 +1276,7 @@ async def test_filter_continuous_sensor_values(hass, hass_client):
     """Test remove continuous sensor events from logbook."""
     await hass.async_add_executor_job(init_recorder_component, hass)
     await async_setup_component(hass, "logbook", {})
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     entity_id_test = "switch.test"
     hass.states.async_set(entity_id_test, STATE_OFF)
@@ -1283,9 +1288,9 @@ async def test_filter_continuous_sensor_values(hass, hass_client):
     hass.states.async_set(entity_id_third, STATE_OFF, {"unit_of_measurement": "foo"})
     hass.states.async_set(entity_id_third, STATE_ON, {"unit_of_measurement": "foo"})
 
-    await hass.async_add_job(trigger_db_commit, hass)
+    await hass.async_add_executor_job(trigger_db_commit, hass)
     await hass.async_block_till_done()
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     client = await hass_client()
 
@@ -1307,7 +1312,7 @@ async def test_exclude_new_entities(hass, hass_client):
     """Test if events are excluded on first update."""
     await hass.async_add_executor_job(init_recorder_component, hass)
     await async_setup_component(hass, "logbook", {})
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     entity_id = "climate.bla"
     entity_id2 = "climate.blu"
@@ -1317,9 +1322,9 @@ async def test_exclude_new_entities(hass, hass_client):
     hass.states.async_set(entity_id2, STATE_OFF)
     hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
 
-    await hass.async_add_job(trigger_db_commit, hass)
+    await hass.async_add_executor_job(trigger_db_commit, hass)
     await hass.async_block_till_done()
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     client = await hass_client()
 
@@ -1342,7 +1347,7 @@ async def test_exclude_removed_entities(hass, hass_client):
     """Test if events are excluded on last update."""
     await hass.async_add_executor_job(init_recorder_component, hass)
     await async_setup_component(hass, "logbook", {})
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     entity_id = "climate.bla"
     entity_id2 = "climate.blu"
@@ -1358,9 +1363,9 @@ async def test_exclude_removed_entities(hass, hass_client):
     hass.states.async_remove(entity_id)
     hass.states.async_remove(entity_id2)
 
-    await hass.async_add_job(trigger_db_commit, hass)
+    await hass.async_add_executor_job(trigger_db_commit, hass)
     await hass.async_block_till_done()
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     client = await hass_client()
 
@@ -1384,7 +1389,7 @@ async def test_exclude_attribute_changes(hass, hass_client):
     """Test if events of attribute changes are filtered."""
     await hass.async_add_executor_job(init_recorder_component, hass)
     await async_setup_component(hass, "logbook", {})
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
 
@@ -1397,9 +1402,9 @@ async def test_exclude_attribute_changes(hass, hass_client):
 
     await hass.async_block_till_done()
 
-    await hass.async_add_job(trigger_db_commit, hass)
+    await hass.async_add_executor_job(trigger_db_commit, hass)
     await hass.async_block_till_done()
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     client = await hass_client()
 
@@ -1427,7 +1432,7 @@ async def test_logbook_entity_context_id(hass, hass_client):
     await async_setup_component(hass, "automation", {})
     await async_setup_component(hass, "script", {})
 
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     context = ha.Context(
         id="ac5bd62de45711eaaeb351041eec8dd9",
@@ -1467,7 +1472,7 @@ async def test_logbook_entity_context_id(hass, hass_client):
     hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
     await hass.async_block_till_done()
 
-    await hass.async_add_job(
+    await hass.async_add_executor_job(
         logbook.log_entry,
         hass,
         "mock_name",
@@ -1478,7 +1483,7 @@ async def test_logbook_entity_context_id(hass, hass_client):
     )
     await hass.async_block_till_done()
 
-    await hass.async_add_job(
+    await hass.async_add_executor_job(
         logbook.log_entry,
         hass,
         "mock_name",
@@ -1513,9 +1518,9 @@ async def test_logbook_entity_context_id(hass, hass_client):
     )
     await hass.async_block_till_done()
 
-    await hass.async_add_job(trigger_db_commit, hass)
+    await hass.async_add_executor_job(trigger_db_commit, hass)
     await hass.async_block_till_done()
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     client = await hass_client()
 
@@ -1846,7 +1851,7 @@ async def test_icon_and_state(hass, hass_client):
     """Test to ensure state and custom icons are returned."""
     await hass.async_add_executor_job(init_recorder_component, hass)
     await async_setup_component(hass, "logbook", {})
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
 
@@ -1895,7 +1900,7 @@ async def test_exclude_events_domain(hass, hass_client):
     )
     await hass.async_add_executor_job(init_recorder_component, hass)
     await async_setup_component(hass, "logbook", config)
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
     hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
@@ -1935,7 +1940,7 @@ async def test_exclude_events_domain_glob(hass, hass_client):
     )
     await hass.async_add_executor_job(init_recorder_component, hass)
     await async_setup_component(hass, "logbook", config)
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
     hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
@@ -1975,7 +1980,7 @@ async def test_include_events_entity(hass, hass_client):
     )
     await hass.async_add_executor_job(init_recorder_component, hass)
     await async_setup_component(hass, "logbook", config)
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
     hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
@@ -2008,7 +2013,7 @@ async def test_exclude_events_entity(hass, hass_client):
     )
     await hass.async_add_executor_job(init_recorder_component, hass)
     await async_setup_component(hass, "logbook", config)
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
     hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
@@ -2042,7 +2047,7 @@ async def test_include_events_domain(hass, hass_client):
     )
     await hass.async_add_executor_job(init_recorder_component, hass)
     await async_setup_component(hass, "logbook", config)
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
     hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
@@ -2086,7 +2091,7 @@ async def test_include_events_domain_glob(hass, hass_client):
     )
     await hass.async_add_executor_job(init_recorder_component, hass)
     await async_setup_component(hass, "logbook", config)
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
     hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
@@ -2138,7 +2143,7 @@ async def test_include_exclude_events(hass, hass_client):
     )
     await hass.async_add_executor_job(init_recorder_component, hass)
     await async_setup_component(hass, "logbook", config)
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
     hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
@@ -2192,7 +2197,7 @@ async def test_include_exclude_events_with_glob_filters(hass, hass_client):
     )
     await hass.async_add_executor_job(init_recorder_component, hass)
     await async_setup_component(hass, "logbook", config)
-    await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
+    await hass.async_add_executor_job(hass.data[recorder.DATA_INSTANCE].block_till_done)
 
     hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
     hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py
index 6116b383341..08af027bce3 100644
--- a/tests/components/recorder/test_init.py
+++ b/tests/components/recorder/test_init.py
@@ -181,7 +181,20 @@ def test_saving_state_incl_entities(hass_recorder):
 
 def test_saving_event_exclude_event_type(hass_recorder):
     """Test saving and restoring an event."""
-    hass = hass_recorder({"exclude": {"event_types": "test"}})
+    hass = hass_recorder(
+        {
+            "exclude": {
+                "event_types": [
+                    "service_registered",
+                    "homeassistant_start",
+                    "component_loaded",
+                    "core_config_updated",
+                    "homeassistant_started",
+                    "test",
+                ]
+            }
+        }
+    )
     events = _add_events(hass, ["test", "test2"])
     assert len(events) == 1
     assert events[0].event_type == "test2"