From d8b618f7c3bd2a32fe763404d9cd272a20aef946 Mon Sep 17 00:00:00 2001 From: Erik Montnemery Date: Sat, 26 Oct 2024 07:19:03 +0200 Subject: [PATCH] Remove support for live recorder data migration of context ids (#125309) --- homeassistant/components/recorder/core.py | 29 +--- .../components/recorder/migration.py | 155 +++++++++++++++--- homeassistant/components/recorder/util.py | 93 +++++++---- .../statistics/test_duplicates.py | 6 + tests/components/recorder/common.py | 5 +- .../recorder/test_migration_from_schema_32.py | 141 +++++++++++----- ..._migration_run_time_migrations_remember.py | 5 +- .../recorder/test_statistics_v23_migration.py | 12 ++ tests/components/recorder/test_util.py | 32 +++- .../components/recorder/test_v32_migration.py | 5 + 10 files changed, 350 insertions(+), 133 deletions(-) diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py index 77d01088d67..02a4710fc91 100644 --- a/homeassistant/components/recorder/core.py +++ b/homeassistant/components/recorder/core.py @@ -78,16 +78,8 @@ from .db_schema import ( StatisticsShortTerm, ) from .executor import DBInterruptibleThreadPoolExecutor -from .migration import ( - EntityIDMigration, - EventIDPostMigration, - EventsContextIDMigration, - EventTypeIDMigration, - StatesContextIDMigration, -) from .models import DatabaseEngine, StatisticData, StatisticMetaData, UnsupportedDialect from .pool import POOL_SIZE, MutexPool, RecorderPool -from .queries import get_migration_changes from .table_managers.event_data import EventDataManager from .table_managers.event_types import EventTypeManager from .table_managers.recorder_runs import RecorderRunsManager @@ -120,7 +112,6 @@ from .util import ( build_mysqldb_conv, dburl_to_path, end_incomplete_runs, - execute_stmt_lambda_element, is_second_sunday, move_away_broken_database, session_scope, @@ -740,12 +731,17 @@ class Recorder(threading.Thread): # First do non-live migration steps, if needed if schema_status.migration_needed: + # Do non-live schema migration result, schema_status = self._migrate_schema_offline(schema_status) if not result: self._notify_migration_failed() self.migration_in_progress = False return self.schema_version = schema_status.current_version + + # Do non-live data migration + migration.migrate_data_non_live(self, self.get_session, schema_status) + # Non-live migration is now completed, remaining steps are live self.migration_is_live = True @@ -801,20 +797,7 @@ class Recorder(threading.Thread): # there are a lot of statistics graphs on the frontend. self.statistics_meta_manager.load(session) - migration_changes: dict[str, int] = { - row[0]: row[1] - for row in execute_stmt_lambda_element(session, get_migration_changes()) - } - - for migrator_cls in ( - StatesContextIDMigration, - EventsContextIDMigration, - EventTypeIDMigration, - EntityIDMigration, - EventIDPostMigration, - ): - migrator = migrator_cls(schema_status.start_version, migration_changes) - migrator.do_migrate(self, session) + migration.migrate_data_live(self, self.get_session, schema_status) # We must only set the db ready after we have set the table managers # to active if there is no data to migrate. diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index 5180a0c440c..51604ae94bd 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -91,6 +91,7 @@ from .queries import ( find_states_context_ids_to_migrate, find_unmigrated_short_term_statistics_rows, find_unmigrated_statistics_rows, + get_migration_changes, has_entity_ids_to_migrate, has_event_type_to_migrate, has_events_context_ids_to_migrate, @@ -104,6 +105,7 @@ from .statistics import cleanup_statistics_timestamp_migration, get_start_time from .tasks import RecorderTask from .util import ( database_job_retry_wrapper, + database_job_retry_wrapper_method, execute_stmt_lambda_element, get_index_by_name, retryable_database_job_method, @@ -233,8 +235,12 @@ def validate_db_schema( # columns may otherwise not exist etc. schema_errors = _find_schema_errors(hass, instance, session_maker) + migration_needed = not is_current or non_live_data_migration_needed( + instance, session_maker, current_version + ) + return SchemaValidationStatus( - current_version, not is_current, schema_errors, current_version + current_version, migration_needed, schema_errors, current_version ) @@ -350,6 +356,68 @@ def migrate_schema_live( return schema_status +def _get_migration_changes(session: Session) -> dict[str, int]: + """Return migration changes as a dict.""" + migration_changes: dict[str, int] = { + row[0]: row[1] + for row in execute_stmt_lambda_element(session, get_migration_changes()) + } + return migration_changes + + +def non_live_data_migration_needed( + instance: Recorder, + session_maker: Callable[[], Session], + schema_version: int, +) -> bool: + """Return True if non-live data migration is needed. + + This must only be called if database schema is current. + """ + migration_needed = False + with session_scope(session=session_maker()) as session: + migration_changes = _get_migration_changes(session) + for migrator_cls in NON_LIVE_DATA_MIGRATORS: + migrator = migrator_cls(schema_version, migration_changes) + migration_needed |= migrator.needs_migrate(instance, session) + + return migration_needed + + +def migrate_data_non_live( + instance: Recorder, + session_maker: Callable[[], Session], + schema_status: SchemaValidationStatus, +) -> None: + """Do non-live data migration. + + This must be called after non-live schema migration is completed. + """ + with session_scope(session=session_maker()) as session: + migration_changes = _get_migration_changes(session) + + for migrator_cls in NON_LIVE_DATA_MIGRATORS: + migrator = migrator_cls(schema_status.start_version, migration_changes) + migrator.migrate_all(instance, session_maker) + + +def migrate_data_live( + instance: Recorder, + session_maker: Callable[[], Session], + schema_status: SchemaValidationStatus, +) -> None: + """Queue live schema migration tasks. + + This must be called after live schema migration is completed. + """ + with session_scope(session=session_maker()) as session: + migration_changes = _get_migration_changes(session) + + for migrator_cls in LIVE_DATA_MIGRATORS: + migrator = migrator_cls(schema_status.start_version, migration_changes) + migrator.queue_migration(instance, session) + + def _create_index( session_maker: Callable[[], Session], table_name: str, index_name: str ) -> None: @@ -2196,29 +2264,24 @@ class DataMigrationStatus: migration_done: bool -class BaseRunTimeMigration(ABC): - """Base class for run time migrations.""" +class BaseMigration(ABC): + """Base class for migrations.""" index_to_drop: tuple[str, str] | None = None required_schema_version = 0 migration_version = 1 migration_id: str - task = MigrationTask def __init__(self, schema_version: int, migration_changes: dict[str, int]) -> None: """Initialize a new BaseRunTimeMigration.""" self.schema_version = schema_version self.migration_changes = migration_changes - def do_migrate(self, instance: Recorder, session: Session) -> None: - """Start migration if needed.""" - if self.needs_migrate(instance, session): - instance.queue_task(self.task(self)) - else: - self.migration_done(instance, session) - - @retryable_database_job_method("migrate data") + @abstractmethod def migrate_data(self, instance: Recorder) -> bool: + """Migrate some data, return True if migration is completed.""" + + def _migrate_data(self, instance: Recorder) -> bool: """Migrate some data, returns True if migration is completed.""" status = self.migrate_data_impl(instance) if status.migration_done: @@ -2273,7 +2336,45 @@ class BaseRunTimeMigration(ABC): return needs_migrate.needs_migrate -class BaseRunTimeMigrationWithQuery(BaseRunTimeMigration): +class BaseOffLineMigration(BaseMigration): + """Base class for off line migrations.""" + + def migrate_all( + self, instance: Recorder, session_maker: Callable[[], Session] + ) -> None: + """Migrate all data.""" + with session_scope(session=session_maker()) as session: + if not self.needs_migrate(instance, session): + self.migration_done(instance, session) + return + while not self.migrate_data(instance): + pass + + @database_job_retry_wrapper_method("migrate data", 10) + def migrate_data(self, instance: Recorder) -> bool: + """Migrate some data, returns True if migration is completed.""" + return self._migrate_data(instance) + + +class BaseRunTimeMigration(BaseMigration): + """Base class for run time migrations.""" + + task = MigrationTask + + def queue_migration(self, instance: Recorder, session: Session) -> None: + """Start migration if needed.""" + if self.needs_migrate(instance, session): + instance.queue_task(self.task(self)) + else: + self.migration_done(instance, session) + + @retryable_database_job_method("migrate data") + def migrate_data(self, instance: Recorder) -> bool: + """Migrate some data, returns True if migration is completed.""" + return self._migrate_data(instance) + + +class BaseMigrationWithQuery(BaseMigration): """Base class for run time migrations.""" @abstractmethod @@ -2290,7 +2391,7 @@ class BaseRunTimeMigrationWithQuery(BaseRunTimeMigration): ) -class StatesContextIDMigration(BaseRunTimeMigrationWithQuery): +class StatesContextIDMigration(BaseMigrationWithQuery, BaseOffLineMigration): """Migration to migrate states context_ids to binary format.""" required_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION @@ -2333,7 +2434,7 @@ class StatesContextIDMigration(BaseRunTimeMigrationWithQuery): return has_states_context_ids_to_migrate() -class EventsContextIDMigration(BaseRunTimeMigrationWithQuery): +class EventsContextIDMigration(BaseMigrationWithQuery, BaseOffLineMigration): """Migration to migrate events context_ids to binary format.""" required_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION @@ -2376,7 +2477,7 @@ class EventsContextIDMigration(BaseRunTimeMigrationWithQuery): return has_events_context_ids_to_migrate() -class EventTypeIDMigration(BaseRunTimeMigrationWithQuery): +class EventTypeIDMigration(BaseMigrationWithQuery, BaseRunTimeMigration): """Migration to migrate event_type to event_type_ids.""" required_schema_version = EVENT_TYPE_IDS_SCHEMA_VERSION @@ -2454,7 +2555,7 @@ class EventTypeIDMigration(BaseRunTimeMigrationWithQuery): return has_event_type_to_migrate() -class EntityIDMigration(BaseRunTimeMigrationWithQuery): +class EntityIDMigration(BaseMigrationWithQuery, BaseRunTimeMigration): """Migration to migrate entity_ids to states_meta.""" required_schema_version = STATES_META_SCHEMA_VERSION @@ -2542,7 +2643,7 @@ class EntityIDMigration(BaseRunTimeMigrationWithQuery): instance.states_meta_manager.active = True with contextlib.suppress(SQLAlchemyError): migrate = EntityIDPostMigration(self.schema_version, self.migration_changes) - migrate.do_migrate(instance, session) + migrate.queue_migration(instance, session) def needs_migrate_query(self) -> StatementLambdaElement: """Check if the data is migrated.""" @@ -2631,7 +2732,7 @@ class EventIDPostMigration(BaseRunTimeMigration): return DataMigrationStatus(needs_migrate=False, migration_done=True) -class EntityIDPostMigration(BaseRunTimeMigrationWithQuery): +class EntityIDPostMigration(BaseMigrationWithQuery, BaseRunTimeMigration): """Migration to remove old entity_id strings from states.""" migration_id = "entity_id_post_migration" @@ -2648,9 +2749,19 @@ class EntityIDPostMigration(BaseRunTimeMigrationWithQuery): return has_used_states_entity_ids() -def _mark_migration_done( - session: Session, migration: type[BaseRunTimeMigration] -) -> None: +NON_LIVE_DATA_MIGRATORS = ( + StatesContextIDMigration, # Introduced in HA Core 2023.4 + EventsContextIDMigration, # Introduced in HA Core 2023.4 +) + +LIVE_DATA_MIGRATORS = ( + EventTypeIDMigration, + EntityIDMigration, + EventIDPostMigration, +) + + +def _mark_migration_done(session: Session, migration: type[BaseMigration]) -> None: """Mark a migration as done in the database.""" session.merge( MigrationChanges( diff --git a/homeassistant/components/recorder/util.py b/homeassistant/components/recorder/util.py index d078c32cb88..a59519ef38d 100644 --- a/homeassistant/components/recorder/util.py +++ b/homeassistant/components/recorder/util.py @@ -652,13 +652,13 @@ type _FuncOrMethType[**_P, _R] = Callable[_P, _R] def retryable_database_job[**_P]( description: str, ) -> Callable[[_FuncType[_P, bool]], _FuncType[_P, bool]]: - """Try to execute a database job. + """Execute a database job repeatedly until it succeeds. The job should return True if it finished, and False if it needs to be rescheduled. """ def decorator(job: _FuncType[_P, bool]) -> _FuncType[_P, bool]: - return _wrap_func_or_meth(job, description, False) + return _wrap_retryable_database_job_func_or_meth(job, description, False) return decorator @@ -666,18 +666,18 @@ def retryable_database_job[**_P]( def retryable_database_job_method[_Self, **_P]( description: str, ) -> Callable[[_MethType[_Self, _P, bool]], _MethType[_Self, _P, bool]]: - """Try to execute a database job. + """Execute a database job repeatedly until it succeeds. The job should return True if it finished, and False if it needs to be rescheduled. """ def decorator(job: _MethType[_Self, _P, bool]) -> _MethType[_Self, _P, bool]: - return _wrap_func_or_meth(job, description, True) + return _wrap_retryable_database_job_func_or_meth(job, description, True) return decorator -def _wrap_func_or_meth[**_P]( +def _wrap_retryable_database_job_func_or_meth[**_P]( job: _FuncOrMethType[_P, bool], description: str, method: bool ) -> _FuncOrMethType[_P, bool]: recorder_pos = 1 if method else 0 @@ -705,10 +705,10 @@ def _wrap_func_or_meth[**_P]( return wrapper -def database_job_retry_wrapper[**_P]( - description: str, attempts: int = 5 -) -> Callable[[_FuncType[_P, None]], _FuncType[_P, None]]: - """Try to execute a database job multiple times. +def database_job_retry_wrapper[**_P, _R]( + description: str, attempts: int +) -> Callable[[_FuncType[_P, _R]], _FuncType[_P, _R]]: + """Execute a database job repeatedly until it succeeds, at most attempts times. This wrapper handles InnoDB deadlocks and lock timeouts. @@ -717,32 +717,63 @@ def database_job_retry_wrapper[**_P]( """ def decorator( - job: _FuncType[_P, None], - ) -> _FuncType[_P, None]: - @functools.wraps(job) - def wrapper(instance: Recorder, *args: _P.args, **kwargs: _P.kwargs) -> None: - for attempt in range(attempts): - try: - job(instance, *args, **kwargs) - except OperationalError as err: - if attempt == attempts - 1 or not _is_retryable_error( - instance, err - ): - raise - assert isinstance(err.orig, BaseException) # noqa: PT017 - _LOGGER.info( - "%s; %s failed, retrying", err.orig.args[1], description - ) - time.sleep(instance.db_retry_wait) - # Failed with retryable error - else: - return - - return wrapper + job: _FuncType[_P, _R], + ) -> _FuncType[_P, _R]: + return _database_job_retry_wrapper_func_or_meth( + job, description, attempts, False + ) return decorator +def database_job_retry_wrapper_method[_Self, **_P, _R]( + description: str, attempts: int +) -> Callable[[_MethType[_Self, _P, _R]], _MethType[_Self, _P, _R]]: + """Execute a database job repeatedly until it succeeds, at most attempts times. + + This wrapper handles InnoDB deadlocks and lock timeouts. + + This is different from retryable_database_job in that it will retry the job + attempts number of times instead of returning False if the job fails. + """ + + def decorator( + job: _MethType[_Self, _P, _R], + ) -> _MethType[_Self, _P, _R]: + return _database_job_retry_wrapper_func_or_meth( + job, description, attempts, True + ) + + return decorator + + +def _database_job_retry_wrapper_func_or_meth[**_P, _R]( + job: _FuncOrMethType[_P, _R], + description: str, + attempts: int, + method: bool, +) -> _FuncOrMethType[_P, _R]: + recorder_pos = 1 if method else 0 + + @functools.wraps(job) + def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _R: + instance: Recorder = args[recorder_pos] # type: ignore[assignment] + for attempt in range(attempts): + try: + return job(*args, **kwargs) + except OperationalError as err: + # Failed with retryable error + if attempt == attempts - 1 or not _is_retryable_error(instance, err): + raise + assert isinstance(err.orig, BaseException) # noqa: PT017 + _LOGGER.info("%s; %s failed, retrying", err.orig.args[1], description) + time.sleep(instance.db_retry_wait) + + raise ValueError("attempts must be a positive integer") + + return wrapper + + def periodic_db_cleanups(instance: Recorder) -> None: """Run any database cleanups that need to happen periodically. diff --git a/tests/components/recorder/auto_repairs/statistics/test_duplicates.py b/tests/components/recorder/auto_repairs/statistics/test_duplicates.py index a2cf41578c7..9e287d13594 100644 --- a/tests/components/recorder/auto_repairs/statistics/test_duplicates.py +++ b/tests/components/recorder/auto_repairs/statistics/test_duplicates.py @@ -189,6 +189,9 @@ async def test_delete_metadata_duplicates( patch.object( recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION ), + patch.object( + recorder.migration, "non_live_data_migration_needed", return_value=False + ), patch( "homeassistant.components.recorder.core.create_engine", new=_create_engine_28, @@ -306,6 +309,9 @@ async def test_delete_metadata_duplicates_many( patch.object( recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION ), + patch.object( + recorder.migration, "non_live_data_migration_needed", return_value=False + ), patch( "homeassistant.components.recorder.core.create_engine", new=_create_engine_28, diff --git a/tests/components/recorder/common.py b/tests/components/recorder/common.py index 18e58d9e572..60168f5e6ef 100644 --- a/tests/components/recorder/common.py +++ b/tests/components/recorder/common.py @@ -445,9 +445,8 @@ def old_db_schema(schema_version_postfix: str) -> Iterator[None]: with ( patch.object(recorder, "db_schema", old_db_schema), - patch.object( - recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION - ), + patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION), + patch.object(migration, "non_live_data_migration_needed", return_value=False), patch.object(core, "StatesMeta", old_db_schema.StatesMeta), patch.object(core, "EventTypes", old_db_schema.EventTypes), patch.object(core, "EventData", old_db_schema.EventData), diff --git a/tests/components/recorder/test_migration_from_schema_32.py b/tests/components/recorder/test_migration_from_schema_32.py index 8a54a752989..80d0e88a544 100644 --- a/tests/components/recorder/test_migration_from_schema_32.py +++ b/tests/components/recorder/test_migration_from_schema_32.py @@ -105,9 +105,8 @@ def db_schema_32(): with ( patch.object(recorder, "db_schema", old_db_schema), - patch.object( - recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION - ), + patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION), + patch.object(migration, "non_live_data_migration_needed", return_value=False), patch.object(core, "StatesMeta", old_db_schema.StatesMeta), patch.object(core, "EventTypes", old_db_schema.EventTypes), patch.object(core, "EventData", old_db_schema.EventData), @@ -120,13 +119,13 @@ def db_schema_32(): yield +@pytest.mark.parametrize("persistent_database", [True]) @pytest.mark.parametrize("enable_migrate_event_context_ids", [True]) -@pytest.mark.usefixtures("db_schema_32") +@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage async def test_migrate_events_context_ids( - hass: HomeAssistant, recorder_mock: Recorder + async_test_recorder: RecorderInstanceGenerator, ) -> None: """Test we can migrate old uuid context ids and ulid context ids to binary format.""" - await async_wait_recording_done(hass) importlib.import_module(SCHEMA_MODULE) old_db_schema = sys.modules[SCHEMA_MODULE] @@ -219,18 +218,28 @@ async def test_migrate_events_context_ids( ) ) - await recorder_mock.async_add_executor_job(_insert_events) + # Create database with old schema + with ( + patch.object(recorder, "db_schema", old_db_schema), + patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION), + patch.object(migration.EventsContextIDMigration, "migrate_data"), + patch(CREATE_ENGINE_TARGET, new=_create_engine_test), + ): + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + await instance.async_add_executor_job(_insert_events) - await async_wait_recording_done(hass) - now = dt_util.utcnow() - expected_ulid_fallback_start = ulid_to_bytes(ulid_at_time(now.timestamp()))[0:6] - await _async_wait_migration_done(hass) + await async_wait_recording_done(hass) + now = dt_util.utcnow() + expected_ulid_fallback_start = ulid_to_bytes(ulid_at_time(now.timestamp()))[ + 0:6 + ] + await _async_wait_migration_done(hass) - with freeze_time(now): - # This is a threadsafe way to add a task to the recorder - migrator = migration.EventsContextIDMigration(None, None) - recorder_mock.queue_task(migrator.task(migrator)) - await _async_wait_migration_done(hass) + await hass.async_stop() + await hass.async_block_till_done() def _object_as_dict(obj): return {c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs} @@ -256,7 +265,34 @@ async def test_migrate_events_context_ids( assert len(events) == 6 return {event.event_type: _object_as_dict(event) for event in events} - events_by_type = await recorder_mock.async_add_executor_job(_fetch_migrated_events) + # Run again with new schema, let migration run + with freeze_time(now): + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + instance.recorder_and_worker_thread_ids.add(threading.get_ident()) + + await hass.async_block_till_done() + await async_wait_recording_done(hass) + await async_wait_recording_done(hass) + + events_by_type = await instance.async_add_executor_job( + _fetch_migrated_events + ) + + migration_changes = await instance.async_add_executor_job( + _get_migration_id, hass + ) + + # Check the index which will be removed by the migrator no longer exists + with session_scope(hass=hass) as session: + assert ( + get_index_by_name(session, "events", "ix_events_context_id") is None + ) + + await hass.async_stop() + await hass.async_block_till_done() old_uuid_context_id_event = events_by_type["old_uuid_context_id_event"] assert old_uuid_context_id_event["context_id"] is None @@ -327,18 +363,11 @@ async def test_migrate_events_context_ids( event_with_garbage_context_id_no_time_fired_ts["context_parent_id_bin"] is None ) - migration_changes = await recorder_mock.async_add_executor_job( - _get_migration_id, hass - ) assert ( migration_changes[migration.EventsContextIDMigration.migration_id] == migration.EventsContextIDMigration.migration_version ) - # Check the index which will be removed by the migrator no longer exists - with session_scope(hass=hass) as session: - assert get_index_by_name(session, "events", "ix_events_context_id") is None - @pytest.mark.parametrize("persistent_database", [True]) @pytest.mark.parametrize("enable_migrate_event_context_ids", [True]) @@ -448,13 +477,13 @@ async def test_finish_migrate_events_context_ids( await hass.async_block_till_done() +@pytest.mark.parametrize("persistent_database", [True]) @pytest.mark.parametrize("enable_migrate_state_context_ids", [True]) -@pytest.mark.usefixtures("db_schema_32") +@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage async def test_migrate_states_context_ids( - hass: HomeAssistant, recorder_mock: Recorder + async_test_recorder: RecorderInstanceGenerator, ) -> None: """Test we can migrate old uuid context ids and ulid context ids to binary format.""" - await async_wait_recording_done(hass) importlib.import_module(SCHEMA_MODULE) old_db_schema = sys.modules[SCHEMA_MODULE] @@ -529,12 +558,24 @@ async def test_migrate_states_context_ids( ) ) - await recorder_mock.async_add_executor_job(_insert_states) + # Create database with old schema + with ( + patch.object(recorder, "db_schema", old_db_schema), + patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION), + patch.object(migration.StatesContextIDMigration, "migrate_data"), + patch(CREATE_ENGINE_TARGET, new=_create_engine_test), + ): + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + await instance.async_add_executor_job(_insert_states) - await async_wait_recording_done(hass) - migrator = migration.StatesContextIDMigration(None, None) - recorder_mock.queue_task(migrator.task(migrator)) - await _async_wait_migration_done(hass) + await async_wait_recording_done(hass) + await _async_wait_migration_done(hass) + + await hass.async_stop() + await hass.async_block_till_done() def _object_as_dict(obj): return {c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs} @@ -560,9 +601,31 @@ async def test_migrate_states_context_ids( assert len(events) == 6 return {state.entity_id: _object_as_dict(state) for state in events} - states_by_entity_id = await recorder_mock.async_add_executor_job( - _fetch_migrated_states - ) + # Run again with new schema, let migration run + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + instance.recorder_and_worker_thread_ids.add(threading.get_ident()) + + await hass.async_block_till_done() + await async_wait_recording_done(hass) + await async_wait_recording_done(hass) + + states_by_entity_id = await instance.async_add_executor_job( + _fetch_migrated_states + ) + + migration_changes = await instance.async_add_executor_job( + _get_migration_id, hass + ) + + # Check the index which will be removed by the migrator no longer exists + with session_scope(hass=hass) as session: + assert get_index_by_name(session, "states", "ix_states_context_id") is None + + await hass.async_stop() + await hass.async_block_till_done() old_uuid_context_id = states_by_entity_id["state.old_uuid_context_id"] assert old_uuid_context_id["context_id"] is None @@ -637,18 +700,11 @@ async def test_migrate_states_context_ids( == b"\n\xe2\x97\x99\xeeNOE\x81\x16\xf5\x82\xd7\xd3\xeee" ) - migration_changes = await recorder_mock.async_add_executor_job( - _get_migration_id, hass - ) assert ( migration_changes[migration.StatesContextIDMigration.migration_id] == migration.StatesContextIDMigration.migration_version ) - # Check the index which will be removed by the migrator no longer exists - with session_scope(hass=hass) as session: - assert get_index_by_name(session, "states", "ix_states_context_id") is None - @pytest.mark.parametrize("persistent_database", [True]) @pytest.mark.parametrize("enable_migrate_state_context_ids", [True]) @@ -1763,6 +1819,7 @@ async def test_migrate_times( with ( patch.object(recorder, "db_schema", old_db_schema), patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION), + patch.object(migration, "non_live_data_migration_needed", return_value=False), patch(CREATE_ENGINE_TARGET, new=_create_engine_test), ): async with ( diff --git a/tests/components/recorder/test_migration_run_time_migrations_remember.py b/tests/components/recorder/test_migration_run_time_migrations_remember.py index 880e4d6d61e..93fa16b8364 100644 --- a/tests/components/recorder/test_migration_run_time_migrations_remember.py +++ b/tests/components/recorder/test_migration_run_time_migrations_remember.py @@ -94,9 +94,8 @@ async def test_migration_changes_prevent_trying_to_migrate_again( # Start with db schema that needs migration (version 32) with ( patch.object(recorder, "db_schema", old_db_schema), - patch.object( - recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION - ), + patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION), + patch.object(migration, "non_live_data_migration_needed", return_value=False), patch.object(core, "StatesMeta", old_db_schema.StatesMeta), patch.object(core, "EventTypes", old_db_schema.EventTypes), patch.object(core, "EventData", old_db_schema.EventData), diff --git a/tests/components/recorder/test_statistics_v23_migration.py b/tests/components/recorder/test_statistics_v23_migration.py index 53c59635e8c..1f9be0cabee 100644 --- a/tests/components/recorder/test_statistics_v23_migration.py +++ b/tests/components/recorder/test_statistics_v23_migration.py @@ -168,6 +168,9 @@ async def test_delete_duplicates( patch.object( recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION ), + patch.object( + recorder.migration, "non_live_data_migration_needed", return_value=False + ), patch( CREATE_ENGINE_TARGET, new=partial( @@ -352,6 +355,9 @@ async def test_delete_duplicates_many( patch.object( recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION ), + patch.object( + recorder.migration, "non_live_data_migration_needed", return_value=False + ), patch( CREATE_ENGINE_TARGET, new=partial( @@ -515,6 +521,9 @@ async def test_delete_duplicates_non_identical( patch.object( recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION ), + patch.object( + recorder.migration, "non_live_data_migration_needed", return_value=False + ), patch( CREATE_ENGINE_TARGET, new=partial( @@ -638,6 +647,9 @@ async def test_delete_duplicates_short_term( patch.object( recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION ), + patch.object( + recorder.migration, "non_live_data_migration_needed", return_value=False + ), patch( CREATE_ENGINE_TARGET, new=partial( diff --git a/tests/components/recorder/test_util.py b/tests/components/recorder/test_util.py index ad68e415df5..4904bdecc4d 100644 --- a/tests/components/recorder/test_util.py +++ b/tests/components/recorder/test_util.py @@ -1134,19 +1134,32 @@ Retryable = OperationalError(None, None, BaseException(RETRYABLE_MYSQL_ERRORS[0] @pytest.mark.parametrize( - ("side_effect", "dialect", "expected_result", "num_calls"), + ("side_effect", "dialect", "retval", "expected_result", "num_calls"), [ - (None, SupportedDialect.MYSQL, does_not_raise(), 1), - (ValueError, SupportedDialect.MYSQL, pytest.raises(ValueError), 1), - (NonRetryable, SupportedDialect.MYSQL, pytest.raises(OperationalError), 1), - (Retryable, SupportedDialect.MYSQL, pytest.raises(OperationalError), 5), - (NonRetryable, SupportedDialect.SQLITE, pytest.raises(OperationalError), 1), - (Retryable, SupportedDialect.SQLITE, pytest.raises(OperationalError), 1), + (None, SupportedDialect.MYSQL, None, does_not_raise(), 1), + (ValueError, SupportedDialect.MYSQL, None, pytest.raises(ValueError), 1), + ( + NonRetryable, + SupportedDialect.MYSQL, + None, + pytest.raises(OperationalError), + 1, + ), + (Retryable, SupportedDialect.MYSQL, None, pytest.raises(OperationalError), 5), + ( + NonRetryable, + SupportedDialect.SQLITE, + None, + pytest.raises(OperationalError), + 1, + ), + (Retryable, SupportedDialect.SQLITE, None, pytest.raises(OperationalError), 1), ], ) def test_database_job_retry_wrapper( side_effect: Any, dialect: str, + retval: Any, expected_result: AbstractContextManager, num_calls: int, ) -> None: @@ -1157,12 +1170,13 @@ def test_database_job_retry_wrapper( instance.engine.dialect.name = dialect mock_job = Mock(side_effect=side_effect) - @database_job_retry_wrapper(description="test") + @database_job_retry_wrapper("test", 5) def job(instance, *args, **kwargs) -> None: mock_job() + return retval with expected_result: - job(instance) + assert job(instance) == retval assert len(mock_job.mock_calls) == num_calls diff --git a/tests/components/recorder/test_v32_migration.py b/tests/components/recorder/test_v32_migration.py index 9a616959174..d59486b61f0 100644 --- a/tests/components/recorder/test_v32_migration.py +++ b/tests/components/recorder/test_v32_migration.py @@ -110,6 +110,7 @@ async def test_migrate_times( with ( patch.object(recorder, "db_schema", old_db_schema), patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION), + patch.object(migration, "non_live_data_migration_needed", return_value=False), patch.object(migration, "post_migrate_entity_ids", return_value=False), patch.object(migration.EventsContextIDMigration, "migrate_data"), patch.object(migration.StatesContextIDMigration, "migrate_data"), @@ -266,6 +267,7 @@ async def test_migrate_can_resume_entity_id_post_migration( patch.object(recorder, "db_schema", old_db_schema), patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION), patch.object(migration.EventIDPostMigration, "migrate_data"), + patch.object(migration, "non_live_data_migration_needed", return_value=False), patch.object(migration, "post_migrate_entity_ids", return_value=False), patch.object(core, "StatesMeta", old_db_schema.StatesMeta), patch.object(core, "EventTypes", old_db_schema.EventTypes), @@ -385,6 +387,7 @@ async def test_migrate_can_resume_ix_states_event_id_removed( patch.object(recorder, "db_schema", old_db_schema), patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION), patch.object(migration.EventIDPostMigration, "migrate_data"), + patch.object(migration, "non_live_data_migration_needed", return_value=False), patch.object(migration, "post_migrate_entity_ids", return_value=False), patch.object(core, "StatesMeta", old_db_schema.StatesMeta), patch.object(core, "EventTypes", old_db_schema.EventTypes), @@ -517,6 +520,7 @@ async def test_out_of_disk_space_while_rebuild_states_table( patch.object(recorder, "db_schema", old_db_schema), patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION), patch.object(migration.EventIDPostMigration, "migrate_data"), + patch.object(migration, "non_live_data_migration_needed", return_value=False), patch.object(migration, "post_migrate_entity_ids", return_value=False), patch.object(core, "StatesMeta", old_db_schema.StatesMeta), patch.object(core, "EventTypes", old_db_schema.EventTypes), @@ -694,6 +698,7 @@ async def test_out_of_disk_space_while_removing_foreign_key( patch.object(recorder, "db_schema", old_db_schema), patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION), patch.object(migration.EventIDPostMigration, "migrate_data"), + patch.object(migration, "non_live_data_migration_needed", return_value=False), patch.object(migration, "post_migrate_entity_ids", return_value=False), patch.object(core, "StatesMeta", old_db_schema.StatesMeta), patch.object(core, "EventTypes", old_db_schema.EventTypes),