diff --git a/homeassistant/components/recorder/db_schema.py b/homeassistant/components/recorder/db_schema.py index 17e34af1e11..06c8cf68903 100644 --- a/homeassistant/components/recorder/db_schema.py +++ b/homeassistant/components/recorder/db_schema.py @@ -68,7 +68,7 @@ class Base(DeclarativeBase): """Base class for tables.""" -SCHEMA_VERSION = 41 +SCHEMA_VERSION = 42 _LOGGER = logging.getLogger(__name__) diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index 7655002b45f..8808ed2fd2b 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -68,13 +68,20 @@ from .db_schema import ( StatisticsShortTerm, ) from .models import process_timestamp +from .models.time import datetime_to_timestamp_or_none from .queries import ( batch_cleanup_entity_ids, + delete_duplicate_short_term_statistics_row, + delete_duplicate_statistics_row, find_entity_ids_to_migrate, find_event_type_to_migrate, find_events_context_ids_to_migrate, find_states_context_ids_to_migrate, + find_unmigrated_short_term_statistics_rows, + find_unmigrated_statistics_rows, has_used_states_event_ids, + migrate_single_short_term_statistics_row_to_timestamp, + migrate_single_statistics_row_to_timestamp, ) from .statistics import get_start_time from .tasks import ( @@ -950,26 +957,9 @@ def _apply_update( # noqa: C901 "statistics_short_term", "ix_statistics_short_term_statistic_id_start_ts", ) - try: - _migrate_statistics_columns_to_timestamp(instance, session_maker, engine) - except IntegrityError as ex: - _LOGGER.error( - "Statistics table contains duplicate entries: %s; " - "Cleaning up duplicates and trying again; " - "This will take a while; " - "Please be patient!", - ex, - ) - # There may be duplicated statistics entries, delete duplicates - # and try again - with session_scope(session=session_maker()) as session: - delete_statistics_duplicates(instance, hass, session) - _migrate_statistics_columns_to_timestamp(instance, session_maker, engine) - # Log at error level to ensure the user sees this message in the log - # since we logged the error above. - _LOGGER.error( - "Statistics migration successfully recovered after statistics table duplicate cleanup" - ) + _migrate_statistics_columns_to_timestamp_removing_duplicates( + hass, instance, session_maker, engine + ) elif new_version == 35: # Migration is done in two steps to ensure we can start using # the new columns before we wipe the old ones. @@ -1060,10 +1050,55 @@ def _apply_update( # noqa: C901 elif new_version == 41: _create_index(session_maker, "event_types", "ix_event_types_event_type") _create_index(session_maker, "states_meta", "ix_states_meta_entity_id") + elif new_version == 42: + # If the user had a previously failed migration, or they + # downgraded from 2023.3.x to an older version we will have + # unmigrated statistics columns so we want to clean this up + # one last time since compiling the statistics will be slow + # or fail if we have unmigrated statistics. 
+        _migrate_statistics_columns_to_timestamp_removing_duplicates(
+            hass, instance, session_maker, engine
+        )
     else:
         raise ValueError(f"No schema migration defined for version {new_version}")
 
 
+def _migrate_statistics_columns_to_timestamp_removing_duplicates(
+    hass: HomeAssistant,
+    instance: Recorder,
+    session_maker: Callable[[], Session],
+    engine: Engine,
+) -> None:
+    """Migrate statistics columns to timestamp or clean up duplicates."""
+    try:
+        _migrate_statistics_columns_to_timestamp(instance, session_maker, engine)
+    except IntegrityError as ex:
+        _LOGGER.error(
+            "Statistics table contains duplicate entries: %s; "
+            "Cleaning up duplicates and trying again; "
+            "This will take a while; "
+            "Please be patient!",
+            ex,
+        )
+        # There may be duplicated statistics entries, delete duplicates
+        # and try again
+        with session_scope(session=session_maker()) as session:
+            delete_statistics_duplicates(instance, hass, session)
+        try:
+            _migrate_statistics_columns_to_timestamp(instance, session_maker, engine)
+        except IntegrityError:
+            _LOGGER.warning(
+                "Statistics table still contains duplicate entries after cleanup; "
+                "Falling back to a one by one migration"
+            )
+            _migrate_statistics_columns_to_timestamp_one_by_one(instance, session_maker)
+        # Log at error level to ensure the user sees this message in the log
+        # since we logged the error above.
+        _LOGGER.error(
+            "Statistics migration successfully recovered after statistics table duplicate cleanup"
+        )
+
+
 def _correct_table_character_set_and_collation(
     table: str,
     session_maker: Callable[[], Session],
@@ -1269,6 +1304,59 @@ def _migrate_columns_to_timestamp(
     )
 
 
+@database_job_retry_wrapper("Migrate statistics columns to timestamp one by one", 3)
+def _migrate_statistics_columns_to_timestamp_one_by_one(
+    instance: Recorder, session_maker: Callable[[], Session]
+) -> None:
+    """Migrate statistics columns to use timestamp one by one.
+
+    If something manually inserted data into the statistics table
+    in the past it may have inserted duplicate rows.
+
+    Before we had the unique index on (statistic_id, start),
+    the data could have been inserted without any errors and we
+    could end up with duplicate rows that go undetected (even by
+    our current duplicate cleanup code) until we try to migrate the
+    data to use timestamps.
+
+    This will migrate the data one by one to ensure we do not hit any
+    duplicate rows, and remove the duplicate rows as they are found.
+    """
+    for find_func, migrate_func, delete_func in (
+        (
+            find_unmigrated_statistics_rows,
+            migrate_single_statistics_row_to_timestamp,
+            delete_duplicate_statistics_row,
+        ),
+        (
+            find_unmigrated_short_term_statistics_rows,
+            migrate_single_short_term_statistics_row_to_timestamp,
+            delete_duplicate_short_term_statistics_row,
+        ),
+    ):
+        with session_scope(session=session_maker()) as session:
+            while stats := session.execute(find_func(instance.max_bind_vars)).all():
+                for statistic_id, start, created, last_reset in stats:
+                    start_ts = datetime_to_timestamp_or_none(process_timestamp(start))
+                    created_ts = datetime_to_timestamp_or_none(
+                        process_timestamp(created)
+                    )
+                    last_reset_ts = datetime_to_timestamp_or_none(
+                        process_timestamp(last_reset)
+                    )
+                    try:
+                        session.execute(
+                            migrate_func(
+                                statistic_id, start_ts, created_ts, last_reset_ts
+                            )
+                        )
+                    except IntegrityError:
+                        # This can happen if we have duplicate rows
+                        # in the statistics table.
+ session.execute(delete_func(statistic_id)) + session.commit() + + @database_job_retry_wrapper("Migrate statistics columns to timestamp", 3) def _migrate_statistics_columns_to_timestamp( instance: Recorder, session_maker: Callable[[], Session], engine: Engine @@ -1292,7 +1380,7 @@ def _migrate_statistics_columns_to_timestamp( f"created_ts=strftime('%s',created) + " "cast(substr(created,-7) AS FLOAT), " f"last_reset_ts=strftime('%s',last_reset) + " - "cast(substr(last_reset,-7) AS FLOAT);" + "cast(substr(last_reset,-7) AS FLOAT) where start_ts is NULL;" ) ) elif engine.dialect.name == SupportedDialect.MYSQL: diff --git a/homeassistant/components/recorder/queries.py b/homeassistant/components/recorder/queries.py index d44094878c2..c03057b31b2 100644 --- a/homeassistant/components/recorder/queries.py +++ b/homeassistant/components/recorder/queries.py @@ -16,6 +16,7 @@ from .db_schema import ( StateAttributes, States, StatesMeta, + Statistics, StatisticsRuns, StatisticsShortTerm, ) @@ -860,3 +861,96 @@ def delete_states_meta_rows(metadata_ids: Iterable[int]) -> StatementLambdaEleme .where(StatesMeta.metadata_id.in_(metadata_ids)) .execution_options(synchronize_session=False) ) + + +def find_unmigrated_short_term_statistics_rows( + max_bind_vars: int, +) -> StatementLambdaElement: + """Find unmigrated short term statistics rows.""" + return lambda_stmt( + lambda: select( + StatisticsShortTerm.id, + StatisticsShortTerm.start, + StatisticsShortTerm.created, + StatisticsShortTerm.last_reset, + ) + .filter(StatisticsShortTerm.start_ts.is_(None)) + .filter(StatisticsShortTerm.start.isnot(None)) + .limit(max_bind_vars) + ) + + +def find_unmigrated_statistics_rows(max_bind_vars: int) -> StatementLambdaElement: + """Find unmigrated statistics rows.""" + return lambda_stmt( + lambda: select( + Statistics.id, Statistics.start, Statistics.created, Statistics.last_reset + ) + .filter(Statistics.start_ts.is_(None)) + .filter(Statistics.start.isnot(None)) + .limit(max_bind_vars) + ) + + +def migrate_single_short_term_statistics_row_to_timestamp( + statistic_id: int, + start_ts: float | None, + created_ts: float | None, + last_reset_ts: float | None, +) -> StatementLambdaElement: + """Migrate a single short term statistics row to timestamp.""" + return lambda_stmt( + lambda: update(StatisticsShortTerm) + .where(StatisticsShortTerm.id == statistic_id) + .values( + start_ts=start_ts, + start=None, + created_ts=created_ts, + created=None, + last_reset_ts=last_reset_ts, + last_reset=None, + ) + .execution_options(synchronize_session=False) + ) + + +def migrate_single_statistics_row_to_timestamp( + statistic_id: int, + start_ts: float | None, + created_ts: float | None, + last_reset_ts: float | None, +) -> StatementLambdaElement: + """Migrate a single statistics row to timestamp.""" + return lambda_stmt( + lambda: update(Statistics) + .where(Statistics.id == statistic_id) + .values( + start_ts=start_ts, + start=None, + created_ts=created_ts, + created=None, + last_reset_ts=last_reset_ts, + last_reset=None, + ) + .execution_options(synchronize_session=False) + ) + + +def delete_duplicate_short_term_statistics_row( + statistic_id: int, +) -> StatementLambdaElement: + """Delete a single duplicate short term statistics row.""" + return lambda_stmt( + lambda: delete(StatisticsShortTerm) + .where(StatisticsShortTerm.id == statistic_id) + .execution_options(synchronize_session=False) + ) + + +def delete_duplicate_statistics_row(statistic_id: int) -> StatementLambdaElement: + """Delete a single duplicate statistics 
row.""" + return lambda_stmt( + lambda: delete(Statistics) + .where(Statistics.id == statistic_id) + .execution_options(synchronize_session=False) + ) diff --git a/tests/components/recorder/common.py b/tests/components/recorder/common.py index 521be81c89b..a982eeb39be 100644 --- a/tests/components/recorder/common.py +++ b/tests/components/recorder/common.py @@ -11,7 +11,7 @@ import importlib import sys import time from typing import Any, Literal, cast -from unittest.mock import patch, sentinel +from unittest.mock import MagicMock, patch, sentinel from freezegun import freeze_time from sqlalchemy import create_engine @@ -430,3 +430,16 @@ def old_db_schema(schema_version_postfix: str) -> Iterator[None]: ), ): yield + + +async def async_attach_db_engine(hass: HomeAssistant) -> None: + """Attach a database engine to the recorder.""" + instance = recorder.get_instance(hass) + + def _mock_setup_recorder_connection(): + with instance.engine.connect() as connection: + instance._setup_recorder_connection( + connection._dbapi_connection, MagicMock() + ) + + await instance.async_add_executor_job(_mock_setup_recorder_connection) diff --git a/tests/components/recorder/test_migration_from_schema_32.py b/tests/components/recorder/test_migration_from_schema_32.py index e007d2408dd..852419559b2 100644 --- a/tests/components/recorder/test_migration_from_schema_32.py +++ b/tests/components/recorder/test_migration_from_schema_32.py @@ -1,22 +1,26 @@ """The tests for the recorder filter matching the EntityFilter component.""" +import datetime import importlib import sys +from typing import Any from unittest.mock import patch import uuid from freezegun import freeze_time import pytest from sqlalchemy import create_engine, inspect +from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session from homeassistant.components import recorder -from homeassistant.components.recorder import core, migration, statistics +from homeassistant.components.recorder import core, db_schema, migration, statistics from homeassistant.components.recorder.db_schema import ( Events, EventTypes, States, StatesMeta, ) +from homeassistant.components.recorder.models import process_timestamp from homeassistant.components.recorder.queries import select_event_type_ids from homeassistant.components.recorder.tasks import ( EntityIDMigrationTask, @@ -30,7 +34,11 @@ from homeassistant.core import HomeAssistant import homeassistant.util.dt as dt_util from homeassistant.util.ulid import bytes_to_ulid, ulid_at_time, ulid_to_bytes -from .common import async_recorder_block_till_done, async_wait_recording_done +from .common import ( + async_attach_db_engine, + async_recorder_block_till_done, + async_wait_recording_done, +) from tests.typing import RecorderInstanceGenerator @@ -844,3 +852,578 @@ async def test_migrate_null_event_type_ids( events_by_type = await instance.async_add_executor_job(_fetch_migrated_events) assert len(events_by_type["event_type_one"]) == 2 assert len(events_by_type[migration._EMPTY_EVENT_TYPE]) == 1000 + + +async def test_stats_timestamp_conversion_is_reentrant( + async_setup_recorder_instance: RecorderInstanceGenerator, + hass: HomeAssistant, +) -> None: + """Test stats migration is reentrant.""" + instance = await async_setup_recorder_instance(hass) + await async_wait_recording_done(hass) + await async_attach_db_engine(hass) + importlib.import_module(SCHEMA_MODULE) + old_db_schema = sys.modules[SCHEMA_MODULE] + now = dt_util.utcnow() + one_year_ago = now - datetime.timedelta(days=365) + six_months_ago = now - 
datetime.timedelta(days=180) + one_month_ago = now - datetime.timedelta(days=30) + + def _do_migration(): + migration._migrate_statistics_columns_to_timestamp_removing_duplicates( + hass, instance, instance.get_session, instance.engine + ) + + def _insert_fake_metadata(): + with session_scope(hass=hass) as session: + session.add( + old_db_schema.StatisticsMeta( + id=1000, + statistic_id="test", + source="test", + unit_of_measurement="test", + has_mean=True, + has_sum=True, + name="1", + ) + ) + + def _insert_pre_timestamp_stat(date_time: datetime) -> None: + with session_scope(hass=hass) as session: + session.add( + old_db_schema.StatisticsShortTerm( + metadata_id=1000, + created=date_time, + created_ts=None, + start=date_time, + start_ts=None, + last_reset=date_time, + last_reset_ts=None, + state="1", + ) + ) + + def _insert_post_timestamp_stat(date_time: datetime) -> None: + with session_scope(hass=hass) as session: + session.add( + db_schema.StatisticsShortTerm( + metadata_id=1000, + created=None, + created_ts=date_time.timestamp(), + start=None, + start_ts=date_time.timestamp(), + last_reset=None, + last_reset_ts=date_time.timestamp(), + state="1", + ) + ) + + def _get_all_short_term_stats() -> list[dict[str, Any]]: + with session_scope(hass=hass) as session: + results = [] + for result in ( + session.query(old_db_schema.StatisticsShortTerm) + .where(old_db_schema.StatisticsShortTerm.metadata_id == 1000) + .all() + ): + results.append( + { + field.name: getattr(result, field.name) + for field in old_db_schema.StatisticsShortTerm.__table__.c + } + ) + return sorted(results, key=lambda row: row["start_ts"]) + + # Do not optimize this block, its intentionally written to interleave + # with the migration + await hass.async_add_executor_job(_insert_fake_metadata) + await async_wait_recording_done(hass) + await hass.async_add_executor_job(_insert_pre_timestamp_stat, one_year_ago) + await async_wait_recording_done(hass) + await hass.async_add_executor_job(_do_migration) + await hass.async_add_executor_job(_insert_post_timestamp_stat, six_months_ago) + await async_wait_recording_done(hass) + await hass.async_add_executor_job(_do_migration) + await hass.async_add_executor_job(_insert_pre_timestamp_stat, one_month_ago) + await async_wait_recording_done(hass) + await hass.async_add_executor_job(_do_migration) + + final_result = await hass.async_add_executor_job(_get_all_short_term_stats) + # Normalize timestamps since each engine returns them differently + for row in final_result: + if row["created"] is not None: + row["created"] = process_timestamp(row["created"]).replace(tzinfo=None) + if row["start"] is not None: + row["start"] = process_timestamp(row["start"]).replace(tzinfo=None) + if row["last_reset"] is not None: + row["last_reset"] = process_timestamp(row["last_reset"]).replace( + tzinfo=None + ) + + assert final_result == [ + { + "created": process_timestamp(one_year_ago).replace(tzinfo=None), + "created_ts": one_year_ago.timestamp(), + "id": 1, + "last_reset": process_timestamp(one_year_ago).replace(tzinfo=None), + "last_reset_ts": one_year_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": process_timestamp(one_year_ago).replace(tzinfo=None), + "start_ts": one_year_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + { + "created": None, + "created_ts": six_months_ago.timestamp(), + "id": 2, + "last_reset": None, + "last_reset_ts": six_months_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": 
None, + "start_ts": six_months_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + { + "created": process_timestamp(one_month_ago).replace(tzinfo=None), + "created_ts": one_month_ago.timestamp(), + "id": 3, + "last_reset": process_timestamp(one_month_ago).replace(tzinfo=None), + "last_reset_ts": one_month_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": process_timestamp(one_month_ago).replace(tzinfo=None), + "start_ts": one_month_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + ] + + +async def test_stats_timestamp_with_one_by_one( + async_setup_recorder_instance: RecorderInstanceGenerator, + hass: HomeAssistant, +) -> None: + """Test stats migration with one by one.""" + instance = await async_setup_recorder_instance(hass) + await async_wait_recording_done(hass) + await async_attach_db_engine(hass) + importlib.import_module(SCHEMA_MODULE) + old_db_schema = sys.modules[SCHEMA_MODULE] + now = dt_util.utcnow() + one_year_ago = now - datetime.timedelta(days=365) + six_months_ago = now - datetime.timedelta(days=180) + one_month_ago = now - datetime.timedelta(days=30) + + def _do_migration(): + with patch.object( + migration, + "_migrate_statistics_columns_to_timestamp", + side_effect=IntegrityError("test", "test", "test"), + ): + migration._migrate_statistics_columns_to_timestamp_removing_duplicates( + hass, instance, instance.get_session, instance.engine + ) + + def _insert_fake_metadata(): + with session_scope(hass=hass) as session: + session.add( + old_db_schema.StatisticsMeta( + id=1000, + statistic_id="test", + source="test", + unit_of_measurement="test", + has_mean=True, + has_sum=True, + name="1", + ) + ) + + def _insert_pre_timestamp_stat(date_time: datetime) -> None: + with session_scope(hass=hass) as session: + session.add_all( + ( + old_db_schema.StatisticsShortTerm( + metadata_id=1000, + created=date_time, + created_ts=None, + start=date_time, + start_ts=None, + last_reset=date_time, + last_reset_ts=None, + state="1", + ), + old_db_schema.Statistics( + metadata_id=1000, + created=date_time, + created_ts=None, + start=date_time, + start_ts=None, + last_reset=date_time, + last_reset_ts=None, + state="1", + ), + ) + ) + + def _insert_post_timestamp_stat(date_time: datetime) -> None: + with session_scope(hass=hass) as session: + session.add_all( + ( + db_schema.StatisticsShortTerm( + metadata_id=1000, + created=None, + created_ts=date_time.timestamp(), + start=None, + start_ts=date_time.timestamp(), + last_reset=None, + last_reset_ts=date_time.timestamp(), + state="1", + ), + db_schema.Statistics( + metadata_id=1000, + created=None, + created_ts=date_time.timestamp(), + start=None, + start_ts=date_time.timestamp(), + last_reset=None, + last_reset_ts=date_time.timestamp(), + state="1", + ), + ) + ) + + def _get_all_stats(table: old_db_schema.StatisticsBase) -> list[dict[str, Any]]: + """Get all stats from a table.""" + with session_scope(hass=hass) as session: + results = [] + for result in session.query(table).where(table.metadata_id == 1000).all(): + results.append( + { + field.name: getattr(result, field.name) + for field in table.__table__.c + } + ) + return sorted(results, key=lambda row: row["start_ts"]) + + def _insert_and_do_migration(): + _insert_fake_metadata() + _insert_pre_timestamp_stat(one_year_ago) + _insert_post_timestamp_stat(six_months_ago) + _insert_pre_timestamp_stat(one_month_ago) + _do_migration() + + await hass.async_add_executor_job(_insert_and_do_migration) + final_short_term_result = await 
hass.async_add_executor_job( + _get_all_stats, old_db_schema.StatisticsShortTerm + ) + final_short_term_result = sorted( + final_short_term_result, key=lambda row: row["start_ts"] + ) + + assert final_short_term_result == [ + { + "created": None, + "created_ts": one_year_ago.timestamp(), + "id": 1, + "last_reset": None, + "last_reset_ts": one_year_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": one_year_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + { + "created": None, + "created_ts": six_months_ago.timestamp(), + "id": 2, + "last_reset": None, + "last_reset_ts": six_months_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": six_months_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + { + "created": None, + "created_ts": one_month_ago.timestamp(), + "id": 3, + "last_reset": None, + "last_reset_ts": one_month_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": one_month_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + ] + + final_result = await hass.async_add_executor_job( + _get_all_stats, old_db_schema.Statistics + ) + final_result = sorted(final_result, key=lambda row: row["start_ts"]) + + assert final_result == [ + { + "created": None, + "created_ts": one_year_ago.timestamp(), + "id": 1, + "last_reset": None, + "last_reset_ts": one_year_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": one_year_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + { + "created": None, + "created_ts": six_months_ago.timestamp(), + "id": 2, + "last_reset": None, + "last_reset_ts": six_months_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": six_months_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + { + "created": None, + "created_ts": one_month_ago.timestamp(), + "id": 3, + "last_reset": None, + "last_reset_ts": one_month_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": one_month_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + ] + + +async def test_stats_timestamp_with_one_by_one_removes_duplicates( + async_setup_recorder_instance: RecorderInstanceGenerator, + hass: HomeAssistant, +) -> None: + """Test stats migration with one by one removes duplicates.""" + instance = await async_setup_recorder_instance(hass) + await async_wait_recording_done(hass) + await async_attach_db_engine(hass) + importlib.import_module(SCHEMA_MODULE) + old_db_schema = sys.modules[SCHEMA_MODULE] + now = dt_util.utcnow() + one_year_ago = now - datetime.timedelta(days=365) + six_months_ago = now - datetime.timedelta(days=180) + one_month_ago = now - datetime.timedelta(days=30) + + def _do_migration(): + with patch.object( + migration, + "_migrate_statistics_columns_to_timestamp", + side_effect=IntegrityError("test", "test", "test"), + ), patch.object( + migration, + "migrate_single_statistics_row_to_timestamp", + side_effect=IntegrityError("test", "test", "test"), + ): + migration._migrate_statistics_columns_to_timestamp_removing_duplicates( + hass, instance, instance.get_session, instance.engine + ) + + def _insert_fake_metadata(): + with session_scope(hass=hass) as session: + session.add( + old_db_schema.StatisticsMeta( + id=1000, + statistic_id="test", + source="test", + unit_of_measurement="test", + 
has_mean=True, + has_sum=True, + name="1", + ) + ) + + def _insert_pre_timestamp_stat(date_time: datetime) -> None: + with session_scope(hass=hass) as session: + session.add_all( + ( + old_db_schema.StatisticsShortTerm( + metadata_id=1000, + created=date_time, + created_ts=None, + start=date_time, + start_ts=None, + last_reset=date_time, + last_reset_ts=None, + state="1", + ), + old_db_schema.Statistics( + metadata_id=1000, + created=date_time, + created_ts=None, + start=date_time, + start_ts=None, + last_reset=date_time, + last_reset_ts=None, + state="1", + ), + ) + ) + + def _insert_post_timestamp_stat(date_time: datetime) -> None: + with session_scope(hass=hass) as session: + session.add_all( + ( + db_schema.StatisticsShortTerm( + metadata_id=1000, + created=None, + created_ts=date_time.timestamp(), + start=None, + start_ts=date_time.timestamp(), + last_reset=None, + last_reset_ts=date_time.timestamp(), + state="1", + ), + db_schema.Statistics( + metadata_id=1000, + created=None, + created_ts=date_time.timestamp(), + start=None, + start_ts=date_time.timestamp(), + last_reset=None, + last_reset_ts=date_time.timestamp(), + state="1", + ), + ) + ) + + def _get_all_stats(table: old_db_schema.StatisticsBase) -> list[dict[str, Any]]: + """Get all stats from a table.""" + with session_scope(hass=hass) as session: + results = [] + for result in session.query(table).where(table.metadata_id == 1000).all(): + results.append( + { + field.name: getattr(result, field.name) + for field in table.__table__.c + } + ) + return sorted(results, key=lambda row: row["start_ts"]) + + def _insert_and_do_migration(): + _insert_fake_metadata() + _insert_pre_timestamp_stat(one_year_ago) + _insert_post_timestamp_stat(six_months_ago) + _insert_pre_timestamp_stat(one_month_ago) + _do_migration() + + await hass.async_add_executor_job(_insert_and_do_migration) + final_short_term_result = await hass.async_add_executor_job( + _get_all_stats, old_db_schema.StatisticsShortTerm + ) + final_short_term_result = sorted( + final_short_term_result, key=lambda row: row["start_ts"] + ) + + assert final_short_term_result == [ + { + "created": None, + "created_ts": one_year_ago.timestamp(), + "id": 1, + "last_reset": None, + "last_reset_ts": one_year_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": one_year_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + { + "created": None, + "created_ts": six_months_ago.timestamp(), + "id": 2, + "last_reset": None, + "last_reset_ts": six_months_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": six_months_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + { + "created": None, + "created_ts": one_month_ago.timestamp(), + "id": 3, + "last_reset": None, + "last_reset_ts": one_month_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": one_month_ago.timestamp(), + "state": 1.0, + "sum": None, + }, + ] + + # All the duplicates should have been removed but + # the non-duplicates should still be there + final_result = await hass.async_add_executor_job( + _get_all_stats, old_db_schema.Statistics + ) + assert final_result == [ + { + "created": None, + "created_ts": six_months_ago.timestamp(), + "id": 2, + "last_reset": None, + "last_reset_ts": six_months_ago.timestamp(), + "max": None, + "mean": None, + "metadata_id": 1000, + "min": None, + "start": None, + "start_ts": six_months_ago.timestamp(), + 
"state": 1.0, + "sum": None, + }, + ] diff --git a/tests/components/recorder/test_purge_v32_schema.py b/tests/components/recorder/test_purge_v32_schema.py index b3c20ad4e26..f386fd19e36 100644 --- a/tests/components/recorder/test_purge_v32_schema.py +++ b/tests/components/recorder/test_purge_v32_schema.py @@ -3,7 +3,7 @@ from datetime import datetime, timedelta import json import sqlite3 -from unittest.mock import MagicMock, patch +from unittest.mock import patch from freezegun import freeze_time import pytest @@ -27,6 +27,7 @@ from homeassistant.core import HomeAssistant from homeassistant.util import dt as dt_util from .common import ( + async_attach_db_engine, async_recorder_block_till_done, async_wait_purge_done, async_wait_recording_done, @@ -64,25 +65,12 @@ def mock_use_sqlite(request): yield -async def _async_attach_db_engine(hass: HomeAssistant) -> None: - """Attach a database engine to the recorder.""" - instance = recorder.get_instance(hass) - - def _mock_setup_recorder_connection(): - with instance.engine.connect() as connection: - instance._setup_recorder_connection( - connection._dbapi_connection, MagicMock() - ) - - await instance.async_add_executor_job(_mock_setup_recorder_connection) - - async def test_purge_old_states( async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant ) -> None: """Test deleting old states.""" instance = await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await _add_test_states(hass) @@ -178,7 +166,7 @@ async def test_purge_old_states_encouters_database_corruption( return await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await _add_test_states(hass) await async_wait_recording_done(hass) @@ -211,7 +199,7 @@ async def test_purge_old_states_encounters_temporary_mysql_error( ) -> None: """Test retry on specific mysql operational errors.""" instance = await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await _add_test_states(hass) await async_wait_recording_done(hass) @@ -243,7 +231,7 @@ async def test_purge_old_states_encounters_operational_error( ) -> None: """Test error on operational errors that are not mysql does not retry.""" await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await _add_test_states(hass) await async_wait_recording_done(hass) @@ -268,7 +256,7 @@ async def test_purge_old_events( ) -> None: """Test deleting old events.""" instance = await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await _add_test_events(hass) @@ -306,7 +294,7 @@ async def test_purge_old_recorder_runs( ) -> None: """Test deleting old recorder runs keeps current run.""" instance = await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await _add_test_recorder_runs(hass) @@ -343,7 +331,7 @@ async def test_purge_old_statistics_runs( ) -> None: """Test deleting old statistics runs keeps the latest run.""" instance = await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await _add_test_statistics_runs(hass) @@ -384,7 +372,7 @@ async def test_purge_method( assert run1.start == run2.start await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) 
service_data = {"keep_days": 4} await _add_test_events(hass) @@ -522,7 +510,7 @@ async def test_purge_edge_case( ) await async_setup_recorder_instance(hass, None) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await async_wait_purge_done(hass) @@ -621,7 +609,7 @@ async def test_purge_cutoff_date( ) instance = await async_setup_recorder_instance(hass, None) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await async_wait_purge_done(hass) @@ -948,7 +936,7 @@ async def test_purge_many_old_events( ) -> None: """Test deleting old events.""" instance = await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) old_events_count = 5 with patch.object(instance, "max_bind_vars", old_events_count), patch.object( @@ -1001,7 +989,7 @@ async def test_purge_can_mix_legacy_and_new_format( ) -> None: """Test purging with legacy and new events.""" instance = await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await async_wait_recording_done(hass) # New databases are no longer created with the legacy events index @@ -1114,7 +1102,7 @@ async def test_purge_can_mix_legacy_and_new_format_with_detached_state( return pytest.skip("This tests disables foreign key checks on SQLite") instance = await async_setup_recorder_instance(hass) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await async_wait_recording_done(hass) # New databases are no longer created with the legacy events index @@ -1254,7 +1242,7 @@ async def test_purge_entities_keep_days( ) -> None: """Test purging states with an entity filter and keep_days.""" instance = await async_setup_recorder_instance(hass, {}) - await _async_attach_db_engine(hass) + await async_attach_db_engine(hass) await hass.async_block_till_done() await async_wait_recording_done(hass)
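
For reviewers who want the fallback path in isolation: the one-by-one migration added above boils down to the loop sketched below. This is a minimal, standalone sketch against a toy SQLAlchemy table, not the recorder's schema or helpers; `Stat`, `BATCH_SIZE`, and the bare `.timestamp()` conversion are illustrative assumptions, while the control flow (batched select of unmigrated rows, per-row UPDATE, delete on `IntegrityError`) mirrors `_migrate_statistics_columns_to_timestamp_one_by_one`. The per-row SAVEPOINT is only there to keep the sketch self-contained; the real migration relies on the recorder's own session handling.

```python
from datetime import datetime, timezone

from sqlalchemy import (
    Column,
    DateTime,
    Float,
    Integer,
    String,
    UniqueConstraint,
    create_engine,
    delete,
    select,
    update,
)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import DeclarativeBase, Session


class Base(DeclarativeBase):
    """Declarative base for the toy schema."""


class Stat(Base):
    """Toy stand-in for the statistics tables; not the recorder schema."""

    __tablename__ = "stat"
    __table_args__ = (UniqueConstraint("statistic_id", "start_ts"),)

    id = Column(Integer, primary_key=True)
    statistic_id = Column(String, index=True)
    start = Column(DateTime)  # legacy datetime column
    start_ts = Column(Float)  # replacement epoch-timestamp column


BATCH_SIZE = 1000  # stands in for instance.max_bind_vars


def migrate_one_by_one(session: Session) -> None:
    """Convert unmigrated rows in batches, dropping rows that turn out to be duplicates."""
    while rows := session.execute(
        select(Stat.id, Stat.start)
        .filter(Stat.start_ts.is_(None))
        .filter(Stat.start.isnot(None))
        .limit(BATCH_SIZE)
    ).all():
        for row_id, start in rows:
            # Assume legacy datetimes are stored as naive UTC.
            start_ts = start.replace(tzinfo=timezone.utc).timestamp()
            try:
                # SAVEPOINT so one failing row does not poison the whole batch.
                with session.begin_nested():
                    session.execute(
                        update(Stat)
                        .where(Stat.id == row_id)
                        .values(start_ts=start_ts, start=None)
                    )
            except IntegrityError:
                # The (statistic_id, start_ts) uniqueness was violated: this row
                # duplicates one that was already migrated, so remove it instead.
                session.execute(delete(Stat).where(Stat.id == row_id))
        session.commit()


if __name__ == "__main__":
    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        session.add(Stat(statistic_id="sensor.demo", start=datetime(2023, 1, 1)))
        session.commit()
        migrate_one_by_one(session)
```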