Deduplicate entity_id in the states table (#89557)

This commit is contained in:
J. Nick Koston 2023-03-12 10:01:58 -10:00 committed by GitHub
parent 8d88b02c2e
commit c41f91be89
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
33 changed files with 3715 additions and 1018 deletions

View file

@ -16,6 +16,7 @@ from .db_schema import (
RecorderRuns,
StateAttributes,
States,
StatesMeta,
StatisticsRuns,
StatisticsShortTerm,
)
@ -59,6 +60,20 @@ def find_event_type_ids(event_types: Iterable[str]) -> StatementLambdaElement:
)
def find_all_states_metadata_ids() -> StatementLambdaElement:
"""Find all metadata_ids and entity_ids."""
return lambda_stmt(lambda: select(StatesMeta.metadata_id, StatesMeta.entity_id))
def find_states_metadata_ids(entity_ids: Iterable[str]) -> StatementLambdaElement:
"""Find metadata_ids by entity_ids."""
return lambda_stmt(
lambda: select(StatesMeta.metadata_id, StatesMeta.entity_id).filter(
StatesMeta.entity_id.in_(entity_ids)
)
)
def find_shared_attributes_id(
data_hash: int, shared_attrs: str
) -> StatementLambdaElement:
@ -716,6 +731,54 @@ def find_event_type_to_migrate() -> StatementLambdaElement:
)
def find_entity_ids_to_migrate() -> StatementLambdaElement:
"""Find entity_id to migrate."""
return lambda_stmt(
lambda: select(
States.state_id,
States.entity_id,
)
.filter(States.metadata_id.is_(None))
.limit(SQLITE_MAX_BIND_VARS)
)
def batch_cleanup_entity_ids() -> StatementLambdaElement:
"""Find entity_id to cleanup."""
# Self join because This version of MariaDB doesn't yet support 'LIMIT & IN/ALL/ANY/SOME subquery'
return lambda_stmt(
lambda: update(States)
.where(
States.state_id.in_(
select(States.state_id).join(
states_with_entity_ids := select(
States.state_id.label("state_id_with_entity_id")
)
.filter(States.entity_id.is_not(None))
.limit(5000)
.subquery(),
States.state_id == states_with_entity_ids.c.state_id_with_entity_id,
)
)
)
.values(entity_id=None)
)
def has_events_context_ids_to_migrate() -> StatementLambdaElement:
"""Check if there are events context ids to migrate."""
return lambda_stmt(
lambda: select(Events.event_id).filter(Events.context_id_bin.is_(None)).limit(1)
)
def has_states_context_ids_to_migrate() -> StatementLambdaElement:
"""Check if there are states context ids to migrate."""
return lambda_stmt(
lambda: select(States.state_id).filter(States.context_id_bin.is_(None)).limit(1)
)
def has_event_type_to_migrate() -> StatementLambdaElement:
"""Check if there are event_types to migrate."""
return lambda_stmt(
@ -723,6 +786,13 @@ def has_event_type_to_migrate() -> StatementLambdaElement:
)
def has_entity_ids_to_migrate() -> StatementLambdaElement:
"""Check if there are entity_id to migrate."""
return lambda_stmt(
lambda: select(States.state_id).filter(States.metadata_id.is_(None)).limit(1)
)
def find_states_context_ids_to_migrate() -> StatementLambdaElement:
"""Find events context_ids to migrate."""
return lambda_stmt(
@ -754,6 +824,23 @@ def find_event_types_to_purge() -> StatementLambdaElement:
)
def find_entity_ids_to_purge() -> StatementLambdaElement:
"""Find entity_ids to purge."""
return lambda_stmt(
lambda: select(StatesMeta.metadata_id, StatesMeta.entity_id).where(
StatesMeta.metadata_id.not_in(
select(StatesMeta.metadata_id).join(
used_states_metadata_id := select(
distinct(States.metadata_id).label("used_states_metadata_id")
).subquery(),
StatesMeta.metadata_id
== used_states_metadata_id.c.used_states_metadata_id,
)
)
)
)
def delete_event_types_rows(event_type_ids: Iterable[int]) -> StatementLambdaElement:
"""Delete EventTypes rows."""
return lambda_stmt(
@ -761,3 +848,12 @@ def delete_event_types_rows(event_type_ids: Iterable[int]) -> StatementLambdaEle
.where(EventTypes.event_type_id.in_(event_type_ids))
.execution_options(synchronize_session=False)
)
def delete_states_meta_rows(metadata_ids: Iterable[int]) -> StatementLambdaElement:
"""Delete StatesMeta rows."""
return lambda_stmt(
lambda: delete(StatesMeta)
.where(StatesMeta.metadata_id.in_(metadata_ids))
.execution_options(synchronize_session=False)
)