From 268425b5e3db07525635b06e3b0c1e2863ae513f Mon Sep 17 00:00:00 2001
From: "J. Nick Koston"
Date: Sun, 22 Oct 2023 17:34:43 -1000
Subject: [PATCH] Recover from previously failed statistics migrations (#101781)

* Handle statistics columns being unmigrated from previous downgrades

If the user downgraded HA from 2023.3.x to an older version without
restoring the database, and they later upgrade again with the same
database, they will have unmigrated statistics columns, since we only
migrate them once.

As it's expensive to check, we do not want to check every time at
startup, so we will only do this one more time, since the risk that
someone will downgrade to an older version is very low at this point.

* add guard to sqlite to prevent re-migrate

* test

* move test to insert with old schema

* use helper

* normalize timestamps

* remove

* add check

* add fallback migration

* add fallback migration

* commit

* remove useless logging

* remove useless logging

* do the other columns at the same time

* coverage

* dry

* comment

* Update tests/components/recorder/test_migration_from_schema_32.py
---
 .../components/recorder/db_schema.py           |   2 +-
 .../components/recorder/migration.py           | 130 +++-
 homeassistant/components/recorder/queries.py   |  94 +++
 tests/components/recorder/common.py            |  15 +-
 .../recorder/test_migration_from_schema_32.py  | 587 +++++++++++++++++-
 .../recorder/test_purge_v32_schema.py          |  44 +-
 6 files changed, 819 insertions(+), 53 deletions(-)

diff --git a/homeassistant/components/recorder/db_schema.py b/homeassistant/components/recorder/db_schema.py
index 17e34af1e11..06c8cf68903 100644
--- a/homeassistant/components/recorder/db_schema.py
+++ b/homeassistant/components/recorder/db_schema.py
@@ -68,7 +68,7 @@ class Base(DeclarativeBase):
     """Base class for tables."""


-SCHEMA_VERSION = 41
+SCHEMA_VERSION = 42

 _LOGGER = logging.getLogger(__name__)

diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py
index 7655002b45f..8808ed2fd2b 100644
--- a/homeassistant/components/recorder/migration.py
+++ b/homeassistant/components/recorder/migration.py
@@ -68,13 +68,20 @@ from .db_schema import (
     StatisticsShortTerm,
 )
 from .models import process_timestamp
+from .models.time import datetime_to_timestamp_or_none
 from .queries import (
     batch_cleanup_entity_ids,
+    delete_duplicate_short_term_statistics_row,
+    delete_duplicate_statistics_row,
     find_entity_ids_to_migrate,
     find_event_type_to_migrate,
     find_events_context_ids_to_migrate,
     find_states_context_ids_to_migrate,
+    find_unmigrated_short_term_statistics_rows,
+    find_unmigrated_statistics_rows,
     has_used_states_event_ids,
+    migrate_single_short_term_statistics_row_to_timestamp,
+    migrate_single_statistics_row_to_timestamp,
 )
 from .statistics import get_start_time
 from .tasks import (
@@ -950,26 +957,9 @@ def _apply_update(  # noqa: C901
             "statistics_short_term",
             "ix_statistics_short_term_statistic_id_start_ts",
         )
-        try:
-            _migrate_statistics_columns_to_timestamp(instance, session_maker, engine)
-        except IntegrityError as ex:
-            _LOGGER.error(
-                "Statistics table contains duplicate entries: %s; "
-                "Cleaning up duplicates and trying again; "
-                "This will take a while; "
-                "Please be patient!",
-                ex,
-            )
-            # There may be duplicated statistics entries, delete duplicates
-            # and try again
-            with session_scope(session=session_maker()) as session:
-                delete_statistics_duplicates(instance, hass, session)
-            _migrate_statistics_columns_to_timestamp(instance, session_maker, engine)
-            # Log at error level to ensure the user sees this message in the log
-            # since we logged the error above.
-            _LOGGER.error(
-                "Statistics migration successfully recovered after statistics table duplicate cleanup"
-            )
+        _migrate_statistics_columns_to_timestamp_removing_duplicates(
+            hass, instance, session_maker, engine
+        )
     elif new_version == 35:
         # Migration is done in two steps to ensure we can start using
         # the new columns before we wipe the old ones.
@@ -1060,10 +1050,55 @@ def _apply_update(  # noqa: C901
     elif new_version == 41:
         _create_index(session_maker, "event_types", "ix_event_types_event_type")
         _create_index(session_maker, "states_meta", "ix_states_meta_entity_id")
+    elif new_version == 42:
+        # If the user had a previously failed migration, or they
+        # downgraded from 2023.3.x to an older version we will have
+        # unmigrated statistics columns so we want to clean this up
+        # one last time since compiling the statistics will be slow
+        # or fail if we have unmigrated statistics.
+        _migrate_statistics_columns_to_timestamp_removing_duplicates(
+            hass, instance, session_maker, engine
+        )
     else:
         raise ValueError(f"No schema migration defined for version {new_version}")


+def _migrate_statistics_columns_to_timestamp_removing_duplicates(
+    hass: HomeAssistant,
+    instance: Recorder,
+    session_maker: Callable[[], Session],
+    engine: Engine,
+) -> None:
+    """Migrate statistics columns to timestamp or cleanup duplicates."""
+    try:
+        _migrate_statistics_columns_to_timestamp(instance, session_maker, engine)
+    except IntegrityError as ex:
+        _LOGGER.error(
+            "Statistics table contains duplicate entries: %s; "
+            "Cleaning up duplicates and trying again; "
+            "This will take a while; "
+            "Please be patient!",
+            ex,
+        )
+        # There may be duplicated statistics entries, delete duplicates
+        # and try again
+        with session_scope(session=session_maker()) as session:
+            delete_statistics_duplicates(instance, hass, session)
+        try:
+            _migrate_statistics_columns_to_timestamp(instance, session_maker, engine)
+        except IntegrityError:
+            _LOGGER.warning(
+                "Statistics table still contains duplicate entries after cleanup; "
+                "Falling back to a one by one migration"
+            )
+            _migrate_statistics_columns_to_timestamp_one_by_one(instance, session_maker)
+        # Log at error level to ensure the user sees this message in the log
+        # since we logged the error above.
+        _LOGGER.error(
+            "Statistics migration successfully recovered after statistics table duplicate cleanup"
+        )
+
+
 def _correct_table_character_set_and_collation(
     table: str,
     session_maker: Callable[[], Session],
@@ -1269,6 +1304,59 @@ def _migrate_columns_to_timestamp(
     )


+@database_job_retry_wrapper("Migrate statistics columns to timestamp one by one", 3)
+def _migrate_statistics_columns_to_timestamp_one_by_one(
+    instance: Recorder, session_maker: Callable[[], Session]
+) -> None:
+    """Migrate statistics columns to use timestamp one by one.
+
+    If something manually inserted data into the statistics table
+    in the past it may have inserted duplicate rows.
+
+    Before we had the unique index on (statistic_id, start), the
+    data could have been inserted without any errors and we
+    could end up with duplicate rows that go undetected (even by
+    our current duplicate cleanup code) until we try to migrate the
+    data to use timestamps.
+
+    This will migrate the data one by one to ensure we do not hit any
+    duplicate rows, and remove the duplicate rows as they are found.
+ """ + for find_func, migrate_func, delete_func in ( + ( + find_unmigrated_statistics_rows, + migrate_single_statistics_row_to_timestamp, + delete_duplicate_statistics_row, + ), + ( + find_unmigrated_short_term_statistics_rows, + migrate_single_short_term_statistics_row_to_timestamp, + delete_duplicate_short_term_statistics_row, + ), + ): + with session_scope(session=session_maker()) as session: + while stats := session.execute(find_func(instance.max_bind_vars)).all(): + for statistic_id, start, created, last_reset in stats: + start_ts = datetime_to_timestamp_or_none(process_timestamp(start)) + created_ts = datetime_to_timestamp_or_none( + process_timestamp(created) + ) + last_reset_ts = datetime_to_timestamp_or_none( + process_timestamp(last_reset) + ) + try: + session.execute( + migrate_func( + statistic_id, start_ts, created_ts, last_reset_ts + ) + ) + except IntegrityError: + # This can happen if we have duplicate rows + # in the statistics table. + session.execute(delete_func(statistic_id)) + session.commit() + + @database_job_retry_wrapper("Migrate statistics columns to timestamp", 3) def _migrate_statistics_columns_to_timestamp( instance: Recorder, session_maker: Callable[[], Session], engine: Engine @@ -1292,7 +1380,7 @@ def _migrate_statistics_columns_to_timestamp( f"created_ts=strftime('%s',created) + " "cast(substr(created,-7) AS FLOAT), " f"last_reset_ts=strftime('%s',last_reset) + " - "cast(substr(last_reset,-7) AS FLOAT);" + "cast(substr(last_reset,-7) AS FLOAT) where start_ts is NULL;" ) ) elif engine.dialect.name == SupportedDialect.MYSQL: diff --git a/homeassistant/components/recorder/queries.py b/homeassistant/components/recorder/queries.py index d44094878c2..c03057b31b2 100644 --- a/homeassistant/components/recorder/queries.py +++ b/homeassistant/components/recorder/queries.py @@ -16,6 +16,7 @@ from .db_schema import ( StateAttributes, States, StatesMeta, + Statistics, StatisticsRuns, StatisticsShortTerm, ) @@ -860,3 +861,96 @@ def delete_states_meta_rows(metadata_ids: Iterable[int]) -> StatementLambdaEleme .where(StatesMeta.metadata_id.in_(metadata_ids)) .execution_options(synchronize_session=False) ) + + +def find_unmigrated_short_term_statistics_rows( + max_bind_vars: int, +) -> StatementLambdaElement: + """Find unmigrated short term statistics rows.""" + return lambda_stmt( + lambda: select( + StatisticsShortTerm.id, + StatisticsShortTerm.start, + StatisticsShortTerm.created, + StatisticsShortTerm.last_reset, + ) + .filter(StatisticsShortTerm.start_ts.is_(None)) + .filter(StatisticsShortTerm.start.isnot(None)) + .limit(max_bind_vars) + ) + + +def find_unmigrated_statistics_rows(max_bind_vars: int) -> StatementLambdaElement: + """Find unmigrated statistics rows.""" + return lambda_stmt( + lambda: select( + Statistics.id, Statistics.start, Statistics.created, Statistics.last_reset + ) + .filter(Statistics.start_ts.is_(None)) + .filter(Statistics.start.isnot(None)) + .limit(max_bind_vars) + ) + + +def migrate_single_short_term_statistics_row_to_timestamp( + statistic_id: int, + start_ts: float | None, + created_ts: float | None, + last_reset_ts: float | None, +) -> StatementLambdaElement: + """Migrate a single short term statistics row to timestamp.""" + return lambda_stmt( + lambda: update(StatisticsShortTerm) + .where(StatisticsShortTerm.id == statistic_id) + .values( + start_ts=start_ts, + start=None, + created_ts=created_ts, + created=None, + last_reset_ts=last_reset_ts, + last_reset=None, + ) + .execution_options(synchronize_session=False) + ) + + +def 
migrate_single_statistics_row_to_timestamp( + statistic_id: int, + start_ts: float | None, + created_ts: float | None, + last_reset_ts: float | None, +) -> StatementLambdaElement: + """Migrate a single statistics row to timestamp.""" + return lambda_stmt( + lambda: update(Statistics) + .where(Statistics.id == statistic_id) + .values( + start_ts=start_ts, + start=None, + created_ts=created_ts, + created=None, + last_reset_ts=last_reset_ts, + last_reset=None, + ) + .execution_options(synchronize_session=False) + ) + + +def delete_duplicate_short_term_statistics_row( + statistic_id: int, +) -> StatementLambdaElement: + """Delete a single duplicate short term statistics row.""" + return lambda_stmt( + lambda: delete(StatisticsShortTerm) + .where(StatisticsShortTerm.id == statistic_id) + .execution_options(synchronize_session=False) + ) + + +def delete_duplicate_statistics_row(statistic_id: int) -> StatementLambdaElement: + """Delete a single duplicate statistics row.""" + return lambda_stmt( + lambda: delete(Statistics) + .where(Statistics.id == statistic_id) + .execution_options(synchronize_session=False) + ) diff --git a/tests/components/recorder/common.py b/tests/components/recorder/common.py index 521be81c89b..a982eeb39be 100644 --- a/tests/components/recorder/common.py +++ b/tests/components/recorder/common.py @@ -11,7 +11,7 @@ import importlib import sys import time from typing import Any, Literal, cast -from unittest.mock import patch, sentinel +from unittest.mock import MagicMock, patch, sentinel from freezegun import freeze_time from sqlalchemy import create_engine @@ -430,3 +430,16 @@ def old_db_schema(schema_version_postfix: str) -> Iterator[None]: ), ): yield + + +async def async_attach_db_engine(hass: HomeAssistant) -> None: + """Attach a database engine to the recorder.""" + instance = recorder.get_instance(hass) + + def _mock_setup_recorder_connection(): + with instance.engine.connect() as connection: + instance._setup_recorder_connection( + connection._dbapi_connection, MagicMock() + ) + + await instance.async_add_executor_job(_mock_setup_recorder_connection) diff --git a/tests/components/recorder/test_migration_from_schema_32.py b/tests/components/recorder/test_migration_from_schema_32.py index e007d2408dd..852419559b2 100644 --- a/tests/components/recorder/test_migration_from_schema_32.py +++ b/tests/components/recorder/test_migration_from_schema_32.py @@ -1,22 +1,26 @@ """The tests for the recorder filter matching the EntityFilter component.""" +import datetime import importlib import sys +from typing import Any from unittest.mock import patch import uuid from freezegun import freeze_time import pytest from sqlalchemy import create_engine, inspect +from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session from homeassistant.components import recorder -from homeassistant.components.recorder import core, migration, statistics +from homeassistant.components.recorder import core, db_schema, migration, statistics from homeassistant.components.recorder.db_schema import ( Events, EventTypes, States, StatesMeta, ) +from homeassistant.components.recorder.models import process_timestamp from homeassistant.components.recorder.queries import select_event_type_ids from homeassistant.components.recorder.tasks import ( EntityIDMigrationTask, @@ -30,7 +34,11 @@ from homeassistant.core import HomeAssistant import homeassistant.util.dt as dt_util from homeassistant.util.ulid import bytes_to_ulid, ulid_at_time, ulid_to_bytes -from .common import 
async_recorder_block_till_done, async_wait_recording_done +from .common import ( + async_attach_db_engine, + async_recorder_block_till_done, + async_wait_recording_done, +) from tests.typing import RecorderInstanceGenerator @@ -844,3 +852,578 @@ async def test_migrate_null_event_type_ids( events_by_type = await instance.async_add_executor_job(_fetch_migrated_events) assert len(events_by_type["event_type_one"]) == 2 assert len(events_by_type[migration._EMPTY_EVENT_TYPE]) == 1000 + + +async def test_stats_timestamp_conversion_is_reentrant( + async_setup_recorder_instance: RecorderInstanceGenerator, + hass: HomeAssistant, +) -> None: + """Test stats migration is reentrant.""" + instance = await async_setup_recorder_instance(hass) + await async_wait_recording_done(hass) + await async_attach_db_engine(hass) + importlib.import_module(SCHEMA_MODULE) + old_db_schema = sys.modules[SCHEMA_MODULE] + now = dt_util.utcnow() + one_year_ago = now - datetime.timedelta(days=365) + six_months_ago = now - datetime.timedelta(days=180) + one_month_ago = now - datetime.timedelta(days=30) + + def _do_migration(): + migration._migrate_statistics_columns_to_timestamp_removing_duplicates( + hass, instance, instance.get_session, instance.engine + ) + + def _insert_fake_metadata(): + with session_scope(hass=hass) as session: + session.add( + old_db_schema.StatisticsMeta( + id=1000, + statistic_id="test", + source="test", + unit_of_measurement="test", + has_mean=True, + has_sum=True, + name="1", + ) + ) + + def _insert_pre_timestamp_stat(date_time: datetime) -> None: + with session_scope(hass=hass) as session: + session.add( + old_db_schema.StatisticsShortTerm( + metadata_id=1000, + created=date_time, + created_ts=None, + start=date_time, + start_ts=None, + last_reset=date_time, + last_reset_ts=None, + state="1", + ) + ) + + def _insert_post_timestamp_stat(date_time: datetime) -> None: + with session_scope(hass=hass) as session: + session.add( + db_schema.StatisticsShortTerm( + metadata_id=1000, + created=None, + created_ts=date_time.timestamp(), + start=None, + start_ts=date_time.timestamp(), + last_reset=None, + last_reset_ts=date_time.timestamp(), + state="1", + ) + ) + + def _get_all_short_term_stats() -> list[dict[str, Any]]: + with session_scope(hass=hass) as session: + results = [] + for result in ( + session.query(old_db_schema.StatisticsShortTerm) + .where(old_db_schema.StatisticsShortTerm.metadata_id == 1000) + .all() + ): + results.append( + { + field.name: getattr(result, field.name) + for field in old_db_schema.StatisticsShortTerm.__table__.c + } + ) + return sorted(results, key=lambda row: row["start_ts"]) + + # Do not optimize this block, its intentionally written to interleave + # with the migration + await hass.async_add_executor_job(_insert_fake_metadata) + await async_wait_recording_done(hass) + await hass.async_add_executor_job(_insert_pre_timestamp_stat, one_year_ago) + await async_wait_recording_done(hass) + await hass.async_add_executor_job(_do_migration) + await hass.async_add_executor_job(_insert_post_timestamp_stat, six_months_ago) + await async_wait_recording_done(hass) + await hass.async_add_executor_job(_do_migration) + await hass.async_add_executor_job(_insert_pre_timestamp_stat, one_month_ago) + await async_wait_recording_done(hass) + await hass.async_add_executor_job(_do_migration) + + final_result = await hass.async_add_executor_job(_get_all_short_term_stats) + # Normalize timestamps since each engine returns them differently + for row in final_result: + if row["created"] is not None: 
+ row["created"] = process_timestamp(row["created"]).replace(tzinfo=None) + if row["start"] is not None: + row["start"] = process_timestamp(row["start"]).replace(tzinfo=None) + if row["last_reset"] is not None: + row["last_reset"] = process_timestamp(row["last_reset"]).replace( + tzinfo=None + ) + + assert final_result == [ + { + "created": process_timestamp(one_year_ago).replace(tzinfo=None), + "created_ts": one_year_ago.timestamp(), + "id": 1, + "last_reset": process_timestamp(one_year_ago).replace(tzinfo=None), + "last_reset_ts": one_year_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": process_timestamp(one_year_ago).replace(tzinfo=None), + "start_ts": one_year_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + { + "created": None, + "created_ts": six_months_ago.timestamp(), + "id": 2, + "last_reset": None, + "last_reset_ts": six_months_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": six_months_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + { + "created": process_timestamp(one_month_ago).replace(tzinfo=None), + "created_ts": one_month_ago.timestamp(), + "id": 3, + "last_reset": process_timestamp(one_month_ago).replace(tzinfo=None), + "last_reset_ts": one_month_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": process_timestamp(one_month_ago).replace(tzinfo=None), + "start_ts": one_month_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + ] + + +async def test_stats_timestamp_with_one_by_one( + async_setup_recorder_instance: RecorderInstanceGenerator, + hass: HomeAssistant, +) -> None: + """Test stats migration with one by one.""" + instance = await async_setup_recorder_instance(hass) + await async_wait_recording_done(hass) + await async_attach_db_engine(hass) + importlib.import_module(SCHEMA_MODULE) + old_db_schema = sys.modules[SCHEMA_MODULE] + now = dt_util.utcnow() + one_year_ago = now - datetime.timedelta(days=365) + six_months_ago = now - datetime.timedelta(days=180) + one_month_ago = now - datetime.timedelta(days=30) + + def _do_migration(): + with patch.object( + migration, + "_migrate_statistics_columns_to_timestamp", + side_effect=IntegrityError("test", "test", "test"), + ): + migration._migrate_statistics_columns_to_timestamp_removing_duplicates( + hass, instance, instance.get_session, instance.engine + ) + + def _insert_fake_metadata(): + with session_scope(hass=hass) as session: + session.add( + old_db_schema.StatisticsMeta( + id=1000, + statistic_id="test", + source="test", + unit_of_measurement="test", + has_mean=True, + has_sum=True, + name="1", + ) + ) + + def _insert_pre_timestamp_stat(date_time: datetime) -> None: + with session_scope(hass=hass) as session: + session.add_all( + ( + old_db_schema.StatisticsShortTerm( + metadata_id=1000, + created=date_time, + created_ts=None, + start=date_time, + start_ts=None, + last_reset=date_time, + last_reset_ts=None, + state="1", + ), + old_db_schema.Statistics( + metadata_id=1000, + created=date_time, + created_ts=None, + start=date_time, + start_ts=None, + last_reset=date_time, + last_reset_ts=None, + state="1", + ), + ) + ) + + def _insert_post_timestamp_stat(date_time: datetime) -> None: + with session_scope(hass=hass) as session: + session.add_all( + ( + db_schema.StatisticsShortTerm( + metadata_id=1000, + created=None, + created_ts=date_time.timestamp(), + start=None, + start_ts=date_time.timestamp(), + last_reset=None, + 
last_reset_ts=date_time.timestamp(), + state="1", + ), + db_schema.Statistics( + metadata_id=1000, + created=None, + created_ts=date_time.timestamp(), + start=None, + start_ts=date_time.timestamp(), + last_reset=None, + last_reset_ts=date_time.timestamp(), + state="1", + ), + ) + ) + + def _get_all_stats(table: old_db_schema.StatisticsBase) -> list[dict[str, Any]]: + """Get all stats from a table.""" + with session_scope(hass=hass) as session: + results = [] + for result in session.query(table).where(table.metadata_id == 1000).all(): + results.append( + { + field.name: getattr(result, field.name) + for field in table.__table__.c + } + ) + return sorted(results, key=lambda row: row["start_ts"]) + + def _insert_and_do_migration(): + _insert_fake_metadata() + _insert_pre_timestamp_stat(one_year_ago) + _insert_post_timestamp_stat(six_months_ago) + _insert_pre_timestamp_stat(one_month_ago) + _do_migration() + + await hass.async_add_executor_job(_insert_and_do_migration) + final_short_term_result = await hass.async_add_executor_job( + _get_all_stats, old_db_schema.StatisticsShortTerm + ) + final_short_term_result = sorted( + final_short_term_result, key=lambda row: row["start_ts"] + ) + + assert final_short_term_result == [ + { + "created": None, + "created_ts": one_year_ago.timestamp(), + "id": 1, + "last_reset": None, + "last_reset_ts": one_year_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": one_year_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + { + "created": None, + "created_ts": six_months_ago.timestamp(), + "id": 2, + "last_reset": None, + "last_reset_ts": six_months_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": six_months_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + { + "created": None, + "created_ts": one_month_ago.timestamp(), + "id": 3, + "last_reset": None, + "last_reset_ts": one_month_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": one_month_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + ] + + final_result = await hass.async_add_executor_job( + _get_all_stats, old_db_schema.Statistics + ) + final_result = sorted(final_result, key=lambda row: row["start_ts"]) + + assert final_result == [ + { + "created": None, + "created_ts": one_year_ago.timestamp(), + "id": 1, + "last_reset": None, + "last_reset_ts": one_year_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": one_year_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + { + "created": None, + "created_ts": six_months_ago.timestamp(), + "id": 2, + "last_reset": None, + "last_reset_ts": six_months_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": six_months_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + { + "created": None, + "created_ts": one_month_ago.timestamp(), + "id": 3, + "last_reset": None, + "last_reset_ts": one_month_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": one_month_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + ] + + +async def test_stats_timestamp_with_one_by_one_removes_duplicates( + async_setup_recorder_instance: RecorderInstanceGenerator, + hass: HomeAssistant, +) -> None: + """Test stats migration with one by one removes duplicates.""" + instance = await 
async_setup_recorder_instance(hass) + await async_wait_recording_done(hass) + await async_attach_db_engine(hass) + importlib.import_module(SCHEMA_MODULE) + old_db_schema = sys.modules[SCHEMA_MODULE] + now = dt_util.utcnow() + one_year_ago = now - datetime.timedelta(days=365) + six_months_ago = now - datetime.timedelta(days=180) + one_month_ago = now - datetime.timedelta(days=30) + + def _do_migration(): + with patch.object( + migration, + "_migrate_statistics_columns_to_timestamp", + side_effect=IntegrityError("test", "test", "test"), + ), patch.object( + migration, + "migrate_single_statistics_row_to_timestamp", + side_effect=IntegrityError("test", "test", "test"), + ): + migration._migrate_statistics_columns_to_timestamp_removing_duplicates( + hass, instance, instance.get_session, instance.engine + ) + + def _insert_fake_metadata(): + with session_scope(hass=hass) as session: + session.add( + old_db_schema.StatisticsMeta( + id=1000, + statistic_id="test", + source="test", + unit_of_measurement="test", + has_mean=True, + has_sum=True, + name="1", + ) + ) + + def _insert_pre_timestamp_stat(date_time: datetime) -> None: + with session_scope(hass=hass) as session: + session.add_all( + ( + old_db_schema.StatisticsShortTerm( + metadata_id=1000, + created=date_time, + created_ts=None, + start=date_time, + start_ts=None, + last_reset=date_time, + last_reset_ts=None, + state="1", + ), + old_db_schema.Statistics( + metadata_id=1000, + created=date_time, + created_ts=None, + start=date_time, + start_ts=None, + last_reset=date_time, + last_reset_ts=None, + state="1", + ), + ) + ) + + def _insert_post_timestamp_stat(date_time: datetime) -> None: + with session_scope(hass=hass) as session: + session.add_all( + ( + db_schema.StatisticsShortTerm( + metadata_id=1000, + created=None, + created_ts=date_time.timestamp(), + start=None, + start_ts=date_time.timestamp(), + last_reset=None, + last_reset_ts=date_time.timestamp(), + state="1", + ), + db_schema.Statistics( + metadata_id=1000, + created=None, + created_ts=date_time.timestamp(), + start=None, + start_ts=date_time.timestamp(), + last_reset=None, + last_reset_ts=date_time.timestamp(), + state="1", + ), + ) + ) + + def _get_all_stats(table: old_db_schema.StatisticsBase) -> list[dict[str, Any]]: + """Get all stats from a table.""" + with session_scope(hass=hass) as session: + results = [] + for result in session.query(table).where(table.metadata_id == 1000).all(): + results.append( + { + field.name: getattr(result, field.name) + for field in table.__table__.c + } + ) + return sorted(results, key=lambda row: row["start_ts"]) + + def _insert_and_do_migration(): + _insert_fake_metadata() + _insert_pre_timestamp_stat(one_year_ago) + _insert_post_timestamp_stat(six_months_ago) + _insert_pre_timestamp_stat(one_month_ago) + _do_migration() + + await hass.async_add_executor_job(_insert_and_do_migration) + final_short_term_result = await hass.async_add_executor_job( + _get_all_stats, old_db_schema.StatisticsShortTerm + ) + final_short_term_result = sorted( + final_short_term_result, key=lambda row: row["start_ts"] + ) + + assert final_short_term_result == [ + { + "created": None, + "created_ts": one_year_ago.timestamp(), + "id": 1, + "last_reset": None, + "last_reset_ts": one_year_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": one_year_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + { + "created": None, + "created_ts": six_months_ago.timestamp(), + "id": 2, + "last_reset": None, + 
"last_reset_ts": six_months_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": six_months_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + { + "created": None, + "created_ts": one_month_ago.timestamp(), + "id": 3, + "last_reset": None, + "last_reset_ts": one_month_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": one_month_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + ] + + # All the duplicates should have been removed but + # the non-duplicates should still be there + final_result = await hass.async_add_executor_job( + _get_all_stats, old_db_schema.Statistics + ) + assert final_result == [ + { + "created": None, + "created_ts": six_months_ago.timestamp(), + "id": 2, + "last_reset": None, + "last_reset_ts": six_months_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": six_months_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + ] diff --git a/tests/components/recorder/test_purge_v32_schema.py b/tests/components/recorder/test_purge_v32_schema.py index b3c20ad4e26..f386fd19e36 100644 --- a/tests/components/recorder/test_purge_v32_schema.py +++ b/tests/components/recorder/test_purge_v32_schema.py @@ -3,7 +3,7 @@ from datetime import datetime, timedelta import json import sqlite3 -from unittest.mock import MagicMock, patch +from unittest.mock import patch from freezegun import freeze_time import pytest @@ -27,6 +27,7 @@ from homeassistant.core import HomeAssistant from homeassistant.util import dt as dt_util from .common import ( + async_attach_db_engine, async_recorder_block_till_done, async_wait_purge_done, async_wait_recording_done, @@ -64,25 +65,12 @@ def mock_use_sqlite(request): yield -async def _async_attach_db_engine(hass: HomeAssistant) -> None: - """Attach a database engine to the recorder.""" - instance = recorder.get_instance(hass) - - def _mock_setup_recorder_connection(): - with instance.engine.connect() as connection: - instance._setup_recorder_connection( - connection._dbapi_connection, MagicMock() - ) - - await instance.async_add_executor_job(_mock_setup_recorder_connection) - - async def test_purge_old_states( async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant ) -> None: """Test deleting old states.""" instance = await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await _add_test_states(hass) @@ -178,7 +166,7 @@ async def test_purge_old_states_encouters_database_corruption( return await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await _add_test_states(hass) await async_wait_recording_done(hass) @@ -211,7 +199,7 @@ async def test_purge_old_states_encounters_temporary_mysql_error( ) -> None: """Test retry on specific mysql operational errors.""" instance = await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await _add_test_states(hass) await async_wait_recording_done(hass) @@ -243,7 +231,7 @@ async def test_purge_old_states_encounters_operational_error( ) -> None: """Test error on operational errors that are not mysql does not retry.""" await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await _add_test_states(hass) await async_wait_recording_done(hass) @@ -268,7 +256,7 @@ async 
def test_purge_old_events( ) -> None: """Test deleting old events.""" instance = await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await _add_test_events(hass) @@ -306,7 +294,7 @@ async def test_purge_old_recorder_runs( ) -> None: """Test deleting old recorder runs keeps current run.""" instance = await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await _add_test_recorder_runs(hass) @@ -343,7 +331,7 @@ async def test_purge_old_statistics_runs( ) -> None: """Test deleting old statistics runs keeps the latest run.""" instance = await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await _add_test_statistics_runs(hass) @@ -384,7 +372,7 @@ async def test_purge_method( assert run1.start == run2.start await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) service_data = {"keep_days": 4} await _add_test_events(hass) @@ -522,7 +510,7 @@ async def test_purge_edge_case( ) await async_setup_recorder_instance(hass, None) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await async_wait_purge_done(hass) @@ -621,7 +609,7 @@ async def test_purge_cutoff_date( ) instance = await async_setup_recorder_instance(hass, None) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await async_wait_purge_done(hass) @@ -948,7 +936,7 @@ async def test_purge_many_old_events( ) -> None: """Test deleting old events.""" instance = await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) old_events_count = 5 with patch.object(instance, "max_bind_vars", old_events_count), patch.object( @@ -1001,7 +989,7 @@ async def test_purge_can_mix_legacy_and_new_format( ) -> None: """Test purging with legacy and new events.""" instance = await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await async_wait_recording_done(hass) # New databases are no longer created with the legacy events index @@ -1114,7 +1102,7 @@ async def test_purge_can_mix_legacy_and_new_format_with_detached_state( return pytest.skip("This tests disables foreign key checks on SQLite") instance = await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await async_wait_recording_done(hass) # New databases are no longer created with the legacy events index @@ -1254,7 +1242,7 @@ async def test_purge_entities_keep_days( ) -> None: """Test purging states with an entity filter and keep_days.""" instance = await async_setup_recorder_instance(hass, {}) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await hass.async_block_till_done() await async_wait_recording_done(hass)
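
The sketch below is a standalone illustration of the recovery pattern this patch wires into schema version 42; it is not recorder code, and the table, column, and index names are simplified stand-ins (real rows also carry fractional seconds, which the conversion here ignores). It shows the two ideas the migration relies on: a bulk UPDATE guarded by "WHERE start_ts IS NULL", which makes re-running the migration on an already-converted database a no-op, and a row-by-row fallback that treats a unique-index violation as proof of a duplicate row and deletes that row instead of migrating it.

import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE statistics (
        id INTEGER PRIMARY KEY,
        metadata_id INTEGER,
        start TEXT,    -- legacy datetime column
        start_ts REAL  -- replacement timestamp column
    );
    CREATE UNIQUE INDEX ix_statistics_metadata_id_start_ts
        ON statistics (metadata_id, start_ts);
    """
)
conn.executemany(
    "INSERT INTO statistics (metadata_id, start, start_ts) VALUES (?, ?, ?)",
    [
        (1, "2023-01-01 00:00:00", None),  # unmigrated
        (1, "2023-01-01 00:00:00", None),  # unmigrated duplicate of the row above
        (1, None, 1672534800.0),           # already migrated
    ],
)
conn.commit()


def migrate_one_by_one(conn: sqlite3.Connection, batch_size: int = 100) -> None:
    """Fallback path: convert rows individually, dropping duplicates on conflict."""
    while rows := conn.execute(
        "SELECT id, start FROM statistics "
        "WHERE start_ts IS NULL AND start IS NOT NULL LIMIT ?",
        (batch_size,),
    ).fetchall():
        for row_id, start in rows:
            start_ts = (
                datetime.fromisoformat(start).replace(tzinfo=timezone.utc).timestamp()
            )
            try:
                conn.execute(
                    "UPDATE statistics SET start_ts = ?, start = NULL WHERE id = ?",
                    (start_ts, row_id),
                )
            except sqlite3.IntegrityError:
                # Another row already owns (metadata_id, start_ts); this row is
                # a duplicate, so remove it instead of migrating it.
                conn.execute("DELETE FROM statistics WHERE id = ?", (row_id,))
        conn.commit()


try:
    # Fast path: one bulk UPDATE. The WHERE clause keeps it reentrant, so
    # running it again on an already-migrated database changes nothing.
    conn.execute(
        "UPDATE statistics SET start_ts = CAST(strftime('%s', start) AS REAL), "
        "start = NULL WHERE start_ts IS NULL"
    )
    conn.commit()
except sqlite3.IntegrityError:
    # Duplicate rows collide on the unique index once converted; fall back to
    # migrating one row at a time and deleting duplicates as they surface.
    conn.rollback()
    migrate_one_by_one(conn)

print(conn.execute("SELECT id, start, start_ts FROM statistics ORDER BY id").fetchall())

Running the sketch prints [(1, None, 1672531200.0), (3, None, 1672534800.0)]: the first copy of the duplicate pair is converted, the second copy is deleted, and the already-migrated row is left untouched, which mirrors the behaviour the new test_stats_timestamp_with_one_by_one_removes_duplicates test asserts against the real tables.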