Handle InnoDB deadlocks during migration (#89073)

* Handle slow InnoDB rollback when encountering duplicates during migration

fixes #89069

* adjust

* fix mock

* tests

* return on success
J. Nick Koston 2023-03-03 17:00:13 -10:00 committed by GitHub
parent 1d9e8c873f
commit 1bd9767d8c
6 changed files with 97 additions and 19 deletions
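
The retry logic added in this commit hinges on recognizing which MySQL/InnoDB errors are transient. As a rough standalone sketch of the pattern (the helper name, the wait time, and the exact error-code tuple here are illustrative, not the recorder's actual API), a job is re-run only when the server reports a lock wait timeout (1205) or a deadlock (1213):

import time

from sqlalchemy.exc import OperationalError

# Illustrative set of MySQL/MariaDB error codes that signal a transient InnoDB condition.
RETRYABLE_MYSQL_ERRORS = (1205, 1213)  # 1205 = lock wait timeout, 1213 = deadlock


def run_with_retry(job, attempts=5, wait=10):
    """Run job(), retrying only on retryable MySQL errors; re-raise anything else."""
    for attempt in range(attempts):
        try:
            job()
            return
        except OperationalError as err:
            retryable = (
                isinstance(err.orig, BaseException)
                and err.orig.args
                and err.orig.args[0] in RETRYABLE_MYSQL_ERRORS
            )
            if attempt == attempts - 1 or not retryable:
                raise
            time.sleep(wait)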

homeassistant/components/recorder/migration.py

@@ -50,7 +50,7 @@ from .tasks import (
     PostSchemaMigrationTask,
     StatisticsTimestampMigrationCleanupTask,
 )
-from .util import session_scope
+from .util import database_job_retry_wrapper, session_scope
 
 if TYPE_CHECKING:
     from . import Recorder
@@ -158,7 +158,9 @@ def migrate_schema(
             hass.add_job(instance.async_set_db_ready)
         new_version = version + 1
         _LOGGER.info("Upgrading recorder db schema to version %s", new_version)
-        _apply_update(hass, engine, session_maker, new_version, current_version)
+        _apply_update(
+            instance, hass, engine, session_maker, new_version, current_version
+        )
         with session_scope(session=session_maker()) as session:
             session.add(SchemaChanges(schema_version=new_version))
@@ -508,7 +510,9 @@ def _drop_foreign_key_constraints(
     )
 
 
+@database_job_retry_wrapper("Apply migration update", 10)
 def _apply_update(  # noqa: C901
+    instance: Recorder,
     hass: HomeAssistant,
     engine: Engine,
     session_maker: Callable[[], Session],
@@ -922,7 +926,7 @@ def _apply_update(  # noqa: C901
             # There may be duplicated statistics entries, delete duplicates
             # and try again
             with session_scope(session=session_maker()) as session:
-                delete_statistics_duplicates(hass, session)
+                delete_statistics_duplicates(instance, hass, session)
             _migrate_statistics_columns_to_timestamp(session_maker, engine)
             # Log at error level to ensure the user sees this message in the log
             # since we logged the error above.
@@ -965,7 +969,7 @@ def post_schema_migration(
         # since they are no longer used and take up a significant amount of space.
         assert instance.event_session is not None
         assert instance.engine is not None
-        _wipe_old_string_time_columns(instance.engine, instance.event_session)
+        _wipe_old_string_time_columns(instance, instance.engine, instance.event_session)
     if old_version < 35 <= new_version:
         # In version 34 we migrated all the created, start, and last_reset
         # columns to be timestamps. In version 34 we need to wipe the old columns
@@ -978,7 +982,10 @@ def _wipe_old_string_statistics_columns(instance: Recorder) -> None:
     instance.queue_task(StatisticsTimestampMigrationCleanupTask())
 
 
-def _wipe_old_string_time_columns(engine: Engine, session: Session) -> None:
+@database_job_retry_wrapper("Wipe old string time columns", 3)
+def _wipe_old_string_time_columns(
+    instance: Recorder, engine: Engine, session: Session
+) -> None:
     """Wipe old string time columns to save space."""
     # Wipe Events.time_fired since its been replaced by Events.time_fired_ts
     # Wipe States.last_updated since its been replaced by States.last_updated_ts
@@ -1162,7 +1169,7 @@ def _migrate_statistics_columns_to_timestamp(
                             "last_reset_ts="
                            "UNIX_TIMESTAMP(last_reset) "
                            "where start_ts is NULL "
-                            "LIMIT 250000;"
+                            "LIMIT 100000;"
                        )
                    )
    elif engine.dialect.name == SupportedDialect.POSTGRESQL:
@@ -1180,7 +1187,7 @@ def _migrate_statistics_columns_to_timestamp(
                            "created_ts=EXTRACT(EPOCH FROM created), "
                            "last_reset_ts=EXTRACT(EPOCH FROM last_reset) "
                            "where id IN ( "
-                            f"SELECT id FROM {table} where start_ts is NULL LIMIT 250000 "
+                            f"SELECT id FROM {table} where start_ts is NULL LIMIT 100000 "
                            " );"
                        )
                    )
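
Besides adding the retry decorator, the timestamp backfill batch size drops from 250000 to 100000 rows per UPDATE. A smaller batch keeps each statement's InnoDB undo work and lock hold time shorter, so a failed statement rolls back quickly instead of stalling the migration. A rough sketch of the batching pattern, with the loop shape assumed from the surrounding function rather than shown in this hunk:

from sqlalchemy import text

# Sketch only: session_maker and session_scope come from the recorder module above.
result = None
while result is None or result.rowcount > 0:
    with session_scope(session=session_maker()) as session:
        result = session.connection().execute(
            text(
                "UPDATE statistics set start_ts=UNIX_TIMESTAMP(start) "
                "where start_ts is NULL "
                "LIMIT 100000;"
            )
        )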

homeassistant/components/recorder/statistics.py

@@ -75,6 +75,7 @@ from .models import (
     datetime_to_timestamp_or_none,
 )
 from .util import (
+    database_job_retry_wrapper,
     execute,
     execute_stmt_lambda_element,
     get_instance,
@@ -515,7 +516,10 @@ def _delete_duplicates_from_table(
     return (total_deleted_rows, all_non_identical_duplicates)
 
 
-def delete_statistics_duplicates(hass: HomeAssistant, session: Session) -> None:
+@database_job_retry_wrapper("delete statistics duplicates", 3)
+def delete_statistics_duplicates(
+    instance: Recorder, hass: HomeAssistant, session: Session
+) -> None:
     """Identify and delete duplicated statistics.
 
     A backup will be made of duplicated statistics before it is deleted.

homeassistant/components/recorder/util.py

@@ -568,6 +568,17 @@ def end_incomplete_runs(session: Session, start_time: datetime) -> None:
         session.add(run)
 
 
+def _is_retryable_error(instance: Recorder, err: OperationalError) -> bool:
+    """Return True if the error is retryable."""
+    assert instance.engine is not None
+    return bool(
+        instance.engine.dialect.name == SupportedDialect.MYSQL
+        and isinstance(err.orig, BaseException)
+        and err.orig.args
+        and err.orig.args[0] in RETRYABLE_MYSQL_ERRORS
+    )
+
+
 _FuncType = Callable[Concatenate[_RecorderT, _P], bool]
@@ -585,12 +596,8 @@ def retryable_database_job(
             try:
                 return job(instance, *args, **kwargs)
             except OperationalError as err:
-                assert instance.engine is not None
-                if (
-                    instance.engine.dialect.name == SupportedDialect.MYSQL
-                    and err.orig
-                    and err.orig.args[0] in RETRYABLE_MYSQL_ERRORS
-                ):
+                if _is_retryable_error(instance, err):
+                    assert isinstance(err.orig, BaseException)
                     _LOGGER.info(
                         "%s; %s not completed, retrying", err.orig.args[1], description
                     )
@@ -608,6 +615,46 @@ def retryable_database_job(
     return decorator
 
 
+_WrappedFuncType = Callable[Concatenate[_RecorderT, _P], None]
+
+
+def database_job_retry_wrapper(
+    description: str, attempts: int = 5
+) -> Callable[[_WrappedFuncType[_RecorderT, _P]], _WrappedFuncType[_RecorderT, _P]]:
+    """Try to execute a database job multiple times.
+
+    This wrapper handles InnoDB deadlocks and lock timeouts.
+
+    This is different from retryable_database_job in that it will retry the job
+    attempts number of times instead of returning False if the job fails.
+    """
+
+    def decorator(
+        job: _WrappedFuncType[_RecorderT, _P]
+    ) -> _WrappedFuncType[_RecorderT, _P]:
+        @functools.wraps(job)
+        def wrapper(instance: _RecorderT, *args: _P.args, **kwargs: _P.kwargs) -> None:
+            for attempt in range(attempts):
+                try:
+                    job(instance, *args, **kwargs)
+                    return
+                except OperationalError as err:
+                    if attempt == attempts - 1 or not _is_retryable_error(
+                        instance, err
+                    ):
+                        raise
+                    assert isinstance(err.orig, BaseException)
+                    _LOGGER.info(
+                        "%s; %s failed, retrying", err.orig.args[1], description
+                    )
+                    time.sleep(instance.db_retry_wait)
+                    # Failed with retryable error
+
+        return wrapper
+
+    return decorator
+
+
 def periodic_db_cleanups(instance: Recorder) -> None:
     """Run any database cleanups that need to happen periodically.

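For reference, this is how the new decorator is consumed by the call sites in this commit: the wrapped job must take the Recorder instance as its first argument, the wrapper re-runs it up to `attempts` times on a retryable MySQL error (sleeping `instance.db_retry_wait` between tries), and any other failure, or the final failed attempt, is re-raised. A minimal sketch with a hypothetical job named `_backfill_example`:

from sqlalchemy import text

from homeassistant.components.recorder import Recorder
from homeassistant.components.recorder.util import (
    database_job_retry_wrapper,
    session_scope,
)


@database_job_retry_wrapper("Backfill example timestamps", 3)
def _backfill_example(instance: Recorder) -> None:
    """Hypothetical job: retried on deadlock or lock wait timeout, re-raised otherwise."""
    with session_scope(session=instance.get_session()) as session:
        session.connection().execute(
            text(
                "UPDATE statistics set created_ts=UNIX_TIMESTAMP(created) "
                "where created_ts is NULL LIMIT 100000;"
            )
        )
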
tests/components/recorder/test_migrate.py

@@ -69,7 +69,7 @@ async def test_schema_update_calls(recorder_db_url: str, hass: HomeAssistant) -> None:
     session_maker = instance.get_session
     update.assert_has_calls(
         [
-            call(hass, engine, session_maker, version + 1, 0)
+            call(instance, hass, engine, session_maker, version + 1, 0)
             for version in range(0, db_schema.SCHEMA_VERSION)
         ]
     )
@@ -304,6 +304,8 @@ async def test_schema_migrate(
     migration_version = None
     real_migrate_schema = recorder.migration.migrate_schema
     real_apply_update = recorder.migration._apply_update
+    real_create_index = recorder.migration._create_index
+    create_calls = 0
 
     def _create_engine_test(*args, **kwargs):
         """Test version of create_engine that initializes with old schema.
@@ -355,6 +357,17 @@ async def test_schema_migrate(
             migration_stall.wait()
         real_apply_update(*args)
 
+    def _sometimes_failing_create_index(*args):
+        """Make the first index create raise a retryable error to ensure we retry."""
+        if recorder_db_url.startswith("mysql://"):
+            nonlocal create_calls
+            if create_calls < 1:
+                create_calls += 1
+                mysql_exception = OperationalError("statement", {}, [])
+                mysql_exception.orig = Exception(1205, "retryable")
+                raise mysql_exception
+        real_create_index(*args)
+
     with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True), patch(
         "homeassistant.components.recorder.core.create_engine",
         new=_create_engine_test,
@@ -368,6 +381,11 @@ async def test_schema_migrate(
     ), patch(
         "homeassistant.components.recorder.migration._apply_update",
         wraps=_instrument_apply_update,
+    ) as apply_update_mock, patch(
+        "homeassistant.components.recorder.util.time.sleep"
+    ), patch(
+        "homeassistant.components.recorder.migration._create_index",
+        wraps=_sometimes_failing_create_index,
     ), patch(
         "homeassistant.components.recorder.Recorder._schedule_compile_missing_statistics",
     ), patch(
@@ -394,12 +412,13 @@ async def test_schema_migrate(
     assert migration_version == db_schema.SCHEMA_VERSION
     assert setup_run.called
     assert recorder.util.async_migration_in_progress(hass) is not True
+    assert apply_update_mock.called
 
 
 def test_invalid_update(hass: HomeAssistant) -> None:
     """Test that an invalid new version raises an exception."""
     with pytest.raises(ValueError):
-        migration._apply_update(hass, Mock(), Mock(), -1, 0)
+        migration._apply_update(Mock(), hass, Mock(), Mock(), -1, 0)
 
 
 @pytest.mark.parametrize(
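
Both the new _sometimes_failing_create_index helper above and the purge test below now build the fake error with a plain Exception for err.orig instead of a MagicMock. That matters because the new _is_retryable_error helper only treats an error as retryable when err.orig is a real BaseException, which a MagicMock is not. A quick illustration using only the standard library:

from unittest.mock import MagicMock

fake_orig = MagicMock(args=(1205, "retryable"))
print(isinstance(fake_orig, BaseException))                      # False: no longer counted as retryable
print(isinstance(Exception(1205, "retryable"), BaseException))   # True: passes the new check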

tests/components/recorder/test_purge.py

@@ -2,7 +2,7 @@
 from datetime import datetime, timedelta
 import json
 import sqlite3
-from unittest.mock import MagicMock, patch
+from unittest.mock import patch
 
 import pytest
 from sqlalchemy.exc import DatabaseError, OperationalError
@@ -192,7 +192,7 @@ async def test_purge_old_states_encounters_temporary_mysql_error(
     await async_wait_recording_done(hass)
 
     mysql_exception = OperationalError("statement", {}, [])
-    mysql_exception.orig = MagicMock(args=(1205, "retryable"))
+    mysql_exception.orig = Exception(1205, "retryable")
 
     with patch(
         "homeassistant.components.recorder.util.time.sleep"

tests/components/recorder/test_statistics.py

@@ -1231,8 +1231,9 @@ def test_delete_duplicates_no_duplicates(
     """Test removal of duplicated statistics."""
     hass = hass_recorder()
     wait_recording_done(hass)
+    instance = recorder.get_instance(hass)
     with session_scope(hass=hass) as session:
-        delete_statistics_duplicates(hass, session)
+        delete_statistics_duplicates(instance, hass, session)
     assert "duplicated statistics rows" not in caplog.text
     assert "Found non identical" not in caplog.text
     assert "Found duplicated" not in caplog.text