Move recorder data migration implementation to migrators (#122045)

commit 6921e053e4 (parent 7f76de2c7a)
2 changed files with 214 additions and 227 deletions
@@ -1863,217 +1863,6 @@ def _generate_ulid_bytes_at_time(timestamp: float | None) -> bytes:
     return ulid_to_bytes(ulid_at_time(timestamp or time()))
 
 
-@retryable_database_job("migrate states context_ids to binary format")
-def migrate_states_context_ids(instance: Recorder) -> bool:
-    """Migrate states context_ids to use binary format."""
-    _to_bytes = _context_id_to_bytes
-    session_maker = instance.get_session
-    _LOGGER.debug("Migrating states context_ids to binary format")
-    with session_scope(session=session_maker()) as session:
-        if states := session.execute(
-            find_states_context_ids_to_migrate(instance.max_bind_vars)
-        ).all():
-            session.execute(
-                update(States),
-                [
-                    {
-                        "state_id": state_id,
-                        "context_id": None,
-                        "context_id_bin": _to_bytes(context_id)
-                        or _generate_ulid_bytes_at_time(last_updated_ts),
-                        "context_user_id": None,
-                        "context_user_id_bin": _to_bytes(context_user_id),
-                        "context_parent_id": None,
-                        "context_parent_id_bin": _to_bytes(context_parent_id),
-                    }
-                    for state_id, last_updated_ts, context_id, context_user_id, context_parent_id in states
-                ],
-            )
-        # If there is more work to do return False
-        # so that we can be called again
-        if is_done := not states:
-            _mark_migration_done(session, StatesContextIDMigration)
-
-    if is_done:
-        _drop_index(session_maker, "states", "ix_states_context_id")
-
-    _LOGGER.debug("Migrating states context_ids to binary format: done=%s", is_done)
-    return is_done
-
-
-@retryable_database_job("migrate events context_ids to binary format")
-def migrate_events_context_ids(instance: Recorder) -> bool:
-    """Migrate events context_ids to use binary format."""
-    _to_bytes = _context_id_to_bytes
-    session_maker = instance.get_session
-    _LOGGER.debug("Migrating context_ids to binary format")
-    with session_scope(session=session_maker()) as session:
-        if events := session.execute(
-            find_events_context_ids_to_migrate(instance.max_bind_vars)
-        ).all():
-            session.execute(
-                update(Events),
-                [
-                    {
-                        "event_id": event_id,
-                        "context_id": None,
-                        "context_id_bin": _to_bytes(context_id)
-                        or _generate_ulid_bytes_at_time(time_fired_ts),
-                        "context_user_id": None,
-                        "context_user_id_bin": _to_bytes(context_user_id),
-                        "context_parent_id": None,
-                        "context_parent_id_bin": _to_bytes(context_parent_id),
-                    }
-                    for event_id, time_fired_ts, context_id, context_user_id, context_parent_id in events
-                ],
-            )
-        # If there is more work to do return False
-        # so that we can be called again
-        if is_done := not events:
-            _mark_migration_done(session, EventsContextIDMigration)
-
-    if is_done:
-        _drop_index(session_maker, "events", "ix_events_context_id")
-
-    _LOGGER.debug("Migrating events context_ids to binary format: done=%s", is_done)
-    return is_done
-
-
-@retryable_database_job("migrate events event_types to event_type_ids")
-def migrate_event_type_ids(instance: Recorder) -> bool:
-    """Migrate event_type to event_type_ids."""
-    session_maker = instance.get_session
-    _LOGGER.debug("Migrating event_types")
-    event_type_manager = instance.event_type_manager
-    with session_scope(session=session_maker()) as session:
-        if events := session.execute(
-            find_event_type_to_migrate(instance.max_bind_vars)
-        ).all():
-            event_types = {event_type for _, event_type in events}
-            if None in event_types:
-                # event_type should never be None but we need to be defensive
-                # so we don't fail the migration because of a bad state
-                event_types.remove(None)
-                event_types.add(_EMPTY_EVENT_TYPE)
-
-            event_type_to_id = event_type_manager.get_many(event_types, session)
-            if missing_event_types := {
-                event_type
-                for event_type, event_id in event_type_to_id.items()
-                if event_id is None
-            }:
-                missing_db_event_types = [
-                    EventTypes(event_type=event_type)
-                    for event_type in missing_event_types
-                ]
-                session.add_all(missing_db_event_types)
-                session.flush()  # Assign ids
-                for db_event_type in missing_db_event_types:
-                    # We cannot add the assigned ids to the event_type_manager
-                    # because the commit could get rolled back
-                    assert (
-                        db_event_type.event_type is not None
-                    ), "event_type should never be None"
-                    event_type_to_id[db_event_type.event_type] = (
-                        db_event_type.event_type_id
-                    )
-                    event_type_manager.clear_non_existent(db_event_type.event_type)
-
-            session.execute(
-                update(Events),
-                [
-                    {
-                        "event_id": event_id,
-                        "event_type": None,
-                        "event_type_id": event_type_to_id[
-                            _EMPTY_EVENT_TYPE if event_type is None else event_type
-                        ],
-                    }
-                    for event_id, event_type in events
-                ],
-            )
-
-        # If there is more work to do return False
-        # so that we can be called again
-        if is_done := not events:
-            _mark_migration_done(session, EventTypeIDMigration)
-
-    _LOGGER.debug("Migrating event_types done=%s", is_done)
-    return is_done
-
-
-@retryable_database_job("migrate states entity_ids to states_meta")
-def migrate_entity_ids(instance: Recorder) -> bool:
-    """Migrate entity_ids to states_meta.
-
-    We do this in two steps because we need the history queries to work
-    while we are migrating.
-
-    1. Link the states to the states_meta table
-    2. Remove the entity_id column from the states table (in post_migrate_entity_ids)
-    """
-    _LOGGER.debug("Migrating entity_ids")
-    states_meta_manager = instance.states_meta_manager
-    with session_scope(session=instance.get_session()) as session:
-        if states := session.execute(
-            find_entity_ids_to_migrate(instance.max_bind_vars)
-        ).all():
-            entity_ids = {entity_id for _, entity_id in states}
-            if None in entity_ids:
-                # entity_id should never be None but we need to be defensive
-                # so we don't fail the migration because of a bad state
-                entity_ids.remove(None)
-                entity_ids.add(_EMPTY_ENTITY_ID)
-
-            entity_id_to_metadata_id = states_meta_manager.get_many(
-                entity_ids, session, True
-            )
-            if missing_entity_ids := {
-                entity_id
-                for entity_id, metadata_id in entity_id_to_metadata_id.items()
-                if metadata_id is None
-            }:
-                missing_states_metadata = [
-                    StatesMeta(entity_id=entity_id) for entity_id in missing_entity_ids
-                ]
-                session.add_all(missing_states_metadata)
-                session.flush()  # Assign ids
-                for db_states_metadata in missing_states_metadata:
-                    # We cannot add the assigned ids to the event_type_manager
-                    # because the commit could get rolled back
-                    assert (
-                        db_states_metadata.entity_id is not None
-                    ), "entity_id should never be None"
-                    entity_id_to_metadata_id[db_states_metadata.entity_id] = (
-                        db_states_metadata.metadata_id
-                    )
-
-            session.execute(
-                update(States),
-                [
-                    {
-                        "state_id": state_id,
-                        # We cannot set "entity_id": None yet since
-                        # the history queries still need to work while the
-                        # migration is in progress and we will do this in
-                        # post_migrate_entity_ids
-                        "metadata_id": entity_id_to_metadata_id[
-                            _EMPTY_ENTITY_ID if entity_id is None else entity_id
-                        ],
-                    }
-                    for state_id, entity_id in states
-                ],
-            )
-
-        # If there is more work to do return False
-        # so that we can be called again
-        if is_done := not states:
-            _mark_migration_done(session, EntityIDMigration)
-
-    _LOGGER.debug("Migrating entity_ids done=%s", is_done)
-    return is_done
-
-
 @retryable_database_job("post migrate states entity_ids to states_meta")
 def post_migrate_entity_ids(instance: Recorder) -> bool:
     """Remove old entity_id strings from states.
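All four module-level batch migrators deleted above reappear, essentially unchanged, as the migrate_data staticmethods of the corresponding BaseRunTimeMigration subclasses in the hunks below. Their shared contract: each call converts at most max_bind_vars rows and returns True only once nothing is left to do. A minimal sketch of a driver for that contract (illustrative only; the real recorder re-queues a task between batches rather than looping inline):

    from collections.abc import Callable

    def drive_migration(migrate_data: Callable[..., bool], instance) -> None:
        # False means "more rows remain, call me again"; True means finished.
        while not migrate_data(instance):
            pass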
@@ -2262,9 +2051,42 @@ class StatesContextIDMigration(BaseRunTimeMigration):
     migration_id = "state_context_id_as_binary"
 
     @staticmethod
+    @retryable_database_job("migrate states context_ids to binary format")
     def migrate_data(instance: Recorder) -> bool:
-        """Migrate some data, returns True if migration is completed."""
-        return migrate_states_context_ids(instance)
+        """Migrate states context_ids to use binary format, return True if completed."""
+        _to_bytes = _context_id_to_bytes
+        session_maker = instance.get_session
+        _LOGGER.debug("Migrating states context_ids to binary format")
+        with session_scope(session=session_maker()) as session:
+            if states := session.execute(
+                find_states_context_ids_to_migrate(instance.max_bind_vars)
+            ).all():
+                session.execute(
+                    update(States),
+                    [
+                        {
+                            "state_id": state_id,
+                            "context_id": None,
+                            "context_id_bin": _to_bytes(context_id)
+                            or _generate_ulid_bytes_at_time(last_updated_ts),
+                            "context_user_id": None,
+                            "context_user_id_bin": _to_bytes(context_user_id),
+                            "context_parent_id": None,
+                            "context_parent_id_bin": _to_bytes(context_parent_id),
+                        }
+                        for state_id, last_updated_ts, context_id, context_user_id, context_parent_id in states
+                    ],
+                )
+            # If there is more work to do return False
+            # so that we can be called again
+            if is_done := not states:
+                _mark_migration_done(session, StatesContextIDMigration)
+
+        if is_done:
+            _drop_index(session_maker, "states", "ix_states_context_id")
+
+        _LOGGER.debug("Migrating states context_ids to binary format: done=%s", is_done)
+        return is_done
 
     def needs_migrate_query(self) -> StatementLambdaElement:
         """Return the query to check if the migration needs to run."""
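The pivotal expression in this migrator is _to_bytes(context_id) or _generate_ulid_bytes_at_time(last_updated_ts): a row whose stored context_id cannot be converted gets a ULID synthesized at the row's own timestamp instead. A rough stand-alone illustration (context_id_as_binary is a hypothetical helper; only ulid_at_time/ulid_to_bytes, visible in the first hunk and provided by the ulid_transform package Home Assistant uses, are assumed):

    from time import time
    from ulid_transform import ulid_at_time, ulid_to_bytes

    def context_id_as_binary(context_id: str | None, last_updated_ts: float | None) -> bytes:
        # Text ULIDs are 26 characters; anything else falls through.
        if context_id is not None and len(context_id) == 26:
            return ulid_to_bytes(context_id)
        # Generate at the row's timestamp so binary ids still sort roughly by time.
        return ulid_to_bytes(ulid_at_time(last_updated_ts or time()))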
@@ -2278,9 +2100,42 @@ class EventsContextIDMigration(BaseRunTimeMigration):
     migration_id = "event_context_id_as_binary"
 
     @staticmethod
+    @retryable_database_job("migrate events context_ids to binary format")
     def migrate_data(instance: Recorder) -> bool:
-        """Migrate some data, returns True if migration is completed."""
-        return migrate_events_context_ids(instance)
+        """Migrate events context_ids to use binary format, return True if completed."""
+        _to_bytes = _context_id_to_bytes
+        session_maker = instance.get_session
+        _LOGGER.debug("Migrating context_ids to binary format")
+        with session_scope(session=session_maker()) as session:
+            if events := session.execute(
+                find_events_context_ids_to_migrate(instance.max_bind_vars)
+            ).all():
+                session.execute(
+                    update(Events),
+                    [
+                        {
+                            "event_id": event_id,
+                            "context_id": None,
+                            "context_id_bin": _to_bytes(context_id)
+                            or _generate_ulid_bytes_at_time(time_fired_ts),
+                            "context_user_id": None,
+                            "context_user_id_bin": _to_bytes(context_user_id),
+                            "context_parent_id": None,
+                            "context_parent_id_bin": _to_bytes(context_parent_id),
+                        }
+                        for event_id, time_fired_ts, context_id, context_user_id, context_parent_id in events
+                    ],
+                )
+            # If there is more work to do return False
+            # so that we can be called again
+            if is_done := not events:
+                _mark_migration_done(session, EventsContextIDMigration)
+
+        if is_done:
+            _drop_index(session_maker, "events", "ix_events_context_id")
+
+        _LOGGER.debug("Migrating events context_ids to binary format: done=%s", is_done)
+        return is_done
 
     def needs_migrate_query(self) -> StatementLambdaElement:
        """Return the query to check if the migration needs to run."""
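Both context-id migrators rely on SQLAlchemy 2.0's bulk-UPDATE-by-primary-key form: passing update(Model) together with a list of parameter dicts issues a single executemany UPDATE keyed on the primary key in each dict. Stripped down (helper name is ours; the Events import path matches the patch targets further down):

    from sqlalchemy import update
    from sqlalchemy.orm import Session

    from homeassistant.components.recorder.db_schema import Events

    def bulk_update_events(session: Session, rows: list[dict]) -> None:
        # Each dict must carry the primary key ("event_id"); the other keys
        # become SET clauses, executed as one batched UPDATE statement.
        session.execute(update(Events), rows)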
@@ -2298,9 +2153,67 @@ class EventTypeIDMigration(BaseRunTimeMigration):
     # the db since this happens live
 
     @staticmethod
+    @retryable_database_job("migrate events event_types to event_type_ids")
     def migrate_data(instance: Recorder) -> bool:
-        """Migrate some data, returns True if migration is completed."""
-        return migrate_event_type_ids(instance)
+        """Migrate event_type to event_type_ids, return True if completed."""
+        session_maker = instance.get_session
+        _LOGGER.debug("Migrating event_types")
+        event_type_manager = instance.event_type_manager
+        with session_scope(session=session_maker()) as session:
+            if events := session.execute(
+                find_event_type_to_migrate(instance.max_bind_vars)
+            ).all():
+                event_types = {event_type for _, event_type in events}
+                if None in event_types:
+                    # event_type should never be None but we need to be defensive
+                    # so we don't fail the migration because of a bad state
+                    event_types.remove(None)
+                    event_types.add(_EMPTY_EVENT_TYPE)
+
+                event_type_to_id = event_type_manager.get_many(event_types, session)
+                if missing_event_types := {
+                    event_type
+                    for event_type, event_id in event_type_to_id.items()
+                    if event_id is None
+                }:
+                    missing_db_event_types = [
+                        EventTypes(event_type=event_type)
+                        for event_type in missing_event_types
+                    ]
+                    session.add_all(missing_db_event_types)
+                    session.flush()  # Assign ids
+                    for db_event_type in missing_db_event_types:
+                        # We cannot add the assigned ids to the event_type_manager
+                        # because the commit could get rolled back
+                        assert (
+                            db_event_type.event_type is not None
+                        ), "event_type should never be None"
+                        event_type_to_id[db_event_type.event_type] = (
+                            db_event_type.event_type_id
+                        )
+                        event_type_manager.clear_non_existent(db_event_type.event_type)
+
+                session.execute(
+                    update(Events),
+                    [
+                        {
+                            "event_id": event_id,
+                            "event_type": None,
+                            "event_type_id": event_type_to_id[
+                                _EMPTY_EVENT_TYPE if event_type is None else event_type
+                            ],
+                        }
+                        for event_id, event_type in events
+                    ],
+                )
+
+            # If there is more work to do return False
+            # so that we can be called again
+            if is_done := not events:
+                _mark_migration_done(session, EventTypeIDMigration)
+
+        _LOGGER.debug("Migrating event_types done=%s", is_done)
+        return is_done
 
     def migration_done(self, instance: Recorder) -> None:
         """Will be called after migrate returns True."""
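The middle of this migrator is the insert-then-flush idiom: new EventTypes rows are flushed, not committed, so their autoincrement ids are usable inside the same transaction, which is also why the inline comment warns against caching them in the manager. In isolation (function name is ours):

    from sqlalchemy.orm import Session

    from homeassistant.components.recorder.db_schema import EventTypes

    def insert_missing_event_types(session: Session, names: set[str]) -> dict[str, int]:
        rows = [EventTypes(event_type=name) for name in names]
        session.add_all(rows)
        session.flush()  # assigns primary keys; still rolled back if the txn fails
        return {row.event_type: row.event_type_id for row in rows}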
@@ -2323,9 +2236,77 @@ class EntityIDMigration(BaseRunTimeMigration):
     # the db since this happens live
 
     @staticmethod
+    @retryable_database_job("migrate states entity_ids to states_meta")
     def migrate_data(instance: Recorder) -> bool:
-        """Migrate some data, returns True if migration is completed."""
-        return migrate_entity_ids(instance)
+        """Migrate entity_ids to states_meta, return True if completed.
+
+        We do this in two steps because we need the history queries to work
+        while we are migrating.
+
+        1. Link the states to the states_meta table
+        2. Remove the entity_id column from the states table (in post_migrate_entity_ids)
+        """
+        _LOGGER.debug("Migrating entity_ids")
+        states_meta_manager = instance.states_meta_manager
+        with session_scope(session=instance.get_session()) as session:
+            if states := session.execute(
+                find_entity_ids_to_migrate(instance.max_bind_vars)
+            ).all():
+                entity_ids = {entity_id for _, entity_id in states}
+                if None in entity_ids:
+                    # entity_id should never be None but we need to be defensive
+                    # so we don't fail the migration because of a bad state
+                    entity_ids.remove(None)
+                    entity_ids.add(_EMPTY_ENTITY_ID)
+
+                entity_id_to_metadata_id = states_meta_manager.get_many(
+                    entity_ids, session, True
+                )
+                if missing_entity_ids := {
+                    entity_id
+                    for entity_id, metadata_id in entity_id_to_metadata_id.items()
+                    if metadata_id is None
+                }:
+                    missing_states_metadata = [
+                        StatesMeta(entity_id=entity_id)
+                        for entity_id in missing_entity_ids
+                    ]
+                    session.add_all(missing_states_metadata)
+                    session.flush()  # Assign ids
+                    for db_states_metadata in missing_states_metadata:
+                        # We cannot add the assigned ids to the event_type_manager
+                        # because the commit could get rolled back
+                        assert (
+                            db_states_metadata.entity_id is not None
+                        ), "entity_id should never be None"
+                        entity_id_to_metadata_id[db_states_metadata.entity_id] = (
+                            db_states_metadata.metadata_id
+                        )
+
+                session.execute(
+                    update(States),
+                    [
+                        {
+                            "state_id": state_id,
+                            # We cannot set "entity_id": None yet since
+                            # the history queries still need to work while the
+                            # migration is in progress and we will do this in
+                            # post_migrate_entity_ids
+                            "metadata_id": entity_id_to_metadata_id[
+                                _EMPTY_ENTITY_ID if entity_id is None else entity_id
+                            ],
+                        }
+                        for state_id, entity_id in states
+                    ],
+                )
+
+            # If there is more work to do return False
+            # so that we can be called again
+            if is_done := not states:
+                _mark_migration_done(session, EntityIDMigration)
+
+        _LOGGER.debug("Migrating entity_ids done=%s", is_done)
+        return is_done
 
     def migration_done(self, instance: Recorder) -> None:
         """Will be called after migrate returns True."""
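The docstring's two-step plan in miniature: this method backfills metadata_id while entity_id stays readable for live history queries; post_migrate_entity_ids (kept as context in the first hunk) clears the legacy column only afterwards. A sketch that ignores the recorder's real task scheduling:

    def link_states_then_cleanup(instance) -> None:
        # Step 1: batch-link states to states_meta until reported done.
        while not EntityIDMigration.migrate_data(instance):
            pass
        # Step 2: only now is it safe to blank the legacy entity_id strings.
        while not post_migrate_entity_ids(instance):
            pass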
@@ -1451,16 +1451,22 @@ async def async_test_recorder(
         else None
     )
     migrate_states_context_ids = (
-        migration.migrate_states_context_ids if enable_migrate_context_ids else None
+        migration.StatesContextIDMigration.migrate_data
+        if enable_migrate_context_ids
+        else None
     )
     migrate_events_context_ids = (
-        migration.migrate_events_context_ids if enable_migrate_context_ids else None
+        migration.EventsContextIDMigration.migrate_data
+        if enable_migrate_context_ids
+        else None
     )
     migrate_event_type_ids = (
-        migration.migrate_event_type_ids if enable_migrate_event_type_ids else None
+        migration.EventTypeIDMigration.migrate_data
+        if enable_migrate_event_type_ids
+        else None
     )
     migrate_entity_ids = (
-        migration.migrate_entity_ids if enable_migrate_entity_ids else None
+        migration.EntityIDMigration.migrate_data if enable_migrate_entity_ids else None
     )
     legacy_event_id_foreign_key_exists = (
         recorder.Recorder._legacy_event_id_foreign_key_exists
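The fixture now resolves its side_effect targets to the class-based staticmethods. When a flag is off the target stays None, and a patched mock with side_effect=None returns a plain truthy MagicMock, which the recorder presumably reads as "migration already complete". The off branch, isolated (flag value chosen for illustration):

    from unittest.mock import MagicMock

    from homeassistant.components.recorder import migration

    enable_migrate_entity_ids = False
    migrate_entity_ids = (
        migration.EntityIDMigration.migrate_data if enable_migrate_entity_ids else None
    )
    assert migrate_entity_ids is None   # the patch below swallows the real call
    assert MagicMock()(object())        # a bare mock's return value is truthy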
@@ -1484,22 +1490,22 @@ async def async_test_recorder(
             autospec=True,
         ),
         patch(
-            "homeassistant.components.recorder.migration.migrate_events_context_ids",
+            "homeassistant.components.recorder.migration.EventsContextIDMigration.migrate_data",
             side_effect=migrate_events_context_ids,
             autospec=True,
         ),
         patch(
-            "homeassistant.components.recorder.migration.migrate_states_context_ids",
+            "homeassistant.components.recorder.migration.StatesContextIDMigration.migrate_data",
             side_effect=migrate_states_context_ids,
             autospec=True,
         ),
         patch(
-            "homeassistant.components.recorder.migration.migrate_event_type_ids",
+            "homeassistant.components.recorder.migration.EventTypeIDMigration.migrate_data",
             side_effect=migrate_event_type_ids,
             autospec=True,
         ),
         patch(
-            "homeassistant.components.recorder.migration.migrate_entity_ids",
+            "homeassistant.components.recorder.migration.EntityIDMigration.migrate_data",
             side_effect=migrate_entity_ids,
             autospec=True,
         ),
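The same idiom works outside the fixture for spying on a single migration: autospec=True preserves the staticmethod's signature, and side_effect bound to the real method keeps behavior intact while recording calls. A usage sketch:

    from unittest.mock import patch

    from homeassistant.components.recorder import migration

    with patch(
        "homeassistant.components.recorder.migration.EntityIDMigration.migrate_data",
        side_effect=migration.EntityIDMigration.migrate_data,  # captured pre-patch
        autospec=True,
    ) as mock_migrate:
        ...  # start the recorder and wait for the migration to run
    assert mock_migrate.called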