Reduce overhead to store context ids in the database (#88942)

J. Nick Koston 2023-03-08 14:51:45 -10:00 committed by GitHub
parent 386533a16f
commit 170a13302c
22 changed files with 676 additions and 160 deletions

View file

@ -2,14 +2,21 @@
from __future__ import annotations
from dataclasses import dataclass
import json
from typing import Any, cast
from sqlalchemy.engine.row import Row
from homeassistant.components.recorder.models import (
bytes_to_ulid_or_none,
bytes_to_uuid_hex_or_none,
ulid_to_bytes_or_none,
uuid_hex_to_bytes_or_none,
)
from homeassistant.const import ATTR_ICON, EVENT_STATE_CHANGED
from homeassistant.core import Context, Event, State, callback
import homeassistant.util.dt as dt_util
from homeassistant.util.json import json_loads
from homeassistant.util.ulid import ulid_to_bytes
class LazyEventPartialState:
@ -22,9 +29,9 @@ class LazyEventPartialState:
"event_type",
"entity_id",
"state",
"context_id",
"context_user_id",
"context_parent_id",
"context_id_bin",
"context_user_id_bin",
"context_parent_id_bin",
"data",
]
@ -40,9 +47,9 @@ class LazyEventPartialState:
self.event_type: str | None = self.row.event_type
self.entity_id: str | None = self.row.entity_id
self.state = self.row.state
self.context_id: str | None = self.row.context_id
self.context_user_id: str | None = self.row.context_user_id
self.context_parent_id: str | None = self.row.context_parent_id
self.context_id_bin: bytes | None = self.row.context_id_bin
self.context_user_id_bin: bytes | None = self.row.context_user_id_bin
self.context_parent_id_bin: bytes | None = self.row.context_parent_id_bin
if data := getattr(row, "data", None):
# If it's an EventAsRow we can avoid the whole
# json decode process as we already have the data
@ -55,9 +62,24 @@ class LazyEventPartialState:
self.data = event_data
else:
self.data = self._event_data_cache[source] = cast(
dict[str, Any], json.loads(source)
dict[str, Any], json_loads(source)
)
@property
def context_id(self) -> str | None:
"""Return the context id."""
return bytes_to_ulid_or_none(self.context_id_bin)
@property
def context_user_id(self) -> str | None:
"""Return the context user id."""
return bytes_to_uuid_hex_or_none(self.context_user_id_bin)
@property
def context_parent_id(self) -> str | None:
"""Return the context parent id."""
return bytes_to_ulid_or_none(self.context_parent_id_bin)
@dataclass(frozen=True)
class EventAsRow:
@ -65,7 +87,7 @@ class EventAsRow:
data: dict[str, Any]
context: Context
context_id: str
context_id_bin: bytes
time_fired_ts: float
state_id: int
event_data: str | None = None
@ -73,8 +95,8 @@ class EventAsRow:
event_id: None = None
entity_id: str | None = None
icon: str | None = None
context_user_id: str | None = None
context_parent_id: str | None = None
context_user_id_bin: bytes | None = None
context_parent_id_bin: bytes | None = None
event_type: str | None = None
state: str | None = None
shared_data: str | None = None
@ -85,13 +107,14 @@ class EventAsRow:
def async_event_to_row(event: Event) -> EventAsRow:
"""Convert an event to a row."""
if event.event_type != EVENT_STATE_CHANGED:
context = event.context
return EventAsRow(
data=event.data,
context=event.context,
event_type=event.event_type,
context_id=event.context.id,
context_user_id=event.context.user_id,
context_parent_id=event.context.parent_id,
context_id_bin=ulid_to_bytes(context.id),
context_user_id_bin=uuid_hex_to_bytes_or_none(context.user_id),
context_parent_id_bin=ulid_to_bytes_or_none(context.parent_id),
time_fired_ts=dt_util.utc_to_timestamp(event.time_fired),
state_id=hash(event),
)
@ -99,14 +122,15 @@ def async_event_to_row(event: Event) -> EventAsRow:
# that are missing new_state or old_state
# since the logbook does not show these
new_state: State = event.data["new_state"]
context = new_state.context
return EventAsRow(
data=event.data,
context=event.context,
entity_id=new_state.entity_id,
state=new_state.state,
context_id=new_state.context.id,
context_user_id=new_state.context.user_id,
context_parent_id=new_state.context.parent_id,
context_id_bin=ulid_to_bytes(context.id),
context_user_id_bin=uuid_hex_to_bytes_or_none(context.user_id),
context_parent_id_bin=ulid_to_bytes_or_none(context.parent_id),
time_fired_ts=dt_util.utc_to_timestamp(new_state.last_updated),
state_id=hash(event),
icon=new_state.attributes.get(ATTR_ICON),
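The lazy `context_id` properties above depend on ULID packing being lossless: the string form is only rebuilt when a consumer actually asks for it. A minimal sketch of that round trip, assuming the `ulid_transform` package that `homeassistant.util.ulid` wraps:

from ulid_transform import bytes_to_ulid, ulid_to_bytes

context_id = "01GTDGKBCH00GW0X476W5TVAAA"   # 26-char Crockford base32 ULID
packed = ulid_to_bytes(context_id)          # 16 raw bytes for the database
assert len(packed) == 16
assert bytes_to_ulid(packed) == context_id  # lossless round trip on read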

View file

@ -12,6 +12,7 @@ from sqlalchemy.engine.row import Row
from homeassistant.components.recorder.filters import Filters
from homeassistant.components.recorder.models import (
bytes_to_uuid_hex_or_none,
process_datetime_to_timestamp,
process_timestamp_to_utc_isoformat,
)
@ -261,14 +262,14 @@ class ContextLookup:
"""Memorize context origin."""
self.hass = hass
self._memorize_new = True
self._lookup: dict[str | None, Row | EventAsRow | None] = {None: None}
self._lookup: dict[bytes | None, Row | EventAsRow | None] = {None: None}
def memorize(self, row: Row | EventAsRow) -> str | None:
def memorize(self, row: Row | EventAsRow) -> bytes | None:
"""Memorize a context from the database."""
if self._memorize_new:
context_id: str = row.context_id
self._lookup.setdefault(context_id, row)
return context_id
context_id_bin: bytes = row.context_id_bin
self._lookup.setdefault(context_id_bin, row)
return context_id_bin
return None
def clear(self) -> None:
@ -276,9 +277,9 @@ class ContextLookup:
self._lookup.clear()
self._memorize_new = False
def get(self, context_id: str) -> Row | EventAsRow | None:
def get(self, context_id_bin: bytes) -> Row | EventAsRow | None:
"""Get the context origin."""
return self._lookup.get(context_id)
return self._lookup.get(context_id_bin)
class ContextAugmenter:
@ -293,7 +294,7 @@ class ContextAugmenter:
self.include_entity_name = logbook_run.include_entity_name
def _get_context_row(
self, context_id: str | None, row: Row | EventAsRow
self, context_id: bytes | None, row: Row | EventAsRow
) -> Row | EventAsRow | None:
"""Get the context row from the id or row context."""
if context_id:
@ -305,11 +306,11 @@ class ContextAugmenter:
return None
def augment(
self, data: dict[str, Any], row: Row | EventAsRow, context_id: str | None
self, data: dict[str, Any], row: Row | EventAsRow, context_id: bytes | None
) -> None:
"""Augment data from the row and cache."""
if context_user_id := row.context_user_id:
data[CONTEXT_USER_ID] = context_user_id
if context_user_id_bin := row.context_user_id_bin:
data[CONTEXT_USER_ID] = bytes_to_uuid_hex_or_none(context_user_id_bin)
if not (context_row := self._get_context_row(context_id, row)):
return
@ -317,11 +318,12 @@ class ContextAugmenter:
if _rows_match(row, context_row):
# This is the first event with the given ID. Was it directly caused by
# a parent event?
context_parent_id_bin = row.context_parent_id_bin
if (
not row.context_parent_id
not context_parent_id_bin
or (
context_row := self._get_context_row(
row.context_parent_id, context_row
context_parent_id_bin, context_row
)
)
is None
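Keying `ContextLookup._lookup` by 16-byte values instead of 26- or 32-character strings also shrinks the per-entry footprint of the in-memory cache. A rough illustration (the sizes below are from a 64-bit CPython 3.11 build and are an assumption, not part of this change):

import sys

str_key = "01GTDGKBCH00GW0X476W5TVAAA"  # former string context id
bin_key = bytes(16)                     # new binary context id
print(sys.getsizeof(str_key))           # ~75 bytes per key
print(sys.getsizeof(bin_key))           # ~49 bytes per key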

View file

@ -6,6 +6,7 @@ from datetime import datetime as dt
from sqlalchemy.sql.lambdas import StatementLambdaElement
from homeassistant.components.recorder.filters import Filters
from homeassistant.components.recorder.models import ulid_to_bytes_or_none
from homeassistant.helpers.json import json_dumps
from homeassistant.util import dt as dt_util
@ -27,6 +28,7 @@ def statement_for_request(
"""Generate the logbook statement for a logbook request."""
start_day = dt_util.utc_to_timestamp(start_day_dt)
end_day = dt_util.utc_to_timestamp(end_day_dt)
context_id_bin = ulid_to_bytes_or_none(context_id)
# No entities: logbook sends everything for the timeframe
# limited by the context_id and the yaml configured filter
if not entity_ids and not device_ids:
@ -38,7 +40,7 @@ def statement_for_request(
event_types,
states_entity_filter,
events_entity_filter,
context_id,
context_id_bin,
)
# sqlalchemy caches object quoting, the

View file

@ -26,28 +26,28 @@ def all_stmt(
event_types: tuple[str, ...],
states_entity_filter: ColumnElement | None = None,
events_entity_filter: ColumnElement | None = None,
context_id: str | None = None,
context_id_bin: bytes | None = None,
) -> StatementLambdaElement:
"""Generate a logbook query for all entities."""
stmt = lambda_stmt(
lambda: select_events_without_states(start_day, end_day, event_types)
)
if context_id is not None:
if context_id_bin is not None:
# Once all the old `state_changed` events
# are gone from the database remove the
# _legacy_select_events_context_id()
stmt += lambda s: s.where(Events.context_id == context_id).union_all(
stmt += lambda s: s.where(Events.context_id_bin == context_id_bin).union_all(
_states_query_for_context_id(
start_day,
end_day,
# https://github.com/python/mypy/issues/2608
context_id, # type:ignore[arg-type]
context_id_bin, # type:ignore[arg-type]
),
legacy_select_events_context_id(
start_day,
end_day,
# https://github.com/python/mypy/issues/2608
context_id, # type:ignore[arg-type]
context_id_bin, # type:ignore[arg-type]
),
)
else:
@ -76,12 +76,14 @@ def _apply_all_hints(sel: Select) -> Select:
"""Force mysql to use the right index on large selects."""
return sel.with_hint(
States, f"FORCE INDEX ({LAST_UPDATED_INDEX_TS})", dialect_name="mysql"
).with_hint(
States, f"FORCE INDEX ({LAST_UPDATED_INDEX_TS})", dialect_name="mariadb"
)
def _states_query_for_context_id(
start_day: float, end_day: float, context_id: str
start_day: float, end_day: float, context_id_bin: bytes
) -> Select:
return apply_states_filters(select_states(), start_day, end_day).where(
States.context_id == context_id
States.context_id_bin == context_id_bin
)
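`all_stmt()` composes the optional context filter onto a cached lambda statement with `+=`. A hedged, self-contained sketch of that SQLAlchemy pattern (the `Event` model here is illustrative, not the real schema):

from sqlalchemy import Column, Integer, LargeBinary, lambda_stmt, select
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

class Event(Base):
    __tablename__ = "events"
    event_id = Column(Integer, primary_key=True)
    context_id_bin = Column(LargeBinary(16))

def stmt_for(context_id_bin: bytes | None):
    # lambda_stmt lets SQLAlchemy cache statement construction; the
    # closure variable becomes a bound parameter on each execution
    stmt = lambda_stmt(lambda: select(Event))
    if context_id_bin is not None:
        stmt += lambda s: s.where(Event.context_id_bin == context_id_bin)
    return stmt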

View file

@ -10,11 +10,11 @@ from sqlalchemy.sql.expression import literal
from sqlalchemy.sql.selectable import Select
from homeassistant.components.recorder.db_schema import (
EVENTS_CONTEXT_ID_INDEX,
EVENTS_CONTEXT_ID_BIN_INDEX,
OLD_FORMAT_ATTRS_JSON,
OLD_STATE,
SHARED_ATTRS_JSON,
STATES_CONTEXT_ID_INDEX,
STATES_CONTEXT_ID_BIN_INDEX,
EventData,
Events,
StateAttributes,
@ -47,9 +47,9 @@ EVENT_COLUMNS = (
Events.event_type.label("event_type"),
Events.event_data.label("event_data"),
Events.time_fired_ts.label("time_fired_ts"),
Events.context_id.label("context_id"),
Events.context_user_id.label("context_user_id"),
Events.context_parent_id.label("context_parent_id"),
Events.context_id_bin.label("context_id_bin"),
Events.context_user_id_bin.label("context_user_id_bin"),
Events.context_parent_id_bin.label("context_parent_id_bin"),
)
STATE_COLUMNS = (
@ -79,9 +79,9 @@ EVENT_COLUMNS_FOR_STATE_SELECT = (
),
literal(value=None, type_=sqlalchemy.Text).label("event_data"),
States.last_updated_ts.label("time_fired_ts"),
States.context_id.label("context_id"),
States.context_user_id.label("context_user_id"),
States.context_parent_id.label("context_parent_id"),
States.context_id_bin.label("context_id_bin"),
States.context_user_id_bin.label("context_user_id_bin"),
States.context_parent_id_bin.label("context_parent_id_bin"),
literal(value=None, type_=sqlalchemy.Text).label("shared_data"),
)
@ -113,7 +113,7 @@ def select_events_context_id_subquery(
) -> Select:
"""Generate the select for a context_id subquery."""
return (
select(Events.context_id)
select(Events.context_id_bin)
.where((Events.time_fired_ts > start_day) & (Events.time_fired_ts < end_day))
.where(Events.event_type.in_(event_types))
.outerjoin(EventData, (Events.data_id == EventData.data_id))
@ -162,7 +162,7 @@ def select_states() -> Select:
def legacy_select_events_context_id(
start_day: float, end_day: float, context_id: str
start_day: float, end_day: float, context_id_bin: bytes
) -> Select:
"""Generate a legacy events context id select that also joins states."""
# This can be removed once we no longer have event_ids in the states table
@ -183,7 +183,7 @@ def legacy_select_events_context_id(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
)
.where((Events.time_fired_ts > start_day) & (Events.time_fired_ts < end_day))
.where(Events.context_id == context_id)
.where(Events.context_id_bin == context_id_bin)
)
@ -277,12 +277,16 @@ def _not_uom_attributes_matcher() -> BooleanClauseList:
def apply_states_context_hints(sel: Select) -> Select:
"""Force mysql to use the right index on large context_id selects."""
return sel.with_hint(
States, f"FORCE INDEX ({STATES_CONTEXT_ID_INDEX})", dialect_name="mysql"
States, f"FORCE INDEX ({STATES_CONTEXT_ID_BIN_INDEX})", dialect_name="mysql"
).with_hint(
States, f"FORCE INDEX ({STATES_CONTEXT_ID_BIN_INDEX})", dialect_name="mariadb"
)
def apply_events_context_hints(sel: Select) -> Select:
"""Force mysql to use the right index on large context_id selects."""
return sel.with_hint(
Events, f"FORCE INDEX ({EVENTS_CONTEXT_ID_INDEX})", dialect_name="mysql"
Events, f"FORCE INDEX ({EVENTS_CONTEXT_ID_BIN_INDEX})", dialect_name="mysql"
).with_hint(
Events, f"FORCE INDEX ({EVENTS_CONTEXT_ID_BIN_INDEX})", dialect_name="mariadb"
)

View file

@ -36,7 +36,7 @@ def _select_device_id_context_ids_sub_query(
inner = select_events_context_id_subquery(start_day, end_day, event_types).where(
apply_event_device_id_matchers(json_quotable_device_ids)
)
return select(inner.c.context_id).group_by(inner.c.context_id)
return select(inner.c.context_id_bin).group_by(inner.c.context_id_bin)
def _apply_devices_context_union(
@ -57,12 +57,12 @@ def _apply_devices_context_union(
apply_events_context_hints(
select_events_context_only()
.select_from(devices_cte)
.outerjoin(Events, devices_cte.c.context_id == Events.context_id)
.outerjoin(Events, devices_cte.c.context_id_bin == Events.context_id_bin)
).outerjoin(EventData, (Events.data_id == EventData.data_id)),
apply_states_context_hints(
select_states_context_only()
.select_from(devices_cte)
.outerjoin(States, devices_cte.c.context_id == States.context_id)
.outerjoin(States, devices_cte.c.context_id_bin == States.context_id_bin)
),
)

View file

@ -42,13 +42,13 @@ def _select_entities_context_ids_sub_query(
select_events_context_id_subquery(start_day, end_day, event_types).where(
apply_event_entity_id_matchers(json_quoted_entity_ids)
),
apply_entities_hints(select(States.context_id))
apply_entities_hints(select(States.context_id_bin))
.filter(
(States.last_updated_ts > start_day) & (States.last_updated_ts < end_day)
)
.where(States.entity_id.in_(entity_ids)),
)
return select(union.c.context_id).group_by(union.c.context_id)
return select(union.c.context_id_bin).group_by(union.c.context_id_bin)
def _apply_entities_context_union(
@ -77,12 +77,12 @@ def _apply_entities_context_union(
apply_events_context_hints(
select_events_context_only()
.select_from(entities_cte)
.outerjoin(Events, entities_cte.c.context_id == Events.context_id)
.outerjoin(Events, entities_cte.c.context_id_bin == Events.context_id_bin)
).outerjoin(EventData, (Events.data_id == EventData.data_id)),
apply_states_context_hints(
select_states_context_only()
.select_from(entities_cte)
.outerjoin(States, entities_cte.c.context_id == States.context_id)
.outerjoin(States, entities_cte.c.context_id_bin == States.context_id_bin)
),
)
@ -138,4 +138,8 @@ def apply_entities_hints(sel: Select) -> Select:
"""Force mysql to use the right index on large selects."""
return sel.with_hint(
States, f"FORCE INDEX ({ENTITY_ID_LAST_UPDATED_INDEX_TS})", dialect_name="mysql"
).with_hint(
States,
f"FORCE INDEX ({ENTITY_ID_LAST_UPDATED_INDEX_TS})",
dialect_name="mariadb",
)

View file

@ -41,13 +41,13 @@ def _select_entities_device_id_context_ids_sub_query(
json_quoted_entity_ids, json_quoted_device_ids
)
),
apply_entities_hints(select(States.context_id))
apply_entities_hints(select(States.context_id_bin))
.filter(
(States.last_updated_ts > start_day) & (States.last_updated_ts < end_day)
)
.where(States.entity_id.in_(entity_ids)),
)
return select(union.c.context_id).group_by(union.c.context_id)
return select(union.c.context_id_bin).group_by(union.c.context_id_bin)
def _apply_entities_devices_context_union(
@ -77,12 +77,16 @@ def _apply_entities_devices_context_union(
apply_events_context_hints(
select_events_context_only()
.select_from(devices_entities_cte)
.outerjoin(Events, devices_entities_cte.c.context_id == Events.context_id)
.outerjoin(
Events, devices_entities_cte.c.context_id_bin == Events.context_id_bin
)
).outerjoin(EventData, (Events.data_id == EventData.data_id)),
apply_states_context_hints(
select_states_context_only()
.select_from(devices_entities_cte)
.outerjoin(States, devices_entities_cte.c.context_id == States.context_id)
.outerjoin(
States, devices_entities_cte.c.context_id_bin == States.context_id_bin
)
),
)

View file

@ -89,6 +89,7 @@ from .tasks import (
ChangeStatisticsUnitTask,
ClearStatisticsTask,
CommitTask,
ContextIDMigrationTask,
DatabaseLockTask,
EventTask,
ImportStatisticsTask,
@ -687,6 +688,7 @@ class Recorder(threading.Thread):
_LOGGER.debug("Recorder processing the queue")
self._adjust_lru_size()
self.hass.add_job(self._async_set_recorder_ready_migration_done)
self.queue_task(ContextIDMigrationTask())
self._run_event_loop()
self._shutdown()
@ -1146,6 +1148,10 @@ class Recorder(threading.Thread):
"""Run post schema migration tasks."""
migration.post_schema_migration(self, old_version, new_version)
def _migrate_context_ids(self) -> bool:
"""Migrate context ids if needed."""
return migration.migrate_context_ids(self)
def _send_keep_alive(self) -> None:
"""Send a keep alive to keep the db connection open."""
assert self.event_session is not None

View file

@ -21,6 +21,7 @@ from sqlalchemy import (
Identity,
Index,
Integer,
LargeBinary,
SmallInteger,
String,
Text,
@ -55,8 +56,12 @@ from .models import (
StatisticData,
StatisticDataTimestamp,
StatisticMetaData,
bytes_to_ulid_or_none,
bytes_to_uuid_hex_or_none,
datetime_to_timestamp_or_none,
process_timestamp,
ulid_to_bytes_or_none,
uuid_hex_to_bytes_or_none,
)
@ -66,7 +71,7 @@ class Base(DeclarativeBase):
"""Base class for tables."""
SCHEMA_VERSION = 35
SCHEMA_VERSION = 36
_LOGGER = logging.getLogger(__name__)
@ -108,8 +113,9 @@ TABLES_TO_CHECK = [
LAST_UPDATED_INDEX_TS = "ix_states_last_updated_ts"
ENTITY_ID_LAST_UPDATED_INDEX_TS = "ix_states_entity_id_last_updated_ts"
EVENTS_CONTEXT_ID_INDEX = "ix_events_context_id"
STATES_CONTEXT_ID_INDEX = "ix_states_context_id"
EVENTS_CONTEXT_ID_BIN_INDEX = "ix_events_context_id_bin"
STATES_CONTEXT_ID_BIN_INDEX = "ix_states_context_id_bin"
CONTEXT_ID_BIN_MAX_LENGTH = 16
_DEFAULT_TABLE_ARGS = {
"mysql_default_charset": "utf8mb4",
@ -174,6 +180,12 @@ class Events(Base):
# Used for fetching events at a specific time
# see logbook
Index("ix_events_event_type_time_fired_ts", "event_type", "time_fired_ts"),
Index(
EVENTS_CONTEXT_ID_BIN_INDEX,
"context_id_bin",
mysql_length=CONTEXT_ID_BIN_MAX_LENGTH,
mariadb_length=CONTEXT_ID_BIN_MAX_LENGTH,
),
_DEFAULT_TABLE_ARGS,
)
__tablename__ = TABLE_EVENTS
@ -190,18 +202,27 @@ class Events(Base):
DATETIME_TYPE
) # no longer used for new rows
time_fired_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE, index=True)
context_id: Mapped[str | None] = mapped_column(
context_id: Mapped[str | None] = mapped_column( # no longer used
String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True
)
context_user_id: Mapped[str | None] = mapped_column(
context_user_id: Mapped[str | None] = mapped_column( # no longer used
String(MAX_LENGTH_EVENT_CONTEXT_ID)
)
context_parent_id: Mapped[str | None] = mapped_column(
context_parent_id: Mapped[str | None] = mapped_column( # no longer used
String(MAX_LENGTH_EVENT_CONTEXT_ID)
)
data_id: Mapped[int | None] = mapped_column(
Integer, ForeignKey("event_data.data_id"), index=True
)
context_id_bin: Mapped[bytes | None] = mapped_column(
LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH),
)
context_user_id_bin: Mapped[bytes | None] = mapped_column(
LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH),
)
context_parent_id_bin: Mapped[bytes | None] = mapped_column(
LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH)
)
event_data_rel: Mapped[EventData | None] = relationship("EventData")
def __repr__(self) -> str:
@ -234,17 +255,20 @@ class Events(Base):
origin_idx=EVENT_ORIGIN_TO_IDX.get(event.origin),
time_fired=None,
time_fired_ts=dt_util.utc_to_timestamp(event.time_fired),
context_id=event.context.id,
context_user_id=event.context.user_id,
context_parent_id=event.context.parent_id,
context_id=None,
context_id_bin=ulid_to_bytes_or_none(event.context.id),
context_user_id=None,
context_user_id_bin=uuid_hex_to_bytes_or_none(event.context.user_id),
context_parent_id=None,
context_parent_id_bin=ulid_to_bytes_or_none(event.context.parent_id),
)
def to_native(self, validate_entity_id: bool = True) -> Event | None:
"""Convert to a native HA Event."""
context = Context(
id=self.context_id,
user_id=self.context_user_id,
parent_id=self.context_parent_id,
id=bytes_to_ulid_or_none(self.context_id_bin),
user_id=bytes_to_uuid_hex_or_none(self.context_user_id_bin),
parent_id=bytes_to_ulid_or_none(self.context_parent_id_bin),
)
try:
return Event(
@ -316,6 +340,12 @@ class States(Base):
# Used for fetching the state of entities at a specific time
# (get_states in history.py)
Index(ENTITY_ID_LAST_UPDATED_INDEX_TS, "entity_id", "last_updated_ts"),
Index(
STATES_CONTEXT_ID_BIN_INDEX,
"context_id_bin",
mysql_length=CONTEXT_ID_BIN_MAX_LENGTH,
mariadb_length=CONTEXT_ID_BIN_MAX_LENGTH,
),
_DEFAULT_TABLE_ARGS,
)
__tablename__ = TABLE_STATES
@ -344,13 +374,13 @@ class States(Base):
attributes_id: Mapped[int | None] = mapped_column(
Integer, ForeignKey("state_attributes.attributes_id"), index=True
)
context_id: Mapped[str | None] = mapped_column(
context_id: Mapped[str | None] = mapped_column( # no longer used
String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True
)
context_user_id: Mapped[str | None] = mapped_column(
context_user_id: Mapped[str | None] = mapped_column( # no longer used
String(MAX_LENGTH_EVENT_CONTEXT_ID)
)
context_parent_id: Mapped[str | None] = mapped_column(
context_parent_id: Mapped[str | None] = mapped_column( # no longer used
String(MAX_LENGTH_EVENT_CONTEXT_ID)
)
origin_idx: Mapped[int | None] = mapped_column(
@ -358,6 +388,15 @@ class States(Base):
) # 0 is local, 1 is remote
old_state: Mapped[States | None] = relationship("States", remote_side=[state_id])
state_attributes: Mapped[StateAttributes | None] = relationship("StateAttributes")
context_id_bin: Mapped[bytes | None] = mapped_column(
LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH),
)
context_user_id_bin: Mapped[bytes | None] = mapped_column(
LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH),
)
context_parent_id_bin: Mapped[bytes | None] = mapped_column(
LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH)
)
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
@ -388,9 +427,12 @@ class States(Base):
dbstate = States(
entity_id=entity_id,
attributes=None,
context_id=event.context.id,
context_user_id=event.context.user_id,
context_parent_id=event.context.parent_id,
context_id=None,
context_id_bin=ulid_to_bytes_or_none(event.context.id),
context_user_id=None,
context_user_id_bin=uuid_hex_to_bytes_or_none(event.context.user_id),
context_parent_id=None,
context_parent_id_bin=ulid_to_bytes_or_none(event.context.parent_id),
origin_idx=EVENT_ORIGIN_TO_IDX.get(event.origin),
last_updated=None,
last_changed=None,
@ -414,9 +456,9 @@ class States(Base):
def to_native(self, validate_entity_id: bool = True) -> State | None:
"""Convert to an HA state object."""
context = Context(
id=self.context_id,
user_id=self.context_user_id,
parent_id=self.context_parent_id,
id=bytes_to_ulid_or_none(self.context_id_bin),
user_id=bytes_to_uuid_hex_or_none(self.context_user_id_bin),
parent_id=bytes_to_ulid_or_none(self.context_parent_id_bin),
)
try:
attrs = json_loads_object(self.attributes) if self.attributes else {}
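MySQL and MariaDB cannot index a BLOB column without an explicit key prefix length, which is why the new indexes pass `mysql_length`/`mariadb_length`. A minimal standalone sketch of the pattern (the table name is illustrative):

from sqlalchemy import Column, Index, Integer, LargeBinary
from sqlalchemy.orm import DeclarativeBase

CONTEXT_ID_BIN_MAX_LENGTH = 16

class Base(DeclarativeBase):
    pass

class Demo(Base):
    __tablename__ = "demo"
    __table_args__ = (
        # without the prefix length MySQL/MariaDB reject CREATE INDEX on BLOB
        Index(
            "ix_demo_context_id_bin",
            "context_id_bin",
            mysql_length=CONTEXT_ID_BIN_MAX_LENGTH,
            mariadb_length=CONTEXT_ID_BIN_MAX_LENGTH,
        ),
    )
    id = Column(Integer, primary_key=True)
    context_id_bin = Column(LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH))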

View file

@ -7,9 +7,10 @@ from dataclasses import dataclass, replace as dataclass_replace
from datetime import timedelta
import logging
from typing import TYPE_CHECKING, cast
from uuid import UUID
import sqlalchemy
from sqlalchemy import ForeignKeyConstraint, MetaData, Table, func, text
from sqlalchemy import ForeignKeyConstraint, MetaData, Table, func, text, update
from sqlalchemy.engine import CursorResult, Engine
from sqlalchemy.exc import (
DatabaseError,
@ -24,20 +25,28 @@ from sqlalchemy.schema import AddConstraint, DropConstraint
from sqlalchemy.sql.expression import true
from homeassistant.core import HomeAssistant
from homeassistant.util.ulid import ulid_to_bytes
from .const import SupportedDialect
from .db_schema import (
CONTEXT_ID_BIN_MAX_LENGTH,
SCHEMA_VERSION,
STATISTICS_TABLES,
TABLE_STATES,
Base,
Events,
SchemaChanges,
States,
Statistics,
StatisticsMeta,
StatisticsRuns,
StatisticsShortTerm,
)
from .models import process_timestamp
from .queries import (
find_events_context_ids_to_migrate,
find_states_context_ids_to_migrate,
)
from .statistics import (
correct_db_schema as statistics_correct_db_schema,
delete_statistics_duplicates,
@ -56,7 +65,7 @@ if TYPE_CHECKING:
from . import Recorder
LIVE_MIGRATION_MIN_SCHEMA_VERSION = 0
_EMPTY_CONTEXT_ID = b"\x00" * 16
_LOGGER = logging.getLogger(__name__)
@ -219,7 +228,10 @@ def _create_index(
def _drop_index(
session_maker: Callable[[], Session], table_name: str, index_name: str
session_maker: Callable[[], Session],
table_name: str,
index_name: str,
quiet: bool | None = None,
) -> None:
"""Drop an index from a specified table.
@ -282,33 +294,37 @@ def _drop_index(
_LOGGER.debug(
"Finished dropping index %s from table %s", index_name, table_name
)
else:
if index_name in (
"ix_states_entity_id",
"ix_states_context_parent_id",
"ix_statistics_short_term_statistic_id_start",
"ix_statistics_statistic_id_start",
):
# ix_states_context_parent_id was only there on nightly so we do not want
# to generate log noise or issues about it.
#
# ix_states_entity_id was only there for users who upgraded from schema
# version 8 or earlier. Newer installs will not have it so we do not
# want to generate log noise or issues about it.
#
# ix_statistics_short_term_statistic_id_start and ix_statistics_statistic_id_start
# were only there for users who upgraded from schema version 23 or earlier.
return
return
_LOGGER.warning(
(
"Failed to drop index %s from table %s. Schema "
"Migration will continue; this is not a "
"critical operation"
),
index_name,
table_name,
)
if quiet:
return
if index_name in (
"ix_states_entity_id",
"ix_states_context_parent_id",
"ix_statistics_short_term_statistic_id_start",
"ix_statistics_statistic_id_start",
):
# ix_states_context_parent_id was only there on nightly so we do not want
# to generate log noise or issues about it.
#
# ix_states_entity_id was only there for users who upgraded from schema
# version 8 or earlier. Newer installs will not have it so we do not
# want to generate log noise or issues about it.
#
# ix_statistics_short_term_statistic_id_start and ix_statistics_statistic_id_start
# were only there for users who upgraded from schema version 23 or earlier.
return
_LOGGER.warning(
(
"Failed to drop index %s from table %s. Schema "
"Migration will continue; this is not a "
"critical operation"
),
index_name,
table_name,
)
def _add_columns(
@ -522,10 +538,15 @@ def _apply_update( # noqa: C901
"""Perform operations to bring schema up to date."""
dialect = engine.dialect.name
big_int = "INTEGER(20)" if dialect == SupportedDialect.MYSQL else "INTEGER"
if dialect in (SupportedDialect.MYSQL, SupportedDialect.POSTGRESQL):
if dialect == SupportedDialect.MYSQL:
timestamp_type = "DOUBLE PRECISION"
context_bin_type = f"BLOB({CONTEXT_ID_BIN_MAX_LENGTH})"
elif dialect == SupportedDialect.POSTGRESQL:
timestamp_type = "DOUBLE PRECISION"
context_bin_type = "BYTEA"
else:
timestamp_type = "FLOAT"
context_bin_type = "BLOB"
if new_version == 1:
# This used to create ix_events_time_fired, but it was removed in version 32
@ -944,6 +965,19 @@ def _apply_update( # noqa: C901
)
# ix_statistics_start and ix_statistics_statistic_id_start are still used
# for the post migration cleanup and can be removed in a future version.
elif new_version == 36:
for table in ("states", "events"):
_add_columns(
session_maker,
table,
[
f"context_id_bin {context_bin_type}",
f"context_user_id_bin {context_bin_type}",
f"context_parent_id_bin {context_bin_type}",
],
)
_create_index(session_maker, "events", "ix_events_context_id_bin")
_create_index(session_maker, "states", "ix_states_context_id_bin")
else:
raise ValueError(f"No schema migration defined for version {new_version}")
@ -1193,6 +1227,67 @@ def _migrate_statistics_columns_to_timestamp(
)
def _context_id_to_bytes(context_id: str | None) -> bytes | None:
"""Convert a context_id to bytes."""
if context_id is None:
return None
if len(context_id) == 32:
return UUID(context_id).bytes
if len(context_id) == 26:
return ulid_to_bytes(context_id)
return None
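`_context_id_to_bytes` dispatches purely on string length: 32 characters is treated as UUID hex, 26 as a ULID, and anything else maps to `None` (which the caller then replaces with `_EMPTY_CONTEXT_ID` for the id column). For illustration:

assert _context_id_to_bytes("ac5bd62de45711eaaeb351041eec8dd9")  # 32 chars: UUID hex
assert _context_id_to_bytes("01GTDGKBCH00GW0X476W5TVAAA")        # 26 chars: ULID
assert _context_id_to_bytes("invalid") is None                   # unparseable: dropped
assert _context_id_to_bytes(None) is None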
def migrate_context_ids(instance: Recorder) -> bool:
"""Migrate context_ids to use binary format."""
_to_bytes = _context_id_to_bytes
session_maker = instance.get_session
_LOGGER.debug("Migrating context_ids to binary format")
with session_scope(session=session_maker()) as session:
if events := session.execute(find_events_context_ids_to_migrate()).all():
session.execute(
update(Events),
[
{
"event_id": event_id,
"context_id": None,
"context_id_bin": _to_bytes(context_id) or _EMPTY_CONTEXT_ID,
"context_user_id": None,
"context_user_id_bin": _to_bytes(context_user_id),
"context_parent_id": None,
"context_parent_id_bin": _to_bytes(context_parent_id),
}
for event_id, context_id, context_user_id, context_parent_id in events
],
)
if states := session.execute(find_states_context_ids_to_migrate()).all():
session.execute(
update(States),
[
{
"state_id": state_id,
"context_id": None,
"context_id_bin": _to_bytes(context_id) or _EMPTY_CONTEXT_ID,
"context_user_id": None,
"context_user_id_bin": _to_bytes(context_user_id),
"context_parent_id": None,
"context_parent_id_bin": _to_bytes(context_parent_id),
}
for state_id, context_id, context_user_id, context_parent_id in states
],
)
# If there is more work to do return False
# so that we can be called again
is_done = not (events or states)
if is_done:
_drop_index(session_maker, "events", "ix_events_context_id", quiet=True)
_drop_index(session_maker, "states", "ix_states_context_id", quiet=True)
_LOGGER.debug("Migrating context_ids to binary format: done=%s", is_done)
return is_done
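The `session.execute(update(Events), [...])` form above is SQLAlchemy 2.0's bulk UPDATE by primary key: each dict must carry the primary key, the remaining keys become SET values, and the whole batch goes out as one executemany. A self-contained sketch against in-memory SQLite (the `Event` stand-in is illustrative, not the real table):

from sqlalchemy import Column, Integer, LargeBinary, String, create_engine, update
from sqlalchemy.orm import DeclarativeBase, Session

class Base(DeclarativeBase):
    pass

class Event(Base):
    __tablename__ = "events"
    event_id = Column(Integer, primary_key=True)
    context_id = Column(String(36))
    context_id_bin = Column(LargeBinary(16))

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as session:
    session.add_all([Event(event_id=1, context_id="a" * 32), Event(event_id=2)])
    session.commit()
    # one executemany UPDATE ... WHERE event_id = ? for the whole batch
    session.execute(
        update(Event),
        [
            {"event_id": 1, "context_id": None, "context_id_bin": b"\x00" * 16},
            {"event_id": 2, "context_id": None, "context_id_bin": b"\x01" * 16},
        ],
    )
    session.commit()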
def _initialize_database(session: Session) -> bool:
"""Initialize a new database.

View file

@ -1,10 +1,13 @@
"""Models for Recorder."""
from __future__ import annotations
from contextlib import suppress
from dataclasses import dataclass
from datetime import datetime, timedelta
from functools import lru_cache
import logging
from typing import Any, Literal, TypedDict, overload
from uuid import UUID
from awesomeversion import AwesomeVersion
from sqlalchemy.engine.row import Row
@ -18,6 +21,7 @@ from homeassistant.const import (
from homeassistant.core import Context, State
import homeassistant.util.dt as dt_util
from homeassistant.util.json import json_loads_object
from homeassistant.util.ulid import bytes_to_ulid, ulid_to_bytes
from .const import SupportedDialect
@ -155,6 +159,40 @@ def timestamp_to_datetime_or_none(ts: float | None) -> datetime | None:
return dt_util.utc_from_timestamp(ts)
def ulid_to_bytes_or_none(ulid: str | None) -> bytes | None:
"""Convert an ulid to bytes."""
if ulid is None:
return None
return ulid_to_bytes(ulid)
def bytes_to_ulid_or_none(_bytes: bytes | None) -> str | None:
"""Convert bytes to a ulid."""
if _bytes is None:
return None
return bytes_to_ulid(_bytes)
@lru_cache(maxsize=16)
def uuid_hex_to_bytes_or_none(uuid_hex: str | None) -> bytes | None:
"""Convert a uuid hex to bytes."""
if uuid_hex is None:
return None
with suppress(ValueError):
return UUID(hex=uuid_hex).bytes
return None
@lru_cache(maxsize=16)
def bytes_to_uuid_hex_or_none(_bytes: bytes | None) -> str | None:
"""Convert bytes to a uuid hex."""
if _bytes is None:
return None
with suppress(ValueError):
return UUID(bytes=_bytes).hex
return None
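The UUID helpers mirror the ULID ones: 32 hex characters pack into 16 bytes, and malformed input is swallowed rather than raised. A quick illustration using the standard library directly:

from uuid import UUID

user_id = "b400facee45711eaa9308bfd3d19e474"
packed = UUID(hex=user_id).bytes          # 32 hex chars -> 16 bytes
assert UUID(bytes=packed).hex == user_id  # lossless round trip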
class LazyStatePreSchema31(State):
"""A lazy version of core State before schema 31."""

View file

@ -667,3 +667,31 @@ def find_legacy_row() -> StatementLambdaElement:
# https://github.com/sqlalchemy/sqlalchemy/issues/9189
# pylint: disable-next=not-callable
return lambda_stmt(lambda: select(func.max(States.event_id)))
def find_events_context_ids_to_migrate() -> StatementLambdaElement:
"""Find events context_ids to migrate."""
return lambda_stmt(
lambda: select(
Events.event_id,
Events.context_id,
Events.context_user_id,
Events.context_parent_id,
)
.filter(Events.context_id_bin.is_(None))
.limit(SQLITE_MAX_BIND_VARS)
)
def find_states_context_ids_to_migrate() -> StatementLambdaElement:
"""Find states context_ids to migrate."""
return lambda_stmt(
lambda: select(
States.state_id,
States.context_id,
States.context_user_id,
States.context_parent_id,
)
.filter(States.context_id_bin.is_(None))
.limit(SQLITE_MAX_BIND_VARS)
)
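Both selects are capped at `SQLITE_MAX_BIND_VARS` rows so a single migration pass never binds more parameters than SQLite allows; the recorder task further below keeps requeuing itself until a pass comes back empty. The control flow, reduced to a hedged sketch:

def drain(fetch_batch, apply_batch) -> None:
    """Illustrative only: repeat capped SELECT + bulk UPDATE until empty."""
    while rows := fetch_batch():  # at most SQLITE_MAX_BIND_VARS rows per call
        apply_batch(rows)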

View file

@ -6,6 +6,7 @@ import asyncio
from collections.abc import Callable, Iterable
from dataclasses import dataclass
from datetime import datetime
import logging
import threading
from typing import TYPE_CHECKING, Any
@ -18,6 +19,9 @@ from .db_schema import Statistics, StatisticsShortTerm
from .models import StatisticData, StatisticMetaData
from .util import periodic_db_cleanups
_LOGGER = logging.getLogger(__name__)
if TYPE_CHECKING:
from .core import Recorder
@ -339,3 +343,16 @@ class AdjustLRUSizeTask(RecorderTask):
def run(self, instance: Recorder) -> None:
"""Handle the task to adjust the size."""
instance._adjust_lru_size() # pylint: disable=[protected-access]
@dataclass
class ContextIDMigrationTask(RecorderTask):
"""An object to insert into the recorder queue to migrate context ids."""
commit_before = False
def run(self, instance: Recorder) -> None:
"""Run context id migration task."""
if not instance._migrate_context_ids(): # pylint: disable=[protected-access]
# Schedule a new migration task if this one didn't finish
instance.queue_task(ContextIDMigrationTask())

View file

@ -3,9 +3,9 @@ from __future__ import annotations
import time
from ulid_transform import ulid_at_time, ulid_hex
from ulid_transform import bytes_to_ulid, ulid_at_time, ulid_hex, ulid_to_bytes
__all__ = ["ulid", "ulid_hex", "ulid_at_time"]
__all__ = ["ulid", "ulid_hex", "ulid_at_time", "ulid_to_bytes", "bytes_to_ulid"]
def ulid(timestamp: float | None = None) -> str:

View file

@ -6,7 +6,11 @@ from typing import Any
from homeassistant.components import logbook
from homeassistant.components.logbook import processor
from homeassistant.components.recorder.models import process_timestamp_to_utc_isoformat
from homeassistant.components.recorder.models import (
process_timestamp_to_utc_isoformat,
ulid_to_bytes_or_none,
uuid_hex_to_bytes_or_none,
)
from homeassistant.core import Context
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.json import JSONEncoder
@ -28,9 +32,13 @@ class MockRow:
self.data = data
self.time_fired = dt_util.utcnow()
self.time_fired_ts = dt_util.utc_to_timestamp(self.time_fired)
self.context_parent_id = context.parent_id if context else None
self.context_user_id = context.user_id if context else None
self.context_id = context.id if context else None
self.context_parent_id_bin = (
ulid_to_bytes_or_none(context.parent_id) if context else None
)
self.context_user_id_bin = (
uuid_hex_to_bytes_or_none(context.user_id) if context else None
)
self.context_id_bin = ulid_to_bytes_or_none(context.id) if context else None
self.state = None
self.entity_id = None
self.state_id = None

View file

@ -323,9 +323,9 @@ def create_state_changed_event_from_old_new(
"event_data",
"time_fired",
"time_fired_ts",
"context_id",
"context_user_id",
"context_parent_id",
"context_id_bin",
"context_user_id_bin",
"context_parent_id_bin",
"state",
"entity_id",
"domain",
@ -349,12 +349,12 @@ def create_state_changed_event_from_old_new(
row.entity_id = entity_id
row.domain = entity_id and ha.split_entity_id(entity_id)[0]
row.context_only = False
row.context_id = None
row.context_id_bin = None
row.friendly_name = None
row.icon = None
row.old_format_icon = None
row.context_user_id = None
row.context_parent_id = None
row.context_user_id_bin = None
row.context_parent_id_bin = None
row.old_state_id = old_state and 1
row.state_id = new_state and 1
return LazyEventPartialState(row, {})
@ -966,7 +966,7 @@ async def test_logbook_entity_context_id(
await async_recorder_block_till_done(hass)
context = ha.Context(
id="ac5bd62de45711eaaeb351041eec8dd9",
id="01GTDGKBCH00GW0X476W5TVAAA",
user_id="b400facee45711eaa9308bfd3d19e474",
)
@ -1027,7 +1027,7 @@ async def test_logbook_entity_context_id(
# A service call
light_turn_off_service_context = ha.Context(
id="9c5bd62de45711eaaeb351041eec8dd9",
id="01GTDGKBCH00GW0X476W5TVBFC",
user_id="9400facee45711eaa9308bfd3d19e474",
)
hass.states.async_set("light.switch", STATE_ON)
@ -1120,7 +1120,7 @@ async def test_logbook_context_id_automation_script_started_manually(
# An Automation
automation_entity_id_test = "automation.alarm"
automation_context = ha.Context(
id="fc5bd62de45711eaaeb351041eec8dd9",
id="01GTDGKBCH00GW0X476W5TVCCC",
user_id="f400facee45711eaa9308bfd3d19e474",
)
hass.bus.async_fire(
@ -1129,7 +1129,7 @@ async def test_logbook_context_id_automation_script_started_manually(
context=automation_context,
)
script_context = ha.Context(
id="ac5bd62de45711eaaeb351041eec8dd9",
id="01GTDGKBCH00GW0X476W5TVAAA",
user_id="b400facee45711eaa9308bfd3d19e474",
)
hass.bus.async_fire(
@ -1141,7 +1141,7 @@ async def test_logbook_context_id_automation_script_started_manually(
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
script_2_context = ha.Context(
id="1234",
id="01GTDGKBCH00GW0X476W5TVEEE",
user_id="b400facee45711eaa9308bfd3d19e474",
)
hass.bus.async_fire(
@ -1172,12 +1172,12 @@ async def test_logbook_context_id_automation_script_started_manually(
assert json_dict[0]["entity_id"] == "automation.alarm"
assert "context_entity_id" not in json_dict[0]
assert json_dict[0]["context_user_id"] == "f400facee45711eaa9308bfd3d19e474"
assert json_dict[0]["context_id"] == "fc5bd62de45711eaaeb351041eec8dd9"
assert json_dict[0]["context_id"] == "01GTDGKBCH00GW0X476W5TVCCC"
assert json_dict[1]["entity_id"] == "script.mock_script"
assert "context_entity_id" not in json_dict[1]
assert json_dict[1]["context_user_id"] == "b400facee45711eaa9308bfd3d19e474"
assert json_dict[1]["context_id"] == "ac5bd62de45711eaaeb351041eec8dd9"
assert json_dict[1]["context_id"] == "01GTDGKBCH00GW0X476W5TVAAA"
assert json_dict[2]["domain"] == "homeassistant"
@ -1185,7 +1185,7 @@ async def test_logbook_context_id_automation_script_started_manually(
assert json_dict[3]["name"] == "Mock script"
assert "context_entity_id" not in json_dict[1]
assert json_dict[3]["context_user_id"] == "b400facee45711eaa9308bfd3d19e474"
assert json_dict[3]["context_id"] == "1234"
assert json_dict[3]["context_id"] == "01GTDGKBCH00GW0X476W5TVEEE"
assert json_dict[4]["entity_id"] == "switch.new"
assert json_dict[4]["state"] == "off"
@ -1209,7 +1209,7 @@ async def test_logbook_entity_context_parent_id(
await async_recorder_block_till_done(hass)
context = ha.Context(
id="ac5bd62de45711eaaeb351041eec8dd9",
id="01GTDGKBCH00GW0X476W5TVAAA",
user_id="b400facee45711eaa9308bfd3d19e474",
)
@ -1222,8 +1222,8 @@ async def test_logbook_entity_context_parent_id(
)
child_context = ha.Context(
id="2798bfedf8234b5e9f4009c91f48f30c",
parent_id="ac5bd62de45711eaaeb351041eec8dd9",
id="01GTDGKBCH00GW0X476W5TVDDD",
parent_id="01GTDGKBCH00GW0X476W5TVAAA",
user_id="b400facee45711eaa9308bfd3d19e474",
)
hass.bus.async_fire(
@ -1274,8 +1274,8 @@ async def test_logbook_entity_context_parent_id(
# A state change via service call with the script as the parent
light_turn_off_service_context = ha.Context(
id="9c5bd62de45711eaaeb351041eec8dd9",
parent_id="2798bfedf8234b5e9f4009c91f48f30c",
id="01GTDGKBCH00GW0X476W5TVBFC",
parent_id="01GTDGKBCH00GW0X476W5TVDDD",
user_id="9400facee45711eaa9308bfd3d19e474",
)
hass.states.async_set("light.switch", STATE_ON)
@ -1299,8 +1299,8 @@ async def test_logbook_entity_context_parent_id(
# An event with a parent event, but the parent event isn't available
missing_parent_context = ha.Context(
id="fc40b9a0d1f246f98c34b33c76228ee6",
parent_id="c8ce515fe58e442f8664246c65ed964f",
id="01GTDGKBCH00GW0X476W5TEDDD",
parent_id="01GTDGKBCH00GW0X276W5TEDDD",
user_id="485cacf93ef84d25a99ced3126b921d2",
)
logbook.async_log_entry(
@ -1423,7 +1423,7 @@ async def test_logbook_context_from_template(
await hass.async_block_till_done()
switch_turn_off_context = ha.Context(
id="9c5bd62de45711eaaeb351041eec8dd9",
id="01GTDGKBCH00GW0X476W5TVBFC",
user_id="9400facee45711eaa9308bfd3d19e474",
)
hass.states.async_set(
@ -1506,7 +1506,7 @@ async def test_logbook_(
await hass.async_block_till_done()
switch_turn_off_context = ha.Context(
id="9c5bd62de45711eaaeb351041eec8dd9",
id="01GTDGKBCH00GW0X476W5TVBFC",
user_id="9400facee45711eaa9308bfd3d19e474",
)
hass.states.async_set(
@ -1692,7 +1692,7 @@ async def test_logbook_multiple_entities(
await hass.async_block_till_done()
switch_turn_off_context = ha.Context(
id="9c5bd62de45711eaaeb351041eec8dd9",
id="01GTDGKBCH00GW0X476W5TVBFC",
user_id="9400facee45711eaa9308bfd3d19e474",
)
hass.states.async_set(
@ -2394,7 +2394,7 @@ async def test_get_events(
hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 400})
await hass.async_block_till_done()
context = ha.Context(
id="ac5bd62de45711eaaeb351041eec8dd9",
id="01GTDGKBCH00GW0X476W5TVAAA",
user_id="b400facee45711eaa9308bfd3d19e474",
)
@ -2474,7 +2474,7 @@ async def test_get_events(
"id": 5,
"type": "logbook/get_events",
"start_time": now.isoformat(),
"context_id": "ac5bd62de45711eaaeb351041eec8dd9",
"context_id": "01GTDGKBCH00GW0X476W5TVAAA",
}
)
response = await client.receive_json()
@ -2651,7 +2651,7 @@ async def test_get_events_with_device_ids(
hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 400})
await hass.async_block_till_done()
context = ha.Context(
id="ac5bd62de45711eaaeb351041eec8dd9",
id="01GTDGKBCH00GW0X476W5TVAAA",
user_id="b400facee45711eaa9308bfd3d19e474",
)
@ -2740,7 +2740,7 @@ async def test_logbook_select_entities_context_id(
await async_recorder_block_till_done(hass)
context = ha.Context(
id="ac5bd62de45711eaaeb351041eec8dd9",
id="01GTDGKBCH00GW0X476W5TVAAA",
user_id="b400facee45711eaa9308bfd3d19e474",
)
@ -2799,7 +2799,7 @@ async def test_logbook_select_entities_context_id(
# A service call
light_turn_off_service_context = ha.Context(
id="9c5bd62de45711eaaeb351041eec8dd9",
id="01GTDGKBCH00GW0X476W5TVBFC",
user_id="9400facee45711eaa9308bfd3d19e474",
)
hass.states.async_set("light.switch", STATE_ON)
@ -2880,7 +2880,7 @@ async def test_get_events_with_context_state(
hass.states.async_set("light.kitchen2", STATE_OFF)
context = ha.Context(
id="ac5bd62de45711eaaeb351041eec8dd9",
id="01GTDGKBCH00GW0X476W5TVAAA",
user_id="b400facee45711eaa9308bfd3d19e474",
)
hass.states.async_set("binary_sensor.is_light", STATE_OFF, context=context)

View file

@ -159,7 +159,7 @@ async def test_get_events(
hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 400})
await hass.async_block_till_done()
context = core.Context(
id="ac5bd62de45711eaaeb351041eec8dd9",
id="01GTDGKBCH00GW0X276W5TEDDD",
user_id="b400facee45711eaa9308bfd3d19e474",
)
@ -239,7 +239,7 @@ async def test_get_events(
"id": 5,
"type": "logbook/get_events",
"start_time": now.isoformat(),
"context_id": "ac5bd62de45711eaaeb351041eec8dd9",
"context_id": "01GTDGKBCH00GW0X276W5TEDDD",
}
)
response = await client.receive_json()
@ -448,7 +448,7 @@ async def test_get_events_with_device_ids(
hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 400})
await hass.async_block_till_done()
context = core.Context(
id="ac5bd62de45711eaaeb351041eec8dd9",
id="01GTDGKBCH00GW0X276W5TEDDD",
user_id="b400facee45711eaa9308bfd3d19e474",
)
@ -1262,7 +1262,7 @@ async def test_subscribe_unsubscribe_logbook_stream(
]
context = core.Context(
id="ac5bd62de45711eaaeb351041eec8dd9",
id="01GTDGKBCH00GW0X276W5TEDDD",
user_id="b400facee45711eaa9308bfd3d19e474",
)
automation_entity_id_test = "automation.alarm"
@ -1300,7 +1300,7 @@ async def test_subscribe_unsubscribe_logbook_stream(
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"context_id": "ac5bd62de45711eaaeb351041eec8dd9",
"context_id": "01GTDGKBCH00GW0X276W5TEDDD",
"context_user_id": "b400facee45711eaa9308bfd3d19e474",
"domain": "automation",
"entity_id": "automation.alarm",
@ -1313,7 +1313,7 @@ async def test_subscribe_unsubscribe_logbook_stream(
"context_domain": "automation",
"context_entity_id": "automation.alarm",
"context_event_type": "automation_triggered",
"context_id": "ac5bd62de45711eaaeb351041eec8dd9",
"context_id": "01GTDGKBCH00GW0X276W5TEDDD",
"context_message": "triggered by state of binary_sensor.dog_food_ready",
"context_name": "Mock automation",
"context_source": "state of binary_sensor.dog_food_ready",
@ -1365,7 +1365,7 @@ async def test_subscribe_unsubscribe_logbook_stream(
"context_domain": "automation",
"context_entity_id": "automation.alarm",
"context_event_type": "automation_triggered",
"context_id": "ac5bd62de45711eaaeb351041eec8dd9",
"context_id": "01GTDGKBCH00GW0X276W5TEDDD",
"context_message": "triggered by state of binary_sensor.dog_food_ready",
"context_name": "Mock automation",
"context_source": "state of binary_sensor.dog_food_ready",
@ -1395,7 +1395,7 @@ async def test_subscribe_unsubscribe_logbook_stream(
"context_domain": "automation",
"context_entity_id": "automation.alarm",
"context_event_type": "automation_triggered",
"context_id": "ac5bd62de45711eaaeb351041eec8dd9",
"context_id": "01GTDGKBCH00GW0X276W5TEDDD",
"context_message": "triggered by state of binary_sensor.dog_food_ready",
"context_name": "Mock automation",
"context_source": "state of binary_sensor.dog_food_ready",
@ -1990,7 +1990,7 @@ async def test_logbook_stream_match_multiple_entities(
hass.states.async_set("binary_sensor.should_not_appear", STATE_ON)
hass.states.async_set("binary_sensor.should_not_appear", STATE_OFF)
context = core.Context(
id="ac5bd62de45711eaaeb351041eec8dd9",
id="01GTDGKBCH00GW0X276W5TEDDD",
user_id="b400facee45711eaa9308bfd3d19e474",
)
hass.bus.async_fire(

View file

@ -27,6 +27,7 @@ from sqlalchemy import (
Identity,
Index,
Integer,
LargeBinary,
SmallInteger,
String,
Text,
@ -92,6 +93,10 @@ DOUBLE_TYPE = (
TIMESTAMP_TYPE = DOUBLE_TYPE
CONTEXT_ID_BIN_MAX_LENGTH = 16
EVENTS_CONTEXT_ID_BIN_INDEX = "ix_events_context_id_bin"
STATES_CONTEXT_ID_BIN_INDEX = "ix_states_context_id_bin"
class Events(Base): # type: ignore
"""Event history data."""
@ -100,6 +105,12 @@ class Events(Base): # type: ignore
# Used for fetching events at a specific time
# see logbook
Index("ix_events_event_type_time_fired", "event_type", "time_fired"),
Index(
EVENTS_CONTEXT_ID_BIN_INDEX,
"context_id_bin",
mysql_length=CONTEXT_ID_BIN_MAX_LENGTH,
mariadb_length=CONTEXT_ID_BIN_MAX_LENGTH,
),
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_EVENTS
@ -121,6 +132,15 @@ class Events(Base): # type: ignore
data_id = Column(
Integer, ForeignKey("event_data.data_id"), index=True
) # *** Not originally in v23, only added for recorder to startup ok
context_id_bin = Column(
LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH)
) # *** Not originally in v23, only added for recorder to startup ok
context_user_id_bin = Column(
LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH)
) # *** Not originally in v23, only added for recorder to startup ok
context_parent_id_bin = Column(
LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH)
) # *** Not originally in v23, only added for recorder to startup ok
event_data_rel = relationship(
"EventData"
) # *** Not originally in v23, only added for recorder to startup ok
@ -191,6 +211,12 @@ class States(Base): # type: ignore
# Used for fetching the state of entities at a specific time
# (get_states in history.py)
Index("ix_states_entity_id_last_updated", "entity_id", "last_updated"),
Index(
STATES_CONTEXT_ID_BIN_INDEX,
"context_id_bin",
mysql_length=CONTEXT_ID_BIN_MAX_LENGTH,
mariadb_length=CONTEXT_ID_BIN_MAX_LENGTH,
),
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_STATES
@ -212,6 +238,15 @@ class States(Base): # type: ignore
) # *** Not originally in v23, only added for recorder to startup ok
created = Column(DATETIME_TYPE, default=dt_util.utcnow)
old_state_id = Column(Integer, ForeignKey("states.state_id"), index=True)
context_id_bin = Column(
LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH)
) # *** Not originally in v23, only added for recorder to startup ok
context_user_id_bin = Column(
LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH)
) # *** Not originally in v23, only added for recorder to startup ok
context_parent_id_bin = Column(
LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH)
) # *** Not originally in v23, only added for recorder to startup ok
event = relationship("Events", uselist=False)
old_state = relationship("States", remote_side=[state_id])

View file

@ -23,6 +23,7 @@ from sqlalchemy import (
Identity,
Index,
Integer,
LargeBinary,
SmallInteger,
String,
Text,
@ -96,6 +97,9 @@ LAST_UPDATED_INDEX = "ix_states_last_updated"
ENTITY_ID_LAST_UPDATED_INDEX = "ix_states_entity_id_last_updated"
EVENTS_CONTEXT_ID_INDEX = "ix_events_context_id"
STATES_CONTEXT_ID_INDEX = "ix_states_context_id"
CONTEXT_ID_BIN_MAX_LENGTH = 16
EVENTS_CONTEXT_ID_BIN_INDEX = "ix_events_context_id_bin"
STATES_CONTEXT_ID_BIN_INDEX = "ix_states_context_id_bin"
class FAST_PYSQLITE_DATETIME(sqlite.DATETIME): # type: ignore[misc]
@ -193,6 +197,12 @@ class Events(Base): # type: ignore[misc,valid-type]
# Used for fetching events at a specific time
# see logbook
Index("ix_events_event_type_time_fired", "event_type", "time_fired"),
Index(
EVENTS_CONTEXT_ID_BIN_INDEX,
"context_id_bin",
mysql_length=CONTEXT_ID_BIN_MAX_LENGTH,
mariadb_length=CONTEXT_ID_BIN_MAX_LENGTH,
),
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_EVENTS
@ -206,6 +216,15 @@ class Events(Base): # type: ignore[misc,valid-type]
context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
data_id = Column(Integer, ForeignKey("event_data.data_id"), index=True)
context_id_bin = Column(
LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH)
) # *** Not originally in v30, only added for recorder to startup ok
context_user_id_bin = Column(
LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH)
) # *** Not originally in v30, only added for recorder to startup ok
context_parent_id_bin = Column(
LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH)
) # *** Not originally in v30, only added for recorder to startup ok
event_data_rel = relationship("EventData")
def __repr__(self) -> str:
@ -310,6 +329,12 @@ class States(Base): # type: ignore[misc,valid-type]
# Used for fetching the state of entities at a specific time
# (get_states in history.py)
Index(ENTITY_ID_LAST_UPDATED_INDEX, "entity_id", "last_updated"),
Index(
STATES_CONTEXT_ID_BIN_INDEX,
"context_id_bin",
mysql_length=CONTEXT_ID_BIN_MAX_LENGTH,
mariadb_length=CONTEXT_ID_BIN_MAX_LENGTH,
),
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_STATES
@ -332,6 +357,15 @@ class States(Base): # type: ignore[misc,valid-type]
context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
origin_idx = Column(SmallInteger) # 0 is local, 1 is remote
context_id_bin = Column(
LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH)
) # *** Not originally in v30, only added for recorder to startup ok
context_user_id_bin = Column(
LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH)
) # *** Not originally in v30, only added for recorder to startup ok
context_parent_id_bin = Column(
LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH)
) # *** Not originally in v30, only added for recorder to startup ok
old_state = relationship("States", remote_side=[state_id])
state_attributes = relationship("StateAttributes")

View file

@ -6,9 +6,10 @@ import sqlite3
import sys
import threading
from unittest.mock import Mock, PropertyMock, call, patch
import uuid
import pytest
from sqlalchemy import create_engine, text
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.exc import (
DatabaseError,
InternalError,
@ -23,17 +24,25 @@ from homeassistant.components import persistent_notification as pn, recorder
from homeassistant.components.recorder import db_schema, migration
from homeassistant.components.recorder.db_schema import (
SCHEMA_VERSION,
Events,
RecorderRuns,
States,
)
from homeassistant.components.recorder.tasks import ContextIDMigrationTask
from homeassistant.components.recorder.util import session_scope
from homeassistant.core import HomeAssistant
from homeassistant.helpers import recorder as recorder_helper
import homeassistant.util.dt as dt_util
from homeassistant.util.ulid import bytes_to_ulid
from .common import async_wait_recording_done, create_engine_test
from .common import (
async_recorder_block_till_done,
async_wait_recording_done,
create_engine_test,
)
from tests.common import async_fire_time_changed
from tests.typing import RecorderInstanceGenerator
ORIG_TZ = dt_util.DEFAULT_TIME_ZONE
@ -535,3 +544,147 @@ def test_raise_if_exception_missing_empty_cause_str() -> None:
with pytest.raises(ProgrammingError):
migration.raise_if_exception_missing_str(programming_exc, ["not present"])
@pytest.mark.parametrize("enable_migrate_context_ids", [True])
async def test_migrate_context_ids(
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
"""Test we can migrate old uuid context ids and ulid context ids to binary format."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
test_uuid = uuid.uuid4()
uuid_hex = test_uuid.hex
uuid_bin = test_uuid.bytes
def _insert_events():
with session_scope(hass=hass) as session:
session.add_all(
(
Events(
event_type="old_uuid_context_id_event",
event_data=None,
origin_idx=0,
time_fired=None,
time_fired_ts=1677721632.452529,
context_id=uuid_hex,
context_id_bin=None,
context_user_id=None,
context_user_id_bin=None,
context_parent_id=None,
context_parent_id_bin=None,
),
Events(
event_type="empty_context_id_event",
event_data=None,
origin_idx=0,
time_fired=None,
time_fired_ts=1677721632.552529,
context_id=None,
context_id_bin=None,
context_user_id=None,
context_user_id_bin=None,
context_parent_id=None,
context_parent_id_bin=None,
),
Events(
event_type="ulid_context_id_event",
event_data=None,
origin_idx=0,
time_fired=None,
time_fired_ts=1677721632.552529,
context_id="01ARZ3NDEKTSV4RRFFQ69G5FAV",
context_id_bin=None,
context_user_id="9400facee45711eaa9308bfd3d19e474",
context_user_id_bin=None,
context_parent_id="01ARZ3NDEKTSV4RRFFQ69G5FA2",
context_parent_id_bin=None,
),
Events(
event_type="invalid_context_id_event",
event_data=None,
origin_idx=0,
time_fired=None,
time_fired_ts=1677721632.552529,
context_id="invalid",
context_id_bin=None,
context_user_id=None,
context_user_id_bin=None,
context_parent_id=None,
context_parent_id_bin=None,
),
)
)
await instance.async_add_executor_job(_insert_events)
await async_wait_recording_done(hass)
# This is a threadsafe way to add a task to the recorder
instance.queue_task(ContextIDMigrationTask())
await async_recorder_block_till_done(hass)
def _object_as_dict(obj):
return {c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs}
def _fetch_migrated_events():
with session_scope(hass=hass) as session:
events = (
session.query(Events)
.filter(
Events.event_type.in_(
[
"old_uuid_context_id_event",
"empty_context_id_event",
"ulid_context_id_event",
"invalid_context_id_event",
]
)
)
.all()
)
assert len(events) == 4
return {event.event_type: _object_as_dict(event) for event in events}
events_by_type = await instance.async_add_executor_job(_fetch_migrated_events)
old_uuid_context_id_event = events_by_type["old_uuid_context_id_event"]
assert old_uuid_context_id_event["context_id"] is None
assert old_uuid_context_id_event["context_user_id"] is None
assert old_uuid_context_id_event["context_parent_id"] is None
assert old_uuid_context_id_event["context_id_bin"] == uuid_bin
assert old_uuid_context_id_event["context_user_id_bin"] is None
assert old_uuid_context_id_event["context_parent_id_bin"] is None
empty_context_id_event = events_by_type["empty_context_id_event"]
assert empty_context_id_event["context_id"] is None
assert empty_context_id_event["context_user_id"] is None
assert empty_context_id_event["context_parent_id"] is None
assert empty_context_id_event["context_id_bin"] == b"\x00" * 16
assert empty_context_id_event["context_user_id_bin"] is None
assert empty_context_id_event["context_parent_id_bin"] is None
ulid_context_id_event = events_by_type["ulid_context_id_event"]
assert ulid_context_id_event["context_id"] is None
assert ulid_context_id_event["context_user_id"] is None
assert ulid_context_id_event["context_parent_id"] is None
assert (
bytes_to_ulid(ulid_context_id_event["context_id_bin"])
== "01ARZ3NDEKTSV4RRFFQ69G5FAV"
)
assert (
ulid_context_id_event["context_user_id_bin"]
== b"\x94\x00\xfa\xce\xe4W\x11\xea\xa90\x8b\xfd=\x19\xe4t"
)
assert (
bytes_to_ulid(ulid_context_id_event["context_parent_id_bin"])
== "01ARZ3NDEKTSV4RRFFQ69G5FA2"
)
invalid_context_id_event = events_by_type["invalid_context_id_event"]
assert invalid_context_id_event["context_id"] is None
assert invalid_context_id_event["context_user_id"] is None
assert invalid_context_id_event["context_parent_id"] is None
assert invalid_context_id_event["context_id_bin"] == b"\x00" * 16
assert invalid_context_id_event["context_user_id_bin"] is None
assert invalid_context_id_event["context_parent_id_bin"] is None

View file

@ -1138,6 +1138,16 @@ def enable_nightly_purge() -> bool:
return False
@pytest.fixture
def enable_migrate_context_ids() -> bool:
"""Fixture to control enabling of recorder's context id migration.
To enable context id migration, tests can be marked with:
@pytest.mark.parametrize("enable_migrate_context_ids", [True])
"""
return False
@pytest.fixture
def recorder_config() -> dict[str, Any] | None:
"""Fixture to override recorder config.
@ -1280,6 +1290,7 @@ async def async_setup_recorder_instance(
enable_nightly_purge: bool,
enable_statistics: bool,
enable_statistics_table_validation: bool,
enable_migrate_context_ids: bool,
) -> AsyncGenerator[RecorderInstanceGenerator, None]:
"""Yield callable to setup recorder instance."""
# pylint: disable-next=import-outside-toplevel
@ -1295,6 +1306,9 @@ async def async_setup_recorder_instance(
if enable_statistics_table_validation
else itertools.repeat(set())
)
migrate_context_ids = (
recorder.Recorder._migrate_context_ids if enable_migrate_context_ids else None
)
with patch(
"homeassistant.components.recorder.Recorder.async_nightly_tasks",
side_effect=nightly,
@ -1307,6 +1321,10 @@ async def async_setup_recorder_instance(
"homeassistant.components.recorder.migration.statistics_validate_db_schema",
side_effect=stats_validate,
autospec=True,
), patch(
"homeassistant.components.recorder.Recorder._migrate_context_ids",
side_effect=migrate_context_ids,
autospec=True,
):
async def async_setup_recorder(