Remove the old ix_states_event_id index if it's no longer being used (#89901)

* Remove the old ix_states_event_id index if it's no longer being used
* cover it
* fixes
* fixup
* Update homeassistant/components/recorder/tasks.py

parent 138bbd9c28
commit b1f64de6ce
8 changed files with 124 additions and 22 deletions
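Taken together, the hunks below wire up three pieces: a query that checks whether any row in the states table still carries an event_id, an inspector helper that looks up an index by name, and a recorder task that drops ix_states_event_id only when both checks say it is safe. The following self-contained sketch reproduces that logic with plain SQLAlchemy against a throwaway in-memory SQLite database; the States model, engine, and session handling here are illustrative stand-ins, not the recorder's real schema.

from sqlalchemy import Column, Integer, Text, create_engine, inspect, select, text
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class States(Base):
    """Minimal stand-in for the recorder's states table."""

    __tablename__ = "states"
    state_id = Column(Integer, primary_key=True)
    event_id = Column(Integer, index=True)  # legacy column, no longer written
    state = Column(Text)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # Equivalent of has_used_states_event_ids(): is any event_id still set?
    used = session.execute(
        select(States.state_id).filter(States.event_id.isnot(None)).limit(1)
    ).scalar()

    if not used:
        # Equivalent of get_index_by_name() + _drop_index(): locate the index
        # by name (or suffix) and drop it only if it is still present.
        for index in inspect(session.connection()).get_indexes("states"):
            name = index["name"]
            if name and name.endswith("ix_states_event_id"):
                session.execute(text(f"DROP INDEX {name}"))
        session.commit()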
@@ -48,6 +48,8 @@ CONTEXT_ID_AS_BINARY_SCHEMA_VERSION = 36
 EVENT_TYPE_IDS_SCHEMA_VERSION = 37
 STATES_META_SCHEMA_VERSION = 38

+LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION = 28
+

 class SupportedDialect(StrEnum):
     """Supported dialects."""
@@ -46,6 +46,7 @@ from .const import (
     DOMAIN,
     EVENT_TYPE_IDS_SCHEMA_VERSION,
     KEEPALIVE_TIME,
+    LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION,
     MARIADB_PYMYSQL_URL_PREFIX,
     MARIADB_URL_PREFIX,
     MAX_QUEUE_BACKLOG,
@@ -57,7 +58,9 @@ from .const import (
     SupportedDialect,
 )
 from .db_schema import (
+    LEGACY_STATES_EVENT_ID_INDEX,
     SCHEMA_VERSION,
+    TABLE_STATES,
     Base,
     EventData,
     Events,
@@ -93,6 +96,7 @@ from .tasks import (
     CompileMissingStatisticsTask,
     DatabaseLockTask,
     EntityIDMigrationTask,
+    EventIdMigrationTask,
     EventsContextIDMigrationTask,
     EventTask,
     EventTypeIDMigrationTask,
@@ -113,6 +117,7 @@ from .util import (
     dburl_to_path,
     end_incomplete_runs,
     execute_stmt_lambda_element,
+    get_index_by_name,
     is_second_sunday,
     move_away_broken_database,
     session_scope,
@@ -730,6 +735,15 @@ class Recorder(threading.Thread):
                 _LOGGER.debug("Activating states_meta manager as all data is migrated")
                 self.states_meta_manager.active = True

+            if self.schema_version > LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION:
+                with contextlib.suppress(SQLAlchemyError):
+                    # If the index of event_ids on the states table is still present
+                    # we need to queue a task to remove it.
+                    if get_index_by_name(
+                        session, TABLE_STATES, LEGACY_STATES_EVENT_ID_INDEX
+                    ):
+                        self.queue_task(EventIdMigrationTask())
+
         # We must only set the db ready after we have set the table managers
         # to active if there is no data to migrate.
         #
@@ -1138,6 +1152,10 @@ class Recorder(threading.Thread):
         """Post migrate entity_ids if needed."""
         return migration.post_migrate_entity_ids(self)

+    def _cleanup_legacy_states_event_ids(self) -> bool:
+        """Cleanup legacy event_ids if needed."""
+        return migration.cleanup_legacy_states_event_ids(self)
+
     def _send_keep_alive(self) -> None:
         """Send a keep alive to keep the db connection open."""
         assert self.event_session is not None
@@ -116,6 +116,7 @@ LAST_UPDATED_INDEX_TS = "ix_states_last_updated_ts"
 METADATA_ID_LAST_UPDATED_INDEX_TS = "ix_states_metadata_id_last_updated_ts"
 EVENTS_CONTEXT_ID_BIN_INDEX = "ix_events_context_id_bin"
 STATES_CONTEXT_ID_BIN_INDEX = "ix_states_context_id_bin"
+LEGACY_STATES_EVENT_ID_INDEX = "ix_states_event_id"
 CONTEXT_ID_BIN_MAX_LENGTH = 16

 _DEFAULT_TABLE_ARGS = {
@@ -385,9 +386,7 @@ class States(Base):
     attributes: Mapped[str | None] = mapped_column(
         Text().with_variant(mysql.LONGTEXT, "mysql", "mariadb")
     )  # no longer used for new rows
-    event_id: Mapped[int | None] = mapped_column(  # no longer used for new rows
-        Integer, ForeignKey("events.event_id", ondelete="CASCADE"), index=True
-    )
+    event_id: Mapped[int | None] = mapped_column(Integer)  # no longer used for new rows
     last_changed: Mapped[datetime | None] = mapped_column(
         DATETIME_TYPE
     )  # no longer used for new rows
@@ -30,6 +30,7 @@ from homeassistant.util.ulid import ulid_to_bytes
 from .const import SupportedDialect
 from .db_schema import (
     CONTEXT_ID_BIN_MAX_LENGTH,
+    LEGACY_STATES_EVENT_ID_INDEX,
     SCHEMA_VERSION,
     STATISTICS_TABLES,
     TABLE_STATES,
@@ -51,6 +52,7 @@ from .queries import (
     find_event_type_to_migrate,
     find_events_context_ids_to_migrate,
     find_states_context_ids_to_migrate,
+    has_used_states_event_ids,
 )
 from .statistics import (
     correct_db_schema as statistics_correct_db_schema,
@@ -64,7 +66,12 @@ from .tasks import (
     PostSchemaMigrationTask,
     StatisticsTimestampMigrationCleanupTask,
 )
-from .util import database_job_retry_wrapper, retryable_database_job, session_scope
+from .util import (
+    database_job_retry_wrapper,
+    get_index_by_name,
+    retryable_database_job,
+    session_scope,
+)

 if TYPE_CHECKING:
     from . import Recorder
@@ -308,18 +315,7 @@ def _drop_index(
     with session_scope(session=session_maker()) as session, contextlib.suppress(
         SQLAlchemyError
     ):
-        connection = session.connection()
-        inspector = sqlalchemy.inspect(connection)
-        indexes = inspector.get_indexes(table_name)
-        if index_to_drop := next(
-            (
-                possible_index["name"]
-                for possible_index in indexes
-                if possible_index["name"]
-                and possible_index["name"].endswith(f"_{index_name}")
-            ),
-            None,
-        ):
+        if index_to_drop := get_index_by_name(session, table_name, index_name):
             connection.execute(text(f"DROP INDEX {index_to_drop}"))
             success = True

@@ -593,7 +589,7 @@ def _apply_update(  # noqa: C901
         # but it was removed in version 32
     elif new_version == 5:
         # Create supporting index for States.event_id foreign key
-        _create_index(session_maker, "states", "ix_states_event_id")
+        _create_index(session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX)
     elif new_version == 6:
         _add_columns(
             session_maker,
@@ -1529,6 +1525,33 @@ def post_migrate_entity_ids(instance: Recorder) -> bool:
     return is_done


+@retryable_database_job("cleanup_legacy_event_ids")
+def cleanup_legacy_states_event_ids(instance: Recorder) -> bool:
+    """Remove the old event_id index from states.
+
+    We used to link states to events using the event_id column, but we no
+    longer store state changed events in the events table.
+
+    If all old states have been purged and the existing states are in the new
+    format, we can drop the index since it can take up ~10MB per 1M rows.
+    """
+    session_maker = instance.get_session
+    _LOGGER.debug("Cleanup legacy event_ids")
+    with session_scope(session=session_maker()) as session:
+        result = session.execute(has_used_states_event_ids()).scalar()
+        # In the future we may migrate existing states to the new format,
+        # but in practice very few of these still exist in production and
+        # removing the index is likely all that needs to happen.
+        all_gone = not result
+
+    if all_gone:
+        # Only drop the index if there are no more event_ids in the states
+        # table, i.e. all of them are NULL.
+        _drop_index(session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX)
+
+    return True
+
+
 def _initialize_database(session: Session) -> bool:
     """Initialize a new database.

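As a rough sanity check on the "~10MB per 1M rows" figure in the docstring above: a secondary-index entry over a 4-byte integer key plus a row pointer and per-entry overhead comes to on the order of 10 bytes, and 10 bytes × 1,000,000 rows ≈ 10 MB. The exact footprint depends on the engine and its page fill factor, so treat the number as an order-of-magnitude estimate rather than a guarantee.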
@@ -745,6 +745,13 @@ def batch_cleanup_entity_ids() -> StatementLambdaElement:
     )


+def has_used_states_event_ids() -> StatementLambdaElement:
+    """Check if there are used event_ids in the states table."""
+    return lambda_stmt(
+        lambda: select(States.state_id).filter(States.event_id.isnot(None)).limit(1)
+    )
+
+
 def has_events_context_ids_to_migrate() -> StatementLambdaElement:
     """Check if there are events context ids to migrate."""
     return lambda_stmt(
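For readers who do not want to unpick the lambda statement, the selectable inside has_used_states_event_ids() compiles to roughly `SELECT states.state_id FROM states WHERE states.event_id IS NOT NULL LIMIT 1` (the exact LIMIT syntax varies by dialect). A returned row means some state still references a legacy event_id and the index must stay; an empty result means the column is entirely NULL and the migration helper above is free to drop it.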
@@ -438,3 +438,17 @@ class EntityIDPostMigrationTask(RecorderTask):
         ):
             # Schedule a new migration task if this one didn't finish
             instance.queue_task(EntityIDPostMigrationTask())
+
+
+@dataclass
+class EventIdMigrationTask(RecorderTask):
+    """An object to insert into the recorder queue to clean up legacy event_ids in the states table.
+
+    This task should only be queued if the ix_states_event_id index exists,
+    since it is used to scan the states table, and it will be removed after
+    this task is run if it's no longer needed.
+    """
+
+    def run(self, instance: Recorder) -> None:
+        """Clean up the legacy event_id index on states."""
+        instance._cleanup_legacy_states_event_ids()  # pylint: disable=[protected-access]
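EventIdMigrationTask follows the recorder's usual queue pattern: the startup code in the core.py hunk earlier calls instance.queue_task(EventIdMigrationTask()), and the recorder thread later pops the task from its queue and invokes run() with the Recorder instance, which in turn delegates to the migration helper. A minimal, self-contained imitation of that flow, using stand-in classes rather than the real Recorder:

from dataclasses import dataclass
from queue import SimpleQueue


class FakeRecorder:
    """Stand-in for the recorder thread's queue handling (illustrative only)."""

    def __init__(self) -> None:
        self._queue: SimpleQueue = SimpleQueue()
        self.cleaned_up = False

    def queue_task(self, task: "CleanupIndexTask") -> None:
        self._queue.put(task)

    def process_one_task(self) -> None:
        # The real recorder drains its queue in its thread loop.
        self._queue.get().run(self)


@dataclass
class CleanupIndexTask:
    """Stand-in for EventIdMigrationTask."""

    def run(self, instance: FakeRecorder) -> None:
        # The real task calls instance._cleanup_legacy_states_event_ids().
        instance.cleaned_up = True


recorder = FakeRecorder()
recorder.queue_task(CleanupIndexTask())
recorder.process_one_task()
assert recorder.cleaned_up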
@@ -18,7 +18,7 @@ from awesomeversion import (
     AwesomeVersionStrategy,
 )
 import ciso8601
-from sqlalchemy import text
+from sqlalchemy import inspect, text
 from sqlalchemy.engine import Result, Row
 from sqlalchemy.exc import OperationalError, SQLAlchemyError
 from sqlalchemy.orm.query import Query
@@ -832,3 +832,22 @@ def chunked(iterable: Iterable, chunked_num: int) -> Iterable[Any]:
     From more-itertools
     """
     return iter(partial(take, chunked_num, iter(iterable)), [])
+
+
+def get_index_by_name(session: Session, table_name: str, index_name: str) -> str | None:
+    """Get an index by name."""
+    connection = session.connection()
+    inspector = inspect(connection)
+    indexes = inspector.get_indexes(table_name)
+    return next(
+        (
+            possible_index["name"]
+            for possible_index in indexes
+            if possible_index["name"]
+            and (
+                possible_index["name"] == index_name
+                or possible_index["name"].endswith(f"_{index_name}")
+            )
+        ),
+        None,
+    )
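A quick usage sketch for the new helper, assuming a Home Assistant checkout (or the homeassistant package) is importable; the table and index here are created ad hoc purely for the demonstration:

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

from homeassistant.components.recorder.util import get_index_by_name

engine = create_engine("sqlite://")
with Session(engine) as session:
    session.execute(
        text("CREATE TABLE states (state_id INTEGER PRIMARY KEY, event_id INTEGER)")
    )
    session.execute(text("CREATE INDEX ix_states_event_id ON states (event_id)"))

    # An exact match returns the index name; a prefixed name that merely ends
    # in "_ix_states_event_id" would match as well, and a missing index
    # returns None.
    assert get_index_by_name(session, "states", "ix_states_event_id") == "ix_states_event_id"
    assert get_index_by_name(session, "states", "ix_states_attributes") is None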
@@ -91,6 +91,10 @@ async def test_migrate_times(
     )
     number_of_migrations = 5

+    def _get_states_index_names():
+        with session_scope(hass=hass) as session:
+            return inspect(session.connection()).get_indexes("states")
+
     with patch.object(recorder, "db_schema", old_db_schema), patch.object(
         recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
     ), patch.object(core, "StatesMeta", old_db_schema.StatesMeta), patch.object(
@@ -113,6 +117,8 @@ async def test_migrate_times(
         "homeassistant.components.recorder.Recorder._migrate_entity_ids",
     ), patch(
         "homeassistant.components.recorder.Recorder._post_migrate_entity_ids"
+    ), patch(
+        "homeassistant.components.recorder.Recorder._cleanup_legacy_states_event_ids"
     ):
         hass = await async_test_home_assistant(asyncio.get_running_loop())
         recorder_helper.async_initialize_recorder(hass)
@@ -132,11 +138,18 @@ async def test_migrate_times(
         await hass.async_block_till_done()
         await recorder.get_instance(hass).async_block_till_done()

+        states_indexes = await recorder.get_instance(hass).async_add_executor_job(
+            _get_states_index_names
+        )
+        states_index_names = {index["name"] for index in states_indexes}
+
         await hass.async_stop()
         await hass.async_block_till_done()

         dt_util.DEFAULT_TIME_ZONE = ORIG_TZ

+    assert "ix_states_event_id" in states_index_names
+
     # Test that the duplicates are removed during migration from schema 23
     hass = await async_test_home_assistant(asyncio.get_running_loop())
     recorder_helper.async_initialize_recorder(hass)
@@ -186,13 +199,20 @@ async def test_migrate_times(
         with session_scope(hass=hass) as session:
             return inspect(session.connection()).get_indexes("events")

-    indexes = await recorder.get_instance(hass).async_add_executor_job(
+    events_indexes = await recorder.get_instance(hass).async_add_executor_job(
         _get_events_index_names
     )
-    index_names = {index["name"] for index in indexes}
+    events_index_names = {index["name"] for index in events_indexes}
+
+    assert "ix_events_context_id_bin" in events_index_names
+    assert "ix_events_context_id" not in events_index_names

-    assert "ix_events_context_id_bin" in index_names
-    assert "ix_events_context_id" not in index_names
+    states_indexes = await recorder.get_instance(hass).async_add_executor_job(
+        _get_states_index_names
+    )
+    states_index_names = {index["name"] for index in states_indexes}
+
+    assert "ix_states_event_id" not in states_index_names

     await hass.async_stop()
     dt_util.DEFAULT_TIME_ZONE = ORIG_TZ