Fix index not being dropped on postgresql databases with a schema prefix (#90144)
* Fix index not being dropped on postgresql databases with a schema prefix Added logging in case index drops fail so we can tell why in the future * coverage
This commit is contained in:
parent
99b58f157e
commit
31c988c4f0
2 changed files with 92 additions and 63 deletions
|
@ -297,6 +297,19 @@ def _create_index(
|
||||||
_LOGGER.debug("Finished creating %s", index_name)
|
_LOGGER.debug("Finished creating %s", index_name)
|
||||||
|
|
||||||
|
|
||||||
|
def _execute_or_collect_error(
|
||||||
|
session_maker: Callable[[], Session], query: str, errors: list[str]
|
||||||
|
) -> bool:
|
||||||
|
"""Execute a query or collect an error."""
|
||||||
|
with session_scope(session=session_maker()) as session:
|
||||||
|
try:
|
||||||
|
session.connection().execute(text(query))
|
||||||
|
return True
|
||||||
|
except SQLAlchemyError as err:
|
||||||
|
errors.append(str(err))
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def _drop_index(
|
def _drop_index(
|
||||||
session_maker: Callable[[], Session],
|
session_maker: Callable[[], Session],
|
||||||
table_name: str,
|
table_name: str,
|
||||||
|
@ -322,74 +335,45 @@ def _drop_index(
|
||||||
index_name,
|
index_name,
|
||||||
table_name,
|
table_name,
|
||||||
)
|
)
|
||||||
success = False
|
index_to_drop: str | None = None
|
||||||
|
with session_scope(session=session_maker()) as session:
|
||||||
|
index_to_drop = get_index_by_name(session, table_name, index_name)
|
||||||
|
|
||||||
# Engines like DB2/Oracle
|
if index_to_drop is None:
|
||||||
with session_scope(session=session_maker()) as session, contextlib.suppress(
|
|
||||||
SQLAlchemyError
|
|
||||||
):
|
|
||||||
connection = session.connection()
|
|
||||||
connection.execute(text(f"DROP INDEX {index_name}"))
|
|
||||||
success = True
|
|
||||||
|
|
||||||
# Engines like SQLite, SQL Server
|
|
||||||
if not success:
|
|
||||||
with session_scope(session=session_maker()) as session, contextlib.suppress(
|
|
||||||
SQLAlchemyError
|
|
||||||
):
|
|
||||||
connection = session.connection()
|
|
||||||
connection.execute(
|
|
||||||
text(
|
|
||||||
"DROP INDEX {table}.{index}".format(
|
|
||||||
index=index_name, table=table_name
|
|
||||||
)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
success = True
|
|
||||||
|
|
||||||
if not success:
|
|
||||||
# Engines like MySQL, MS Access
|
|
||||||
with session_scope(session=session_maker()) as session, contextlib.suppress(
|
|
||||||
SQLAlchemyError
|
|
||||||
):
|
|
||||||
connection = session.connection()
|
|
||||||
connection.execute(
|
|
||||||
text(
|
|
||||||
"DROP INDEX {index} ON {table}".format(
|
|
||||||
index=index_name, table=table_name
|
|
||||||
)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
success = True
|
|
||||||
|
|
||||||
if not success:
|
|
||||||
# Engines like postgresql may have a prefix
|
|
||||||
# ex idx_16532_ix_events_event_type_time_fired
|
|
||||||
with session_scope(session=session_maker()) as session, contextlib.suppress(
|
|
||||||
SQLAlchemyError
|
|
||||||
):
|
|
||||||
if index_to_drop := get_index_by_name(session, table_name, index_name):
|
|
||||||
connection.execute(text(f"DROP INDEX {index_to_drop}"))
|
|
||||||
success = True
|
|
||||||
|
|
||||||
if success:
|
|
||||||
_LOGGER.debug(
|
_LOGGER.debug(
|
||||||
"Finished dropping index %s from table %s", index_name, table_name
|
"The index %s on table %s no longer exists", index_name, table_name
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
if quiet:
|
errors: list[str] = []
|
||||||
return
|
for query in (
|
||||||
|
# Engines like DB2/Oracle
|
||||||
|
f"DROP INDEX {index_name}",
|
||||||
|
# Engines like SQLite, SQL Server
|
||||||
|
f"DROP INDEX {table_name}.{index_name}",
|
||||||
|
# Engines like MySQL, MS Access
|
||||||
|
f"DROP INDEX {index_name} ON {table_name}",
|
||||||
|
# Engines like postgresql may have a prefix
|
||||||
|
# ex idx_16532_ix_events_event_type_time_fired
|
||||||
|
f"DROP INDEX {index_to_drop}",
|
||||||
|
):
|
||||||
|
if _execute_or_collect_error(session_maker, query, errors):
|
||||||
|
_LOGGER.debug(
|
||||||
|
"Finished dropping index %s from table %s", index_name, table_name
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
_LOGGER.warning(
|
if not quiet:
|
||||||
(
|
_LOGGER.warning(
|
||||||
"Failed to drop index `%s` from table `%s`. Schema "
|
(
|
||||||
"Migration will continue; this is not a "
|
"Failed to drop index `%s` from table `%s`. Schema "
|
||||||
"critical operation"
|
"Migration will continue; this is not a "
|
||||||
),
|
"critical operation: %s"
|
||||||
index_name,
|
),
|
||||||
table_name,
|
index_name,
|
||||||
)
|
table_name,
|
||||||
|
errors,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _add_columns(
|
def _add_columns(
|
||||||
|
|
|
@ -15,6 +15,7 @@ from sqlalchemy.exc import (
|
||||||
InternalError,
|
InternalError,
|
||||||
OperationalError,
|
OperationalError,
|
||||||
ProgrammingError,
|
ProgrammingError,
|
||||||
|
SQLAlchemyError,
|
||||||
)
|
)
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
from sqlalchemy.pool import StaticPool
|
from sqlalchemy.pool import StaticPool
|
||||||
|
@ -492,7 +493,51 @@ def test_forgiving_add_index(recorder_db_url: str) -> None:
|
||||||
with Session(engine) as session:
|
with Session(engine) as session:
|
||||||
instance = Mock()
|
instance = Mock()
|
||||||
instance.get_session = Mock(return_value=session)
|
instance.get_session = Mock(return_value=session)
|
||||||
migration._create_index(instance.get_session, "states", "ix_states_context_id")
|
migration._create_index(
|
||||||
|
instance.get_session, "states", "ix_states_context_id_bin"
|
||||||
|
)
|
||||||
|
engine.dispose()
|
||||||
|
|
||||||
|
|
||||||
|
def test_forgiving_drop_index(
|
||||||
|
recorder_db_url: str, caplog: pytest.LogCaptureFixture
|
||||||
|
) -> None:
|
||||||
|
"""Test that drop index will continue if index drop fails."""
|
||||||
|
engine = create_engine(recorder_db_url, poolclass=StaticPool)
|
||||||
|
db_schema.Base.metadata.create_all(engine)
|
||||||
|
with Session(engine) as session:
|
||||||
|
instance = Mock()
|
||||||
|
instance.get_session = Mock(return_value=session)
|
||||||
|
migration._drop_index(
|
||||||
|
instance.get_session, "states", "ix_states_context_id_bin"
|
||||||
|
)
|
||||||
|
migration._drop_index(
|
||||||
|
instance.get_session, "states", "ix_states_context_id_bin"
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.recorder.migration.get_index_by_name",
|
||||||
|
return_value="ix_states_context_id_bin",
|
||||||
|
), patch.object(
|
||||||
|
session, "connection", side_effect=SQLAlchemyError("connection failure")
|
||||||
|
):
|
||||||
|
migration._drop_index(
|
||||||
|
instance.get_session, "states", "ix_states_context_id_bin"
|
||||||
|
)
|
||||||
|
assert "Failed to drop index" in caplog.text
|
||||||
|
assert "connection failure" in caplog.text
|
||||||
|
caplog.clear()
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.recorder.migration.get_index_by_name",
|
||||||
|
return_value="ix_states_context_id_bin",
|
||||||
|
), patch.object(
|
||||||
|
session, "connection", side_effect=SQLAlchemyError("connection failure")
|
||||||
|
):
|
||||||
|
migration._drop_index(
|
||||||
|
instance.get_session, "states", "ix_states_context_id_bin", quiet=True
|
||||||
|
)
|
||||||
|
assert "Failed to drop index" not in caplog.text
|
||||||
|
assert "connection failure" not in caplog.text
|
||||||
engine.dispose()
|
engine.dispose()
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue