Deduplicate event_types in the events table (#89465)
* Deduplicate event_types in the events table
* more fixes
* adjust
* adjust
* fix product
* fix tests
* adjust
* migrate
* migrate
* migrate
* more test fixes
* more test fixes
* fix
* migration test
* adjust
* speed up
* fix index
* fix more tests
* handle db failure
* preload
* tweak
* adjust
* fix stale docs strings, remove dead code
* refactor
* fix slow tests
* coverage
* self join to resolve query performance
* fix typo
* no need for quiet
* no need to drop index already dropped
* remove index that will never be used
* drop index sooner as we no longer use it
* Revert "remove index that will never be used"
This reverts commit 461aad2c52.
* typo
This commit is contained in: parent 56454c8580, commit 8bd43760b6.
22 changed files with 725 additions and 39 deletions.
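At its core, the change normalizes the events.event_type string column into a small event_types lookup table, with events carrying an indexed event_type_id foreign key instead of repeating the string on every row. A condensed, illustrative sketch of the resulting relationship (names match the db_schema.py hunks below; String(64) stands in for MAX_LENGTH_EVENT_EVENT_TYPE, and this is not the full model):

from sqlalchemy import ForeignKey, Identity, Integer, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    """Declarative base for the sketch."""


class EventTypes(Base):
    """Lookup table: one row per distinct event type string."""

    __tablename__ = "event_types"
    event_type_id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True)
    event_type: Mapped[str | None] = mapped_column(String(64))


class Events(Base):
    """Event rows now reference the lookup table by id."""

    __tablename__ = "events"
    event_id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True)
    # The legacy string column is kept so old rows can be migrated, but new
    # rows write None here (see Events.from_event in the diff below).
    event_type: Mapped[str | None] = mapped_column(String(64))
    event_type_id: Mapped[int | None] = mapped_column(
        Integer, ForeignKey("event_types.event_type_id"), index=True
    )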
@@ -17,10 +17,12 @@ from homeassistant.components.recorder.db_schema import (
     STATES_CONTEXT_ID_BIN_INDEX,
     EventData,
     Events,
+    EventTypes,
     StateAttributes,
     States,
 )
 from homeassistant.components.recorder.filters import like_domain_matchers
+from homeassistant.components.recorder.queries import select_event_type_ids
 
 from ..const import ALWAYS_CONTINUOUS_DOMAINS, CONDITIONALLY_CONTINUOUS_DOMAINS
 
@@ -44,7 +46,7 @@ PSEUDO_EVENT_STATE_CHANGED: Final = None
 
 EVENT_COLUMNS = (
     Events.event_id.label("event_id"),
-    Events.event_type.label("event_type"),
+    EventTypes.event_type.label("event_type"),
     Events.event_data.label("event_data"),
     Events.time_fired_ts.label("time_fired_ts"),
     Events.context_id_bin.label("context_id_bin"),
@@ -115,7 +117,8 @@ def select_events_context_id_subquery(
     return (
         select(Events.context_id_bin)
         .where((Events.time_fired_ts > start_day) & (Events.time_fired_ts < end_day))
-        .where(Events.event_type.in_(event_types))
+        .where(Events.event_type_id.in_(select_event_type_ids(event_types)))
+        .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id))
        .outerjoin(EventData, (Events.data_id == EventData.data_id))
     )
 
@@ -147,7 +150,8 @@ def select_events_without_states(
     return (
         select(*EVENT_ROWS_NO_STATES, NOT_CONTEXT_ONLY)
         .where((Events.time_fired_ts > start_day) & (Events.time_fired_ts < end_day))
-        .where(Events.event_type.in_(event_types))
+        .where(Events.event_type_id.in_(select_event_type_ids(event_types)))
+        .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id))
         .outerjoin(EventData, (Events.data_id == EventData.data_id))
     )
 
@@ -182,6 +186,7 @@ def legacy_select_events_context_id(
         .outerjoin(
             StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
         )
+        .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id))
         .where((Events.time_fired_ts > start_day) & (Events.time_fired_ts < end_day))
         .where(Events.context_id_bin == context_id_bin)
     )
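Because the string now lives in EventTypes, the logbook resolves the requested type names to ids with a subquery and joins back to EventTypes for the display column; outer joins keep rows whose join targets are absent. A minimal sketch of that pattern, reusing the models from the sketch above and the select_event_type_ids helper added in queries.py (illustrative, not the full logbook statement; "call_service" is just an example type):

from sqlalchemy import select

stmt = (
    select(Events.event_id, EventTypes.event_type.label("event_type"))
    .where(Events.event_type_id.in_(select_event_type_ids(("call_service",))))
    .outerjoin(EventTypes, Events.event_type_id == EventTypes.event_type_id)
)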
@@ -13,6 +13,7 @@ from homeassistant.components.recorder.db_schema import (
     DEVICE_ID_IN_EVENT,
     EventData,
     Events,
+    EventTypes,
     States,
 )
 
@@ -60,7 +61,9 @@ def _apply_devices_context_union(
             select_events_context_only()
             .select_from(devices_cte)
             .outerjoin(Events, devices_cte.c.context_id_bin == Events.context_id_bin)
-        ).outerjoin(EventData, (Events.data_id == EventData.data_id)),
+            .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id))
+            .outerjoin(EventData, (Events.data_id == EventData.data_id))
+        ),
         apply_states_context_hints(
             select_states_context_only()
             .select_from(devices_cte)
@@ -15,6 +15,7 @@ from homeassistant.components.recorder.db_schema import (
     OLD_ENTITY_ID_IN_EVENT,
     EventData,
     Events,
+    EventTypes,
     States,
 )
 
@@ -78,7 +79,9 @@ def _apply_entities_context_union(
             select_events_context_only()
             .select_from(entities_cte)
             .outerjoin(Events, entities_cte.c.context_id_bin == Events.context_id_bin)
-        ).outerjoin(EventData, (Events.data_id == EventData.data_id)),
+            .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id))
+            .outerjoin(EventData, (Events.data_id == EventData.data_id))
+        ),
         apply_states_context_hints(
             select_states_context_only()
             .select_from(entities_cte)
@@ -8,7 +8,12 @@ from sqlalchemy.sql.elements import ColumnElement
 from sqlalchemy.sql.lambdas import StatementLambdaElement
 from sqlalchemy.sql.selectable import CTE, CompoundSelect, Select
 
-from homeassistant.components.recorder.db_schema import EventData, Events, States
+from homeassistant.components.recorder.db_schema import (
+    EventData,
+    Events,
+    EventTypes,
+    States,
+)
 
 from .common import (
     apply_events_context_hints,
@@ -80,7 +85,9 @@ def _apply_entities_devices_context_union(
             .outerjoin(
                 Events, devices_entities_cte.c.context_id_bin == Events.context_id_bin
             )
-        ).outerjoin(EventData, (Events.data_id == EventData.data_id)),
+            .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id))
+            .outerjoin(EventData, (Events.data_id == EventData.data_id))
+        ),
         apply_states_context_hints(
             select_states_context_only()
             .select_from(devices_entities_cte)
@@ -61,6 +61,7 @@ from .db_schema import (
     Base,
     EventData,
     Events,
+    EventTypes,
     StateAttributes,
     States,
     Statistics,
@@ -81,8 +82,10 @@ from .queries import (
     find_shared_data_id,
     get_shared_attributes,
     get_shared_event_datas,
+    has_event_type_to_migrate,
 )
 from .run_history import RunHistory
+from .table_managers.event_types import EventTypeManager
 from .tasks import (
     AdjustLRUSizeTask,
     AdjustStatisticsTask,
@@ -92,6 +95,7 @@ from .tasks import (
     ContextIDMigrationTask,
     DatabaseLockTask,
     EventTask,
+    EventTypeIDMigrationTask,
     ImportStatisticsTask,
     KeepAliveTask,
     PerodicCleanupTask,
@@ -135,6 +139,7 @@ EXPIRE_AFTER_COMMITS = 120
 STATE_ATTRIBUTES_ID_CACHE_SIZE = 2048
 EVENT_DATA_ID_CACHE_SIZE = 2048
+
 
 SHUTDOWN_TASK = object()
 
 COMMIT_TASK = CommitTask()
@@ -209,6 +214,7 @@ class Recorder(threading.Thread):
         self._old_states: dict[str | None, States] = {}
         self._state_attributes_ids: LRU = LRU(STATE_ATTRIBUTES_ID_CACHE_SIZE)
         self._event_data_ids: LRU = LRU(EVENT_DATA_ID_CACHE_SIZE)
+        self.event_type_manager = EventTypeManager()
         self._pending_state_attributes: dict[str, StateAttributes] = {}
         self._pending_event_data: dict[str, EventData] = {}
         self._pending_expunge: list[States] = []
@@ -688,10 +694,26 @@ class Recorder(threading.Thread):
         _LOGGER.debug("Recorder processing the queue")
         self._adjust_lru_size()
         self.hass.add_job(self._async_set_recorder_ready_migration_done)
-        self.queue_task(ContextIDMigrationTask())
+        self._activate_table_managers_or_migrate()
         self._run_event_loop()
         self._shutdown()
 
+    def _activate_table_managers_or_migrate(self) -> None:
+        """Activate the table managers or schedule migrations."""
+        # Currently we always check if context ids need to be migrated
+        # since there are multiple tables. This could be optimized
+        # to check both the states and events table to see if there
+        # are any missing and avoid inserting the task but it currently
+        # is not needed since there is no dependent code branching
+        # on the result of the migration.
+        self.queue_task(ContextIDMigrationTask())
+        with session_scope(session=self.get_session()) as session:
+            if session.execute(has_event_type_to_migrate()).scalar():
+                self.queue_task(EventTypeIDMigrationTask())
+            else:
+                _LOGGER.debug("Activating event type manager as all data is migrated")
+                self.event_type_manager.active = True
+
     def _run_event_loop(self) -> None:
         """Run the event loop for the recorder."""
         # Use a session for the event read loop
@@ -724,8 +746,10 @@ class Recorder(threading.Thread):
             else:
                 non_state_change_events.append(event_)
 
+        assert self.event_session is not None
         self._pre_process_state_change_events(state_change_events)
         self._pre_process_non_state_change_events(non_state_change_events)
+        self.event_type_manager.load(non_state_change_events, self.event_session)
 
     def _pre_process_state_change_events(self, events: list[Event]) -> None:
         """Load startup state attributes from the database.
@@ -944,13 +968,30 @@ class Recorder(threading.Thread):
 
     def _process_non_state_changed_event_into_session(self, event: Event) -> None:
         """Process any event into the session except state changed."""
-        assert self.event_session is not None
+        event_session = self.event_session
+        assert event_session is not None
         dbevent = Events.from_event(event)
 
+        # Map the event_type to the EventTypes table
+        event_type_manager = self.event_type_manager
+        if pending_event_types := event_type_manager.get_pending(event.event_type):
+            dbevent.event_type_rel = pending_event_types
+        elif event_type_id := event_type_manager.get(event.event_type, event_session):
+            dbevent.event_type_id = event_type_id
+        else:
+            event_types = EventTypes(event_type=event.event_type)
+            event_type_manager.add_pending(event_types)
+            event_session.add(event_types)
+            dbevent.event_type_rel = event_types
+
         if not event.data:
-            self.event_session.add(dbevent)
+            event_session.add(dbevent)
             return
 
         if not (shared_data_bytes := self._serialize_event_data_from_event(event)):
             return
 
         # Map the event data to the EventData table
         shared_data = shared_data_bytes.decode("utf-8")
         # Matching attributes found in the pending commit
         if pending_event_data := self._pending_event_data.get(shared_data):
@@ -969,9 +1010,9 @@ class Recorder(threading.Thread):
             dbevent.event_data_rel = self._pending_event_data[
                 shared_data
             ] = dbevent_data
-            self.event_session.add(dbevent_data)
+            event_session.add(dbevent_data)
 
-        self.event_session.add(dbevent)
+        event_session.add(dbevent)
 
     def _serialize_state_attributes_from_event(self, event: Event) -> bytes | None:
         """Serialize state changed event data."""
@@ -1096,6 +1137,7 @@ class Recorder(threading.Thread):
         for event_data in self._pending_event_data.values():
             self._event_data_ids[event_data.shared_data] = event_data.data_id
         self._pending_event_data = {}
+        self.event_type_manager.post_commit_pending()
 
         # Expire is an expensive operation (frequently more expensive
         # than the flush and commit itself) so we only
@@ -1122,6 +1164,7 @@ class Recorder(threading.Thread):
         self._event_data_ids.clear()
         self._pending_state_attributes.clear()
         self._pending_event_data.clear()
+        self.event_type_manager.reset()
 
         if not self.event_session:
             return
@@ -1152,6 +1195,10 @@ class Recorder(threading.Thread):
         """Migrate context ids if needed."""
         return migration.migrate_context_ids(self)
 
+    def _migrate_event_type_ids(self) -> bool:
+        """Migrate event type ids if needed."""
+        return migration.migrate_event_type_ids(self)
+
     def _send_keep_alive(self) -> None:
         """Send a keep alive to keep the db connection open."""
         assert self.event_session is not None
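The resolution order in _process_non_state_changed_event_into_session is worth spelling out: reuse an uncommitted pending row, else resolve through the cache or database, else insert a new row and mark it pending. A hypothetical restatement of that branch (resolve_event_type is not a real helper; the other names match the diff):

def resolve_event_type(
    manager: EventTypeManager, session: Session, dbevent: Events, event_type: str
) -> None:
    """Sketch of the three-way resolution used above."""
    if pending := manager.get_pending(event_type):
        # Same string was already inserted in this commit interval; point the
        # relationship at the uncommitted row so the FK is filled on flush.
        dbevent.event_type_rel = pending
    elif event_type_id := manager.get(event_type, session):
        # Known id, from the LRU cache or a database lookup.
        dbevent.event_type_id = event_type_id
    else:
        # First sighting: create the row and track it until the next commit.
        new_row = EventTypes(event_type=event_type)
        manager.add_pending(new_row)
        session.add(new_row)
        dbevent.event_type_rel = new_row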
@@ -68,12 +68,13 @@ class Base(DeclarativeBase):
     """Base class for tables."""
 
 
-SCHEMA_VERSION = 36
+SCHEMA_VERSION = 37
 
 _LOGGER = logging.getLogger(__name__)
 
 TABLE_EVENTS = "events"
 TABLE_EVENT_DATA = "event_data"
+TABLE_EVENT_TYPES = "event_types"
 TABLE_STATES = "states"
 TABLE_STATE_ATTRIBUTES = "state_attributes"
 TABLE_RECORDER_RUNS = "recorder_runs"
@@ -93,6 +94,7 @@ ALL_TABLES = [
     TABLE_STATE_ATTRIBUTES,
     TABLE_EVENTS,
     TABLE_EVENT_DATA,
+    TABLE_EVENT_TYPES,
     TABLE_RECORDER_RUNS,
     TABLE_SCHEMA_CHANGES,
     TABLE_STATISTICS,
@@ -176,7 +178,9 @@ class Events(Base):
     __table_args__ = (
         # Used for fetching events at a specific time
         # see logbook
-        Index("ix_events_event_type_time_fired_ts", "event_type", "time_fired_ts"),
+        Index(
+            "ix_events_event_type_id_time_fired_ts", "event_type_id", "time_fired_ts"
+        ),
         Index(
             EVENTS_CONTEXT_ID_BIN_INDEX,
             "context_id_bin",
@@ -187,7 +191,9 @@ class Events(Base):
     )
     __tablename__ = TABLE_EVENTS
     event_id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True)
-    event_type: Mapped[str | None] = mapped_column(String(MAX_LENGTH_EVENT_EVENT_TYPE))
+    event_type: Mapped[str | None] = mapped_column(
+        String(MAX_LENGTH_EVENT_EVENT_TYPE)
+    )  # no longer used
     event_data: Mapped[str | None] = mapped_column(
         Text().with_variant(mysql.LONGTEXT, "mysql", "mariadb")
     )
@@ -220,13 +226,17 @@ class Events(Base):
     context_parent_id_bin: Mapped[bytes | None] = mapped_column(
         LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH)
     )
+    event_type_id: Mapped[int | None] = mapped_column(
+        Integer, ForeignKey("event_types.event_type_id"), index=True
+    )
     event_data_rel: Mapped[EventData | None] = relationship("EventData")
+    event_type_rel: Mapped[EventTypes | None] = relationship("EventTypes")
 
     def __repr__(self) -> str:
         """Return string representation of instance for debugging."""
         return (
             "<recorder.Events("
-            f"id={self.event_id}, type='{self.event_type}', "
+            f"id={self.event_id}, event_type_id='{self.event_type_id}', "
             f"origin_idx='{self.origin_idx}', time_fired='{self._time_fired_isotime}'"
             f", data_id={self.data_id})>"
         )
@@ -247,7 +257,7 @@ class Events(Base):
     def from_event(event: Event) -> Events:
         """Create an event database object from a native event."""
         return Events(
-            event_type=event.event_type,
+            event_type=None,
             event_data=None,
             origin_idx=EVENT_ORIGIN_TO_IDX.get(event.origin),
             time_fired=None,
@@ -330,6 +340,23 @@ class EventData(Base):
         return {}
 
 
+class EventTypes(Base):
+    """Event type history."""
+
+    __table_args__ = (_DEFAULT_TABLE_ARGS,)
+    __tablename__ = TABLE_EVENT_TYPES
+    event_type_id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True)
+    event_type: Mapped[str | None] = mapped_column(String(MAX_LENGTH_EVENT_EVENT_TYPE))
+
+    def __repr__(self) -> str:
+        """Return string representation of instance for debugging."""
+        return (
+            "<recorder.EventTypes("
+            f"id={self.event_type_id}, event_type='{self.event_type}'"
+            ")>"
+        )
+
+
 class States(Base):
     """State change history."""
 
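With the lookup table in place, anything that needs the type name joins through the foreign key. A hypothetical per-type count over the new schema, for illustration only (not code from this commit):

from sqlalchemy import func, select

per_type_counts = (
    select(EventTypes.event_type, func.count(Events.event_id))
    .join(EventTypes, Events.event_type_id == EventTypes.event_type_id)
    .group_by(EventTypes.event_type)
)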
@@ -35,6 +35,7 @@ from .db_schema import (
     TABLE_STATES,
     Base,
     Events,
+    EventTypes,
     SchemaChanges,
     States,
     Statistics,
@@ -44,6 +45,7 @@ from .db_schema import (
 )
 from .models import process_timestamp
 from .queries import (
+    find_event_type_to_migrate,
     find_events_context_ids_to_migrate,
     find_states_context_ids_to_migrate,
 )
@@ -978,6 +980,11 @@ def _apply_update(  # noqa: C901
         )
         _create_index(session_maker, "events", "ix_events_context_id_bin")
         _create_index(session_maker, "states", "ix_states_context_id_bin")
+    elif new_version == 37:
+        _add_columns(session_maker, "events", [f"event_type_id {big_int}"])
+        _create_index(session_maker, "events", "ix_events_event_type_id")
+        _drop_index(session_maker, "events", "ix_events_event_type_time_fired_ts")
+        _create_index(session_maker, "events", "ix_events_event_type_id_time_fired_ts")
     else:
         raise ValueError(f"No schema migration defined for version {new_version}")
 
@@ -1288,6 +1295,57 @@ def migrate_context_ids(instance: Recorder) -> bool:
     return is_done
 
 
+def migrate_event_type_ids(instance: Recorder) -> bool:
+    """Migrate event_type to event_type_ids."""
+    session_maker = instance.get_session
+    _LOGGER.debug("Migrating event_types")
+    event_type_manager = instance.event_type_manager
+    with session_scope(session=session_maker()) as session:
+        if events := session.execute(find_event_type_to_migrate()).all():
+            event_types = {event_type for _, event_type in events}
+            event_type_to_id = event_type_manager.get_many(event_types, session)
+            if missing_event_types := {
+                event_type
+                for event_type, event_id in event_type_to_id.items()
+                if event_id is None
+            }:
+                missing_db_event_types = [
+                    EventTypes(event_type=event_type)
+                    for event_type in missing_event_types
+                ]
+                session.add_all(missing_db_event_types)
+                session.flush()  # Assign ids
+                for db_event_type in missing_db_event_types:
+                    # We cannot add the assigned ids to the event_type_manager
+                    # because the commit could get rolled back
+                    assert db_event_type.event_type is not None
+                    event_type_to_id[
+                        db_event_type.event_type
+                    ] = db_event_type.event_type_id
+
+            session.execute(
+                update(Events),
+                [
+                    {
+                        "event_id": event_id,
+                        "event_type": None,
+                        "event_type_id": event_type_to_id[event_type],
+                    }
+                    for event_id, event_type in events
+                ],
+            )
+
+    # If there is more work to do return False
+    # so that we can be called again
+    is_done = not events
+
+    if is_done:
+        instance.event_type_manager.active = True
+
+    _LOGGER.debug("Migrating event_types done=%s", is_done)
+    return is_done
+
+
 def _initialize_database(session: Session) -> bool:
     """Initialize a new database.
 
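The session.execute(update(Events), [...]) form in migrate_event_type_ids is SQLAlchemy 2.0's bulk UPDATE by primary key: each dict must carry the primary key ("event_id"), and the remaining keys become SET clauses executed as a single executemany. A minimal sketch of that shape (the id values are made up):

from sqlalchemy import update

session.execute(
    update(Events),
    [
        # One dict per row; "event_id" selects the row, the other keys are SET.
        {"event_id": 1, "event_type": None, "event_type_id": 7},
        {"event_id": 2, "event_type": None, "event_type_id": 9},
    ],
)

Each pass is capped by the SQLITE_MAX_BIND_VARS limit inside find_event_type_to_migrate(), so the migration proceeds in batches until no rows with a NULL event_type_id remain.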
@@ -24,12 +24,14 @@ from .queries import (
     data_ids_exist_in_events_with_fast_in_distinct,
     delete_event_data_rows,
     delete_event_rows,
+    delete_event_types_rows,
     delete_recorder_runs_rows,
     delete_states_attributes_rows,
     delete_states_rows,
     delete_statistics_runs_rows,
     delete_statistics_short_term_rows,
     disconnect_states_rows,
+    find_event_types_to_purge,
     find_events_to_purge,
     find_latest_statistics_runs_run_id,
     find_legacy_event_state_and_attributes_and_data_ids_to_purge,
@@ -109,6 +111,11 @@ def purge_old_data(
         _LOGGER.debug("Cleanup filtered data hasn't fully completed yet")
         return False
 
+    # This purge cycle is finished, clean up old event types and
+    # recorder runs
+    if instance.event_type_manager.active:
+        _purge_old_event_types(instance, session)
+
     _purge_old_recorder_runs(instance, session, purge_before)
     if repack:
         repack_database(instance)
@@ -564,6 +571,25 @@ def _purge_old_recorder_runs(
     _LOGGER.debug("Deleted %s recorder_runs", deleted_rows)
 
 
+def _purge_old_event_types(instance: Recorder, session: Session) -> None:
+    """Purge all old event types."""
+    # Event types is small, no need to batch run it
+    purge_event_types = set()
+    event_type_ids = set()
+    for event_type_id, event_type in session.execute(find_event_types_to_purge()):
+        purge_event_types.add(event_type)
+        event_type_ids.add(event_type_id)
+
+    if not event_type_ids:
+        return
+
+    deleted_rows = session.execute(delete_event_types_rows(event_type_ids))
+    _LOGGER.debug("Deleted %s event types", deleted_rows)
+
+    # Evict any entries in the event_type cache referring to a purged state
+    instance.event_type_manager.evict_purged(purge_event_types)
+
+
 def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
     """Remove filtered states and events that shouldn't be in the database."""
     _LOGGER.debug("Cleanup filtered data")
 
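The "self join to resolve query performance" bullet in the commit message refers to find_event_types_to_purge() in queries.py below: rather than a plain NOT IN against a DISTINCT scan of events, it joins event_types to the distinct set of used ids and excludes the matches, which some query planners execute far better. The straightforward shape it avoids would look roughly like this (illustrative only, not code from this commit):

from sqlalchemy import distinct, select

naive_unused_types = select(EventTypes.event_type_id, EventTypes.event_type).where(
    EventTypes.event_type_id.not_in(select(distinct(Events.event_type_id)))
)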
@@ -12,6 +12,7 @@ from .const import SQLITE_MAX_BIND_VARS
 from .db_schema import (
     EventData,
     Events,
+    EventTypes,
     RecorderRuns,
     StateAttributes,
     States,
@@ -20,6 +21,17 @@ from .db_schema import (
 )
 
 
+def select_event_type_ids(event_types: tuple[str, ...]) -> Select:
+    """Generate a select for event type ids.
+
+    This query is intentionally not a lambda statement as it is used inside
+    other lambda statements.
+    """
+    return select(EventTypes.event_type_id).where(
+        EventTypes.event_type.in_(event_types)
+    )
+
+
 def get_shared_attributes(hashes: list[int]) -> StatementLambdaElement:
     """Load shared attributes from the database."""
     return lambda_stmt(
@@ -38,6 +50,15 @@ def get_shared_event_datas(hashes: list[int]) -> StatementLambdaElement:
     )
 
 
+def find_event_type_ids(event_types: Iterable[str]) -> StatementLambdaElement:
+    """Find an event_type id by event_type."""
+    return lambda_stmt(
+        lambda: select(EventTypes.event_type_id, EventTypes.event_type).filter(
+            EventTypes.event_type.in_(event_types)
+        )
+    )
+
+
 def find_shared_attributes_id(
     data_hash: int, shared_attrs: str
 ) -> StatementLambdaElement:
@@ -683,6 +704,25 @@ def find_events_context_ids_to_migrate() -> StatementLambdaElement:
     )
 
 
+def find_event_type_to_migrate() -> StatementLambdaElement:
+    """Find events event_type to migrate."""
+    return lambda_stmt(
+        lambda: select(
+            Events.event_id,
+            Events.event_type,
+        )
+        .filter(Events.event_type_id.is_(None))
+        .limit(SQLITE_MAX_BIND_VARS)
+    )
+
+
+def has_event_type_to_migrate() -> StatementLambdaElement:
+    """Check if there are event_types to migrate."""
+    return lambda_stmt(
+        lambda: select(Events.event_id).filter(Events.event_type_id.is_(None)).limit(1)
+    )
+
+
 def find_states_context_ids_to_migrate() -> StatementLambdaElement:
     """Find events context_ids to migrate."""
     return lambda_stmt(
@@ -695,3 +735,29 @@ def find_states_context_ids_to_migrate() -> StatementLambdaElement:
         .filter(States.context_id_bin.is_(None))
         .limit(SQLITE_MAX_BIND_VARS)
     )
+
+
+def find_event_types_to_purge() -> StatementLambdaElement:
+    """Find event_type_ids to purge."""
+    return lambda_stmt(
+        lambda: select(EventTypes.event_type_id, EventTypes.event_type).where(
+            EventTypes.event_type_id.not_in(
+                select(EventTypes.event_type_id).join(
+                    used_event_type_ids := select(
+                        distinct(Events.event_type_id).label("used_event_type_id")
+                    ).subquery(),
+                    EventTypes.event_type_id
+                    == used_event_type_ids.c.used_event_type_id,
+                )
+            )
+        )
+    )
+
+
+def delete_event_types_rows(event_type_ids: Iterable[int]) -> StatementLambdaElement:
+    """Delete EventTypes rows."""
+    return lambda_stmt(
+        lambda: delete(EventTypes)
+        .where(EventTypes.event_type_id.in_(event_type_ids))
+        .execution_options(synchronize_session=False)
+    )
 
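Per its docstring, select_event_type_ids() deliberately returns a plain Select so callers can embed it inside their own cached lambda statements, as the logbook and test hunks elsewhere in this commit do. A minimal sketch of that composition (events_of_types is a hypothetical helper, not part of the commit):

from sqlalchemy import lambda_stmt, select
from sqlalchemy.sql.lambdas import StatementLambdaElement


def events_of_types(event_types: tuple[str, ...]) -> StatementLambdaElement:
    """Select event ids whose type is one of event_types."""
    return lambda_stmt(
        lambda: select(Events.event_id).where(
            Events.event_type_id.in_(select_event_type_ids(event_types))
        )
    )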
@@ -0,0 +1,87 @@
+"""Support managing EventTypes."""
+from __future__ import annotations
+
+from collections.abc import Iterable
+from typing import cast
+
+from lru import LRU  # pylint: disable=no-name-in-module
+from sqlalchemy.orm.session import Session
+
+from homeassistant.core import Event
+
+from ..db_schema import EventTypes
+from ..queries import find_event_type_ids
+
+CACHE_SIZE = 2048
+
+
+class EventTypeManager:
+    """Manage the EventTypes table."""
+
+    def __init__(self) -> None:
+        """Initialize the event type manager."""
+        self._id_map: dict[str, int] = LRU(CACHE_SIZE)
+        self._pending: dict[str, EventTypes] = {}
+        self.active = False
+
+    def load(self, events: list[Event], session: Session) -> None:
+        """Load the event_type to event_type_ids mapping into memory."""
+        self.get_many(
+            (event.event_type for event in events if event.event_type is not None),
+            session,
+        )
+
+    def get(self, event_type: str, session: Session) -> int | None:
+        """Resolve event_type to the event_type_id."""
+        return self.get_many((event_type,), session)[event_type]
+
+    def get_many(
+        self, event_types: Iterable[str], session: Session
+    ) -> dict[str, int | None]:
+        """Resolve event_types to event_type_ids."""
+        results: dict[str, int | None] = {}
+        missing: list[str] = []
+        for event_type in event_types:
+            if (event_type_id := self._id_map.get(event_type)) is None:
+                missing.append(event_type)
+
+            results[event_type] = event_type_id
+
+        if not missing:
+            return results
+
+        with session.no_autoflush:
+            for event_type_id, event_type in session.execute(
+                find_event_type_ids(missing)
+            ):
+                results[event_type] = self._id_map[event_type] = cast(
+                    int, event_type_id
+                )
+
+        return results
+
+    def get_pending(self, event_type: str) -> EventTypes | None:
+        """Get pending EventTypes that have not been assigned ids yet."""
+        return self._pending.get(event_type)
+
+    def add_pending(self, db_event_type: EventTypes) -> None:
+        """Add a pending EventTypes that will be committed at the next interval."""
+        assert db_event_type.event_type is not None
+        event_type: str = db_event_type.event_type
+        self._pending[event_type] = db_event_type
+
+    def post_commit_pending(self) -> None:
+        """Call after commit to load the event_type_ids of the new EventTypes into the LRU."""
+        for event_type, db_event_types in self._pending.items():
+            self._id_map[event_type] = db_event_types.event_type_id
+        self._pending.clear()
+
+    def reset(self) -> None:
+        """Reset the event manager after the database has been reset or changed."""
+        self._id_map.clear()
+        self._pending.clear()
+
+    def evict_purged(self, event_types: Iterable[str]) -> None:
+        """Evict purged event_types from the cache when they are no longer used."""
+        for event_type in event_types:
+            self._id_map.pop(event_type, None)
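Typical use of the manager, assuming an open SQLAlchemy session and ids already assigned in the database (the returned values here are invented for illustration):

manager = EventTypeManager()
ids = manager.get_many(("call_service", "automation_triggered"), session)
# e.g. {"call_service": 7, "automation_triggered": None}
# None means the type is not in the table yet; the recorder then creates an
# EventTypes row and registers it with add_pending() until the next commit.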
@@ -356,3 +356,19 @@ class ContextIDMigrationTask(RecorderTask):
         if not instance._migrate_context_ids():  # pylint: disable=[protected-access]
             # Schedule a new migration task if this one didn't finish
             instance.queue_task(ContextIDMigrationTask())
+
+
+@dataclass
+class EventTypeIDMigrationTask(RecorderTask):
+    """An object to insert into the recorder queue to migrate event type ids."""
+
+    commit_before = True
+    # We have to commit before to make sure there are
+    # no new pending event_types about to be added to
+    # the db since this happens live
+
+    def run(self, instance: Recorder) -> None:
+        """Run event type id migration task."""
+        if not instance._migrate_event_type_ids():  # pylint: disable=[protected-access]
+            # Schedule a new migration task if this one didn't finish
+            instance.queue_task(EventTypeIDMigrationTask())
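commit_before forces pending rows to flush before the migration scans for events with a NULL event_type_id, and the task requeues itself after each batch. The control flow is equivalent to this illustrative loop (the real recorder re-queues a task instead of looping):

while not migration.migrate_event_type_ids(instance):
    # Each pass migrates at most one batch (SQLITE_MAX_BIND_VARS rows);
    # a False return means more rows remain.
    pass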
@@ -69,7 +69,9 @@ def db_schema_30():
 
     with patch.object(recorder, "db_schema", old_db_schema), patch.object(
         recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
-    ), patch.object(core, "EventData", old_db_schema.EventData), patch.object(
+    ), patch.object(core, "EventTypes", old_db_schema.EventTypes), patch.object(
+        core, "EventData", old_db_schema.EventData
+    ), patch.object(
         core, "States", old_db_schema.States
     ), patch.object(
         core, "Events", old_db_schema.Events
@@ -69,10 +69,12 @@ TABLE_STATISTICS_META = "statistics_meta"
 TABLE_STATISTICS_RUNS = "statistics_runs"
 TABLE_STATISTICS_SHORT_TERM = "statistics_short_term"
 TABLE_EVENT_DATA = "event_data"
+TABLE_EVENT_TYPES = "event_types"
 
 ALL_TABLES = [
     TABLE_STATES,
     TABLE_EVENTS,
+    TABLE_EVENT_TYPES,
     TABLE_RECORDER_RUNS,
     TABLE_SCHEMA_CHANGES,
     TABLE_STATISTICS,
@@ -141,9 +143,13 @@ class Events(Base):  # type: ignore
     context_parent_id_bin = Column(
         LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH)
     )  # *** Not originally in v23, only added for recorder to startup ok
+    event_type_id = Column(
+        Integer, ForeignKey("event_types.event_type_id"), index=True
+    )  # *** Not originally in v23, only added for recorder to startup ok
     event_data_rel = relationship(
         "EventData"
     )  # *** Not originally in v23, only added for recorder to startup ok
+    event_type_rel = relationship("EventTypes")
 
     def __repr__(self) -> str:
         """Return string representation of instance for debugging."""
@@ -204,6 +210,19 @@ class EventData(Base):  # type: ignore[misc,valid-type]
     shared_data = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
 
 
+# *** Not originally in v23, only added for recorder to startup ok
+# This is not being tested by the v23 statistics migration tests
+class EventTypes(Base):  # type: ignore[misc,valid-type]
+    """Event type history."""
+
+    __table_args__ = (
+        {"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
+    )
+    __tablename__ = TABLE_EVENT_TYPES
+    event_type_id = Column(Integer, Identity(), primary_key=True)
+    event_type = Column(String(MAX_LENGTH_EVENT_EVENT_TYPE))
+
+
 class States(Base):  # type: ignore
     """State change history."""
 
@@ -21,6 +21,7 @@ from sqlalchemy import (
+    Identity,
     Index,
     Integer,
     LargeBinary,
     SmallInteger,
     String,
     Text,
@@ -54,6 +55,7 @@ DB_TIMEZONE = "+00:00"
 
 TABLE_EVENTS = "events"
 TABLE_EVENT_DATA = "event_data"
+TABLE_EVENT_TYPES = "event_types"
 TABLE_STATES = "states"
 TABLE_STATE_ATTRIBUTES = "state_attributes"
 TABLE_RECORDER_RUNS = "recorder_runs"
@@ -68,6 +70,7 @@ ALL_TABLES = [
     TABLE_STATE_ATTRIBUTES,
     TABLE_EVENTS,
     TABLE_EVENT_DATA,
+    TABLE_EVENT_TYPES,
     TABLE_RECORDER_RUNS,
     TABLE_SCHEMA_CHANGES,
     TABLE_STATISTICS,
@@ -98,6 +101,11 @@ DOUBLE_TYPE = (
 )
 EVENT_ORIGIN_ORDER = [EventOrigin.local, EventOrigin.remote]
 EVENT_ORIGIN_TO_IDX = {origin: idx for idx, origin in enumerate(EVENT_ORIGIN_ORDER)}
+CONTEXT_ID_BIN_MAX_LENGTH = 16
+EVENTS_CONTEXT_ID_BIN_INDEX = "ix_events_context_id_bin"
+STATES_CONTEXT_ID_BIN_INDEX = "ix_states_context_id_bin"
+
+TIMESTAMP_TYPE = DOUBLE_TYPE
 
 
 class Events(Base):  # type: ignore[misc,valid-type]
@@ -107,6 +115,12 @@ class Events(Base):  # type: ignore[misc,valid-type]
         # Used for fetching events at a specific time
         # see logbook
         Index("ix_events_event_type_time_fired", "event_type", "time_fired"),
+        Index(
+            EVENTS_CONTEXT_ID_BIN_INDEX,
+            "context_id_bin",
+            mysql_length=CONTEXT_ID_BIN_MAX_LENGTH,
+            mariadb_length=CONTEXT_ID_BIN_MAX_LENGTH,
+        ),
         {"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
     )
     __tablename__ = TABLE_EVENTS
@@ -116,11 +130,27 @@ class Events(Base):  # type: ignore[misc,valid-type]
     origin = Column(String(MAX_LENGTH_EVENT_ORIGIN))  # no longer used
     origin_idx = Column(SmallInteger)
     time_fired = Column(DATETIME_TYPE, index=True)
+    time_fired_ts = Column(
+        TIMESTAMP_TYPE, index=True
+    )  # *** Not originally in v30, only added for recorder to startup ok
     context_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
     context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
     context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
     data_id = Column(Integer, ForeignKey("event_data.data_id"), index=True)
+    context_id_bin = Column(
+        LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH)
+    )  # *** Not originally in v28, only added for recorder to startup ok
+    context_user_id_bin = Column(
+        LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH)
+    )  # *** Not originally in v28, only added for recorder to startup ok
+    context_parent_id_bin = Column(
+        LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH)
+    )  # *** Not originally in v28, only added for recorder to startup ok
+    event_type_id = Column(
+        Integer, ForeignKey("event_types.event_type_id"), index=True
+    )  # *** Not originally in v28, only added for recorder to startup ok
     event_data_rel = relationship("EventData")
+    event_type_rel = relationship("EventTypes")
 
     def __repr__(self) -> str:
         """Return string representation of instance for debugging."""
@@ -214,6 +244,19 @@ class EventData(Base):  # type: ignore[misc,valid-type]
         return {}
 
 
+# *** Not originally in v28, only added for recorder to startup ok
+# This is not being tested by the v28 statistics migration tests
+class EventTypes(Base):  # type: ignore[misc,valid-type]
+    """Event type history."""
+
+    __table_args__ = (
+        {"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
+    )
+    __tablename__ = TABLE_EVENT_TYPES
+    event_type_id = Column(Integer, Identity(), primary_key=True)
+    event_type = Column(String(MAX_LENGTH_EVENT_EVENT_TYPE))
+
+
 class States(Base):  # type: ignore[misc,valid-type]
     """State change history."""
 
@@ -65,7 +65,9 @@ def db_schema_30():
 
     with patch.object(recorder, "db_schema", old_db_schema), patch.object(
         recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
-    ), patch.object(core, "EventData", old_db_schema.EventData), patch.object(
+    ), patch.object(core, "EventTypes", old_db_schema.EventTypes), patch.object(
+        core, "EventData", old_db_schema.EventData
+    ), patch.object(
         core, "States", old_db_schema.States
     ), patch.object(
         core, "Events", old_db_schema.Events
@@ -39,12 +39,14 @@ from homeassistant.components.recorder.db_schema import (
     SCHEMA_VERSION,
     EventData,
     Events,
+    EventTypes,
     RecorderRuns,
     StateAttributes,
     States,
     StatisticsRuns,
 )
 from homeassistant.components.recorder.models import process_timestamp
+from homeassistant.components.recorder.queries import select_event_type_ids
 from homeassistant.components.recorder.services import (
     SERVICE_DISABLE,
     SERVICE_ENABLE,
@@ -483,16 +485,19 @@ def test_saving_event(hass_recorder: Callable[..., HomeAssistant]) -> None:
     events: list[Event] = []
 
     with session_scope(hass=hass) as session:
-        for select_event, event_data in (
-            session.query(Events, EventData)
-            .filter_by(event_type=event_type)
+        for select_event, event_data, event_types in (
+            session.query(Events, EventData, EventTypes)
+            .filter(Events.event_type_id.in_(select_event_type_ids((event_type,))))
+            .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id))
             .outerjoin(EventData, Events.data_id == EventData.data_id)
         ):
             select_event = cast(Events, select_event)
             event_data = cast(EventData, event_data)
+            event_types = cast(EventTypes, event_types)
 
             native_event = select_event.to_native()
             native_event.data = event_data.to_native()
+            native_event.event_type = event_types.event_type
             events.append(native_event)
 
     db_event = events[0]
@@ -555,15 +560,19 @@ def _add_events(hass, events):
 
     with session_scope(hass=hass) as session:
         events = []
-        for event, event_data in session.query(Events, EventData).outerjoin(
-            EventData, Events.data_id == EventData.data_id
+        for event, event_data, event_types in (
+            session.query(Events, EventData, EventTypes)
+            .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id))
+            .outerjoin(EventData, Events.data_id == EventData.data_id)
         ):
             event = cast(Events, event)
             event_data = cast(EventData, event_data)
+            event_types = cast(EventTypes, event_types)
 
             native_event = event.to_native()
             if event_data:
                 native_event.data = event_data.to_native()
+            native_event.event_type = event_types.event_type
             events.append(native_event)
         return events
 
@@ -1349,7 +1358,11 @@ def test_service_disable_events_not_recording(
     event = events[0]
 
     with session_scope(hass=hass) as session:
-        db_events = list(session.query(Events).filter_by(event_type=event_type))
+        db_events = list(
+            session.query(Events)
+            .filter(Events.event_type_id.in_(select_event_type_ids((event_type,))))
+            .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id))
+        )
         assert len(db_events) == 0
 
     assert hass.services.call(
@@ -1369,16 +1382,19 @@ def test_service_disable_events_not_recording(
 
     db_events = []
     with session_scope(hass=hass) as session:
-        for select_event, event_data in (
-            session.query(Events, EventData)
-            .filter_by(event_type=event_type)
+        for select_event, event_data, event_types in (
+            session.query(Events, EventData, EventTypes)
+            .filter(Events.event_type_id.in_(select_event_type_ids((event_type,))))
+            .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id))
            .outerjoin(EventData, Events.data_id == EventData.data_id)
         ):
             select_event = cast(Events, select_event)
             event_data = cast(EventData, event_data)
+            event_types = cast(EventTypes, event_types)
 
             native_event = select_event.to_native()
             native_event.data = event_data.to_native()
+            native_event.event_type = event_types.event_type
             db_events.append(native_event)
 
     assert len(db_events) == 1
@@ -1558,6 +1574,7 @@ def test_entity_id_filter(hass_recorder: Callable[..., HomeAssistant]) -> None:
     hass = hass_recorder(
         {"include": {"domains": "hello"}, "exclude": {"domains": "hidden_domain"}}
     )
+    event_types = ("hello",)
 
     for idx, data in enumerate(
         (
@@ -1572,7 +1589,11 @@ def test_entity_id_filter(hass_recorder: Callable[..., HomeAssistant]) -> None:
         wait_recording_done(hass)
 
         with session_scope(hass=hass) as session:
-            db_events = list(session.query(Events).filter_by(event_type="hello"))
+            db_events = list(
+                session.query(Events).filter(
+                    Events.event_type_id.in_(select_event_type_ids(event_types))
+                )
+            )
             assert len(db_events) == idx + 1, data
 
     for data in (
@@ -1583,7 +1604,11 @@ def test_entity_id_filter(hass_recorder: Callable[..., HomeAssistant]) -> None:
         wait_recording_done(hass)
 
         with session_scope(hass=hass) as session:
-            db_events = list(session.query(Events).filter_by(event_type="hello"))
+            db_events = list(
+                session.query(Events).filter(
+                    Events.event_type_id.in_(select_event_type_ids(event_types))
+                )
+            )
             # Keep referring idx + 1, as no new events are being added
             assert len(db_events) == idx + 1, data
 
@@ -1608,10 +1633,16 @@ async def test_database_lock_and_unlock(
     }
     await async_setup_recorder_instance(hass, config)
     await hass.async_block_till_done()
+    event_type = "EVENT_TEST"
+    event_types = (event_type,)
 
     def _get_db_events():
         with session_scope(hass=hass) as session:
-            return list(session.query(Events).filter_by(event_type=event_type))
+            return list(
+                session.query(Events).filter(
+                    Events.event_type_id.in_(select_event_type_ids(event_types))
+                )
+            )
 
     instance = get_instance(hass)
 
@@ -1619,7 +1650,6 @@ async def test_database_lock_and_unlock(
 
     assert not await instance.lock_database()
 
-    event_type = "EVENT_TEST"
     event_data = {"test_attr": 5, "test_attr_10": "nice"}
     hass.bus.async_fire(event_type, event_data)
     task = asyncio.create_task(async_wait_recording_done(hass))
@@ -1658,10 +1688,16 @@ async def test_database_lock_and_overflow(
     }
     await async_setup_recorder_instance(hass, config)
     await hass.async_block_till_done()
+    event_type = "EVENT_TEST"
+    event_types = (event_type,)
 
     def _get_db_events():
         with session_scope(hass=hass) as session:
-            return list(session.query(Events).filter_by(event_type=event_type))
+            return list(
+                session.query(Events).filter(
+                    Events.event_type_id.in_(select_event_type_ids(event_types))
+                )
+            )
 
     instance = get_instance(hass)
 
@@ -1670,7 +1706,6 @@ async def test_database_lock_and_overflow(
     ):
         await instance.lock_database()
 
-    event_type = "EVENT_TEST"
     event_data = {"test_attr": 5, "test_attr_10": "nice"}
     hass.bus.fire(event_type, event_data)
 
@@ -1793,9 +1828,11 @@ def test_deduplication_event_data_inside_commit_interval(
     wait_recording_done(hass)
 
     with session_scope(hass=hass) as session:
+        event_types = ("this_event",)
         events = list(
             session.query(Events)
-            .filter(Events.event_type == "this_event")
+            .filter(Events.event_type_id.in_(select_event_type_ids(event_types)))
+            .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id))
             .outerjoin(EventData, (Events.data_id == EventData.data_id))
         )
         assert len(events) == 20
 
@@ -25,10 +25,15 @@ from homeassistant.components.recorder import db_schema, migration
 from homeassistant.components.recorder.db_schema import (
     SCHEMA_VERSION,
     Events,
+    EventTypes,
     RecorderRuns,
     States,
 )
-from homeassistant.components.recorder.tasks import ContextIDMigrationTask
+from homeassistant.components.recorder.queries import select_event_type_ids
+from homeassistant.components.recorder.tasks import (
+    ContextIDMigrationTask,
+    EventTypeIDMigrationTask,
+)
 from homeassistant.components.recorder.util import session_scope
 from homeassistant.core import HomeAssistant
 from homeassistant.helpers import recorder as recorder_helper
@@ -688,3 +693,74 @@ async def test_migrate_context_ids(
     assert invalid_context_id_event["context_id_bin"] == b"\x00" * 16
     assert invalid_context_id_event["context_user_id_bin"] is None
     assert invalid_context_id_event["context_parent_id_bin"] is None
+
+
+@pytest.mark.parametrize("enable_migrate_event_type_ids", [True])
+async def test_migrate_event_type_ids(
+    async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
+) -> None:
+    """Test we can migrate event_types to the EventTypes table."""
+    instance = await async_setup_recorder_instance(hass)
+    await async_wait_recording_done(hass)
+
+    def _insert_events():
+        with session_scope(hass=hass) as session:
+            session.add_all(
+                (
+                    Events(
+                        event_type="event_type_one",
+                        origin_idx=0,
+                        time_fired_ts=1677721632.452529,
+                    ),
+                    Events(
+                        event_type="event_type_one",
+                        origin_idx=0,
+                        time_fired_ts=1677721632.552529,
+                    ),
+                    Events(
+                        event_type="event_type_two",
+                        origin_idx=0,
+                        time_fired_ts=1677721632.552529,
+                    ),
+                )
+            )
+
+    await instance.async_add_executor_job(_insert_events)
+
+    await async_wait_recording_done(hass)
+    # This is a threadsafe way to add a task to the recorder
+    instance.queue_task(EventTypeIDMigrationTask())
+    await async_recorder_block_till_done(hass)
+
+    def _fetch_migrated_events():
+        with session_scope(hass=hass) as session:
+            events = (
+                session.query(Events.event_id, Events.time_fired, EventTypes.event_type)
+                .filter(
+                    Events.event_type_id.in_(
+                        select_event_type_ids(
+                            (
+                                "event_type_one",
+                                "event_type_two",
+                            )
+                        )
+                    )
+                )
+                .outerjoin(EventTypes, Events.event_type_id == EventTypes.event_type_id)
+                .all()
+            )
+            assert len(events) == 3
+            result = {}
+            for event in events:
+                result.setdefault(event.event_type, []).append(
+                    {
+                        "event_id": event.event_id,
+                        "time_fired": event.time_fired,
+                        "event_type": event.event_type,
+                    }
+                )
+            return result
+
+    events_by_type = await instance.async_add_executor_job(_fetch_migrated_events)
+    assert len(events_by_type["event_type_one"]) == 2
+    assert len(events_by_type["event_type_two"]) == 1
 
@@ -31,6 +31,7 @@ def test_from_event_to_db_event() -> None:
     db_event = Events.from_event(event)
     dialect = SupportedDialect.MYSQL
     db_event.event_data = EventData.shared_data_bytes_from_event(event, dialect)
+    db_event.event_type = event.event_type
     assert event.as_dict() == db_event.to_native().as_dict()
 
 
@@ -232,11 +233,13 @@ async def test_event_to_db_model() -> None:
     db_event = Events.from_event(event)
     dialect = SupportedDialect.MYSQL
     db_event.event_data = EventData.shared_data_bytes_from_event(event, dialect)
+    db_event.event_type = event.event_type
     native = db_event.to_native()
     assert native.as_dict() == event.as_dict()
 
     native = Events.from_event(event).to_native()
     event.data = {}
+    native.event_type = event.event_type
     assert native.as_dict() == event.as_dict()
 
@@ -16,6 +16,7 @@ from homeassistant.components.recorder.const import (
 from homeassistant.components.recorder.db_schema import (
     EventData,
     Events,
+    EventTypes,
     RecorderRuns,
     StateAttributes,
     States,
@@ -31,6 +32,7 @@ from homeassistant.components.recorder.tasks import PurgeTask
 from homeassistant.components.recorder.util import session_scope
 from homeassistant.const import EVENT_STATE_CHANGED, EVENT_THEMES_UPDATED, STATE_ON
 from homeassistant.core import HomeAssistant
+from homeassistant.helpers.json import json_dumps
 from homeassistant.helpers.typing import ConfigType
 from homeassistant.util import dt as dt_util
 
@@ -1684,3 +1686,113 @@ async def test_purge_can_mix_legacy_and_new_format(
     # does not prevent future purges. Its ignored.
     assert states_with_event_id.count() == 0
     assert states_without_event_id.count() == 1
+
+
+async def test_purge_old_events_purges_the_event_type_ids(
+    async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
+) -> None:
+    """Test deleting old events purges event type ids."""
+    instance = await async_setup_recorder_instance(hass)
+    assert instance.event_type_manager.active is True
+
+    utcnow = dt_util.utcnow()
+    five_days_ago = utcnow - timedelta(days=5)
+    eleven_days_ago = utcnow - timedelta(days=11)
+    far_past = utcnow - timedelta(days=1000)
+    event_data = {"test_attr": 5, "test_attr_10": "nice"}
+
+    await hass.async_block_till_done()
+    await async_wait_recording_done(hass)
+
+    def _insert_events():
+        with session_scope(hass=hass) as session:
+            event_type_test_auto_purge = EventTypes(event_type="EVENT_TEST_AUTOPURGE")
+            event_type_test_purge = EventTypes(event_type="EVENT_TEST_PURGE")
+            event_type_test = EventTypes(event_type="EVENT_TEST")
+            event_type_unused = EventTypes(event_type="EVENT_TEST_UNUSED")
+            session.add_all(
+                (
+                    event_type_test_auto_purge,
+                    event_type_test_purge,
+                    event_type_test,
+                    event_type_unused,
+                )
+            )
+            session.flush()
+            for _ in range(5):
+                for event_id in range(6):
+                    if event_id < 2:
+                        timestamp = eleven_days_ago
+                        event_type = event_type_test_auto_purge
+                    elif event_id < 4:
+                        timestamp = five_days_ago
+                        event_type = event_type_test_purge
+                    else:
+                        timestamp = utcnow
+                        event_type = event_type_test
+
+                    session.add(
+                        Events(
+                            event_type=None,
+                            event_type_id=event_type.event_type_id,
+                            event_data=json_dumps(event_data),
+                            origin="LOCAL",
+                            time_fired_ts=dt_util.utc_to_timestamp(timestamp),
+                        )
+                    )
+            return instance.event_type_manager.get_many(
+                [
+                    "EVENT_TEST_AUTOPURGE",
+                    "EVENT_TEST_PURGE",
+                    "EVENT_TEST",
+                    "EVENT_TEST_UNUSED",
+                ],
+                session,
+            )
+
+    event_type_to_id = await instance.async_add_executor_job(_insert_events)
+    test_event_type_ids = event_type_to_id.values()
+    with session_scope(hass=hass) as session:
+        events = session.query(Events).where(
+            Events.event_type_id.in_(test_event_type_ids)
+        )
+        event_types = session.query(EventTypes).where(
+            EventTypes.event_type_id.in_(test_event_type_ids)
+        )
+
+        assert events.count() == 30
+        assert event_types.count() == 4
+
+        # run purge_old_data()
+        finished = purge_old_data(
+            instance,
+            far_past,
+            repack=False,
+        )
+        assert finished
+        assert events.count() == 30
+        # We should remove the unused event type
+        assert event_types.count() == 3
+
+        assert "EVENT_TEST_UNUSED" not in instance.event_type_manager._id_map
+
+        # we should only have 10 events left since
+        # only one event type was recorded now
+        finished = purge_old_data(
+            instance,
+            utcnow,
+            repack=False,
+        )
+        assert finished
+        assert events.count() == 10
+        assert event_types.count() == 1
+
+        # Purge everything
+        finished = purge_old_data(
+            instance,
+            utcnow + timedelta(seconds=1),
+            repack=False,
+        )
+        assert finished
+        assert events.count() == 0
+        assert event_types.count() == 0
 
@@ -11,6 +11,7 @@ from sqlalchemy.orm import Session
 
 from homeassistant.components import recorder
 from homeassistant.components.recorder import SQLITE_URL_PREFIX, core, statistics
+from homeassistant.components.recorder.queries import select_event_type_ids
 from homeassistant.components.recorder.util import session_scope
 from homeassistant.core import EVENT_STATE_CHANGED, Event, EventOrigin, State
 from homeassistant.helpers import recorder as recorder_helper
@@ -87,7 +88,9 @@ def test_migrate_times(caplog: pytest.LogCaptureFixture, tmpdir) -> None:
 
     with patch.object(recorder, "db_schema", old_db_schema), patch.object(
         recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
-    ), patch.object(core, "EventData", old_db_schema.EventData), patch.object(
+    ), patch.object(core, "EventTypes", old_db_schema.EventTypes), patch.object(
+        core, "EventData", old_db_schema.EventData
+    ), patch.object(
         core, "States", old_db_schema.States
     ), patch.object(
         core, "Events", old_db_schema.Events
@@ -117,8 +120,10 @@ def test_migrate_times(caplog: pytest.LogCaptureFixture, tmpdir) -> None:
         wait_recording_done(hass)
         with session_scope(hass=hass) as session:
             result = list(
-                session.query(recorder.db_schema.Events).where(
-                    recorder.db_schema.Events.event_type == "custom_event"
+                session.query(recorder.db_schema.Events).filter(
+                    recorder.db_schema.Events.event_type_id.in_(
+                        select_event_type_ids(("custom_event",))
+                    )
                 )
             )
             assert len(result) == 1
 
@@ -1148,6 +1148,16 @@ def enable_migrate_context_ids() -> bool:
     return False
 
 
+@pytest.fixture
+def enable_migrate_event_type_ids() -> bool:
+    """Fixture to control enabling of recorder's event type id migration.
+
+    To enable event type id migration, tests can be marked with:
+    @pytest.mark.parametrize("enable_migrate_event_type_ids", [True])
+    """
+    return False
+
+
 @pytest.fixture
 def recorder_config() -> dict[str, Any] | None:
     """Fixture to override recorder config.
@@ -1291,6 +1301,7 @@ async def async_setup_recorder_instance(
     enable_statistics: bool,
     enable_statistics_table_validation: bool,
     enable_migrate_context_ids: bool,
+    enable_migrate_event_type_ids: bool,
 ) -> AsyncGenerator[RecorderInstanceGenerator, None]:
     """Yield callable to setup recorder instance."""
     # pylint: disable-next=import-outside-toplevel
@@ -1309,6 +1320,11 @@ async def async_setup_recorder_instance(
     migrate_context_ids = (
         recorder.Recorder._migrate_context_ids if enable_migrate_context_ids else None
     )
+    migrate_event_type_ids = (
+        recorder.Recorder._migrate_event_type_ids
+        if enable_migrate_event_type_ids
+        else None
+    )
     with patch(
         "homeassistant.components.recorder.Recorder.async_nightly_tasks",
         side_effect=nightly,
@@ -1325,6 +1341,10 @@ async def async_setup_recorder_instance(
         "homeassistant.components.recorder.Recorder._migrate_context_ids",
         side_effect=migrate_context_ids,
         autospec=True,
+    ), patch(
+        "homeassistant.components.recorder.Recorder._migrate_event_type_ids",
+        side_effect=migrate_event_type_ids,
+        autospec=True,
     ):
 
         async def async_setup_recorder(
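Tests opt in per test, as test_migrate_event_type_ids above does; condensed from that test:

@pytest.mark.parametrize("enable_migrate_event_type_ids", [True])
async def test_event_type_migration(async_setup_recorder_instance, hass) -> None:
    """With the fixture enabled, the patched method runs the real migration."""
    instance = await async_setup_recorder_instance(hass)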