Recover from previously failed statistics migrations (#101781)

* Handle statistics columns being unmigrated from previous downgrades

If the user downgraded HA from 2023.3.x to an older version without
restoring the database, and then upgrades again with the same
database, they will have unmigrated statistics columns since we only
migrate them once.

As it's expensive to check, we do not want to do it at every
startup, so we will only do this one more time, since the risk that
someone will downgrade to an older version is very low at this point
(a condensed sketch of how this is wired up follows the list below).

* add guard to sqlite to prevent re-migrate

* test

* move test to insert with old schema

* use helper

* normalize timestamps

* remove

* add check

* add fallback migration

* add fallback migration

* commit

* remove useless logging

* remove useless logging

* do the other columns at the same time

* coverage

* dry

* comment

* Update tests/components/recorder/test_migration_from_schema_32.py
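
A condensed, illustrative sketch of how the one-time re-check is wired up
(names other than SCHEMA_VERSION are stand-ins, not Home Assistant's real
helpers; the actual branch and helper are in the recorder migration.py
hunks below):

SCHEMA_VERSION = 42  # bumped from 41 by this commit


def _cleanup_unmigrated_statistics() -> None:
    """Stand-in for _migrate_statistics_columns_to_timestamp_removing_duplicates."""


def _apply_update_sketch(new_version: int) -> None:
    # Each schema step runs only while the stored schema version is below it,
    # so tying the cleanup to version 42 makes it run exactly once per database.
    if new_version == 42:
        # One last, potentially expensive pass over any statistics rows that a
        # previous downgrade or failed migration left unmigrated.
        _cleanup_unmigrated_statistics()
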
J. Nick Koston 2023-10-22 17:34:43 -10:00 committed by GitHub
parent 4ee9a6f130
commit 268425b5e3
6 changed files with 819 additions and 53 deletions


@@ -68,7 +68,7 @@ class Base(DeclarativeBase):
    """Base class for tables."""


-SCHEMA_VERSION = 41
+SCHEMA_VERSION = 42

_LOGGER = logging.getLogger(__name__)


@@ -68,13 +68,20 @@ from .db_schema import (
    StatisticsShortTerm,
)
from .models import process_timestamp
+from .models.time import datetime_to_timestamp_or_none
from .queries import (
    batch_cleanup_entity_ids,
+    delete_duplicate_short_term_statistics_row,
+    delete_duplicate_statistics_row,
    find_entity_ids_to_migrate,
    find_event_type_to_migrate,
    find_events_context_ids_to_migrate,
    find_states_context_ids_to_migrate,
+    find_unmigrated_short_term_statistics_rows,
+    find_unmigrated_statistics_rows,
    has_used_states_event_ids,
+    migrate_single_short_term_statistics_row_to_timestamp,
+    migrate_single_statistics_row_to_timestamp,
)
from .statistics import get_start_time
from .tasks import (
@@ -950,26 +957,9 @@ def _apply_update(  # noqa: C901
            "statistics_short_term",
            "ix_statistics_short_term_statistic_id_start_ts",
        )
-        try:
-            _migrate_statistics_columns_to_timestamp(instance, session_maker, engine)
-        except IntegrityError as ex:
-            _LOGGER.error(
-                "Statistics table contains duplicate entries: %s; "
-                "Cleaning up duplicates and trying again; "
-                "This will take a while; "
-                "Please be patient!",
-                ex,
-            )
-            # There may be duplicated statistics entries, delete duplicates
-            # and try again
-            with session_scope(session=session_maker()) as session:
-                delete_statistics_duplicates(instance, hass, session)
-            _migrate_statistics_columns_to_timestamp(instance, session_maker, engine)
-            # Log at error level to ensure the user sees this message in the log
-            # since we logged the error above.
-            _LOGGER.error(
-                "Statistics migration successfully recovered after statistics table duplicate cleanup"
-            )
+        _migrate_statistics_columns_to_timestamp_removing_duplicates(
+            hass, instance, session_maker, engine
+        )
    elif new_version == 35:
        # Migration is done in two steps to ensure we can start using
        # the new columns before we wipe the old ones.

@@ -1060,10 +1050,55 @@ def _apply_update(  # noqa: C901
    elif new_version == 41:
        _create_index(session_maker, "event_types", "ix_event_types_event_type")
        _create_index(session_maker, "states_meta", "ix_states_meta_entity_id")
+    elif new_version == 42:
+        # If the user had a previously failed migration, or they
+        # downgraded from 2023.3.x to an older version we will have
+        # unmigrated statistics columns so we want to clean this up
+        # one last time since compiling the statistics will be slow
+        # or fail if we have unmigrated statistics.
+        _migrate_statistics_columns_to_timestamp_removing_duplicates(
+            hass, instance, session_maker, engine
+        )
    else:
        raise ValueError(f"No schema migration defined for version {new_version}")
def _migrate_statistics_columns_to_timestamp_removing_duplicates(
hass: HomeAssistant,
instance: Recorder,
session_maker: Callable[[], Session],
engine: Engine,
) -> None:
"""Migrate statistics columns to timestamp or cleanup duplicates."""
try:
_migrate_statistics_columns_to_timestamp(instance, session_maker, engine)
except IntegrityError as ex:
_LOGGER.error(
"Statistics table contains duplicate entries: %s; "
"Cleaning up duplicates and trying again; "
"This will take a while; "
"Please be patient!",
ex,
)
# There may be duplicated statistics entries, delete duplicates
# and try again
with session_scope(session=session_maker()) as session:
delete_statistics_duplicates(instance, hass, session)
try:
_migrate_statistics_columns_to_timestamp(instance, session_maker, engine)
except IntegrityError:
_LOGGER.warning(
"Statistics table still contains duplicate entries after cleanup; "
"Falling back to a one by one migration"
)
_migrate_statistics_columns_to_timestamp_one_by_one(instance, session_maker)
# Log at error level to ensure the user sees this message in the log
# since we logged the error above.
_LOGGER.error(
"Statistics migration successfully recovered after statistics table duplicate cleanup"
)
def _correct_table_character_set_and_collation(
    table: str,
    session_maker: Callable[[], Session],

@@ -1269,6 +1304,59 @@ def _migrate_columns_to_timestamp(
    )
@database_job_retry_wrapper("Migrate statistics columns to timestamp one by one", 3)
def _migrate_statistics_columns_to_timestamp_one_by_one(
instance: Recorder, session_maker: Callable[[], Session]
) -> None:
"""Migrate statistics columns to use timestamp on by one.
If something manually inserted data into the statistics table
in the past it may have inserted duplicate rows.
Before we had the unique index on (statistic_id, start) the
data could have been inserted without any errors and we
could end up with duplicate rows that go undetected (even by
our current duplicate cleanup code) until we try to migrate the
data to use timestamps.
This will migrate the data one by one to ensure we do not hit any
duplicate rows, and remove the duplicate rows as they are found.
"""
for find_func, migrate_func, delete_func in (
(
find_unmigrated_statistics_rows,
migrate_single_statistics_row_to_timestamp,
delete_duplicate_statistics_row,
),
(
find_unmigrated_short_term_statistics_rows,
migrate_single_short_term_statistics_row_to_timestamp,
delete_duplicate_short_term_statistics_row,
),
):
with session_scope(session=session_maker()) as session:
while stats := session.execute(find_func(instance.max_bind_vars)).all():
for statistic_id, start, created, last_reset in stats:
start_ts = datetime_to_timestamp_or_none(process_timestamp(start))
created_ts = datetime_to_timestamp_or_none(
process_timestamp(created)
)
last_reset_ts = datetime_to_timestamp_or_none(
process_timestamp(last_reset)
)
try:
session.execute(
migrate_func(
statistic_id, start_ts, created_ts, last_reset_ts
)
)
except IntegrityError:
# This can happen if we have duplicate rows
# in the statistics table.
session.execute(delete_func(statistic_id))
session.commit()
@database_job_retry_wrapper("Migrate statistics columns to timestamp", 3)
def _migrate_statistics_columns_to_timestamp(
    instance: Recorder, session_maker: Callable[[], Session], engine: Engine

@@ -1292,7 +1380,7 @@ def _migrate_statistics_columns_to_timestamp(
                f"created_ts=strftime('%s',created) + "
                "cast(substr(created,-7) AS FLOAT), "
                f"last_reset_ts=strftime('%s',last_reset) + "
-               "cast(substr(last_reset,-7) AS FLOAT);"
+               "cast(substr(last_reset,-7) AS FLOAT) where start_ts is NULL;"
            )
        )
    elif engine.dialect.name == SupportedDialect.MYSQL:
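
The "add guard to sqlite to prevent re-migrate" bullet in the commit message is
the trailing "where start_ts is NULL" added above: it makes the bulk SQLite
conversion idempotent, so re-running it in the version 42 step only touches
rows that were never migrated. A minimal sketch of the pattern (illustrative
only; the hunk above truncates the real statement, so the table and column
choice here are assumed):

from sqlalchemy import text

# Convert the ISO-8601 text in `start` to a float epoch: strftime('%s') yields
# whole seconds and the last 7 characters of the string hold the fractional
# part. The WHERE clause skips rows that already have start_ts populated.
convert_start_sqlite = text(
    "UPDATE statistics_short_term SET "
    "start_ts=strftime('%s',start) + cast(substr(start,-7) AS FLOAT) "
    "WHERE start_ts IS NULL;"
)
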


@@ -16,6 +16,7 @@ from .db_schema import (
    StateAttributes,
    States,
    StatesMeta,
+   Statistics,
    StatisticsRuns,
    StatisticsShortTerm,
)

@@ -860,3 +861,96 @@ def delete_states_meta_rows(metadata_ids: Iterable[int]) -> StatementLambdaElement:
        .where(StatesMeta.metadata_id.in_(metadata_ids))
        .execution_options(synchronize_session=False)
    )
def find_unmigrated_short_term_statistics_rows(
max_bind_vars: int,
) -> StatementLambdaElement:
"""Find unmigrated short term statistics rows."""
return lambda_stmt(
lambda: select(
StatisticsShortTerm.id,
StatisticsShortTerm.start,
StatisticsShortTerm.created,
StatisticsShortTerm.last_reset,
)
.filter(StatisticsShortTerm.start_ts.is_(None))
.filter(StatisticsShortTerm.start.isnot(None))
.limit(max_bind_vars)
)
def find_unmigrated_statistics_rows(max_bind_vars: int) -> StatementLambdaElement:
"""Find unmigrated statistics rows."""
return lambda_stmt(
lambda: select(
Statistics.id, Statistics.start, Statistics.created, Statistics.last_reset
)
.filter(Statistics.start_ts.is_(None))
.filter(Statistics.start.isnot(None))
.limit(max_bind_vars)
)
def migrate_single_short_term_statistics_row_to_timestamp(
statistic_id: int,
start_ts: float | None,
created_ts: float | None,
last_reset_ts: float | None,
) -> StatementLambdaElement:
"""Migrate a single short term statistics row to timestamp."""
return lambda_stmt(
lambda: update(StatisticsShortTerm)
.where(StatisticsShortTerm.id == statistic_id)
.values(
start_ts=start_ts,
start=None,
created_ts=created_ts,
created=None,
last_reset_ts=last_reset_ts,
last_reset=None,
)
.execution_options(synchronize_session=False)
)
def migrate_single_statistics_row_to_timestamp(
statistic_id: int,
start_ts: float | None,
created_ts: float | None,
last_reset_ts: float | None,
) -> StatementLambdaElement:
"""Migrate a single statistics row to timestamp."""
return lambda_stmt(
lambda: update(Statistics)
.where(Statistics.id == statistic_id)
.values(
start_ts=start_ts,
start=None,
created_ts=created_ts,
created=None,
last_reset_ts=last_reset_ts,
last_reset=None,
)
.execution_options(synchronize_session=False)
)
def delete_duplicate_short_term_statistics_row(
statistic_id: int,
) -> StatementLambdaElement:
"""Delete a single duplicate short term statistics row."""
return lambda_stmt(
lambda: delete(StatisticsShortTerm)
.where(StatisticsShortTerm.id == statistic_id)
.execution_options(synchronize_session=False)
)
def delete_duplicate_statistics_row(statistic_id: int) -> StatementLambdaElement:
"""Delete a single duplicate statistics row."""
return lambda_stmt(
lambda: delete(Statistics)
.where(Statistics.id == statistic_id)
.execution_options(synchronize_session=False)
)
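
A brief usage sketch of the helpers above (simplified and assumed, not part of
the diff; the real consumer is _migrate_statistics_columns_to_timestamp_one_by_one
in the migration.py hunk earlier, which also normalizes the datetimes with
process_timestamp/datetime_to_timestamp_or_none and commits once per batch):

from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from homeassistant.components.recorder.queries import (
    delete_duplicate_statistics_row,
    find_unmigrated_statistics_rows,
    migrate_single_statistics_row_to_timestamp,
)


def migrate_one_batch(session: Session, max_bind_vars: int) -> bool:
    """Migrate one batch of long-term statistics rows; return True if rows were found."""
    rows = session.execute(find_unmigrated_statistics_rows(max_bind_vars)).all()
    for statistic_id, start, created, last_reset in rows:
        try:
            session.execute(
                migrate_single_statistics_row_to_timestamp(
                    statistic_id,
                    # Assumes timezone-aware datetimes; the real migration first
                    # normalizes naive UTC values with process_timestamp.
                    start.timestamp() if start else None,
                    created.timestamp() if created else None,
                    last_reset.timestamp() if last_reset else None,
                )
            )
        except IntegrityError:
            # Another row already occupies this (metadata_id, start_ts) slot,
            # so treat this row as a duplicate and delete it instead.
            session.execute(delete_duplicate_statistics_row(statistic_id))
    return bool(rows)
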


@@ -11,7 +11,7 @@ import importlib
import sys
import time
from typing import Any, Literal, cast
-from unittest.mock import patch, sentinel
+from unittest.mock import MagicMock, patch, sentinel

from freezegun import freeze_time
from sqlalchemy import create_engine

@@ -430,3 +430,16 @@ def old_db_schema(schema_version_postfix: str) -> Iterator[None]:
        ),
    ):
        yield
async def async_attach_db_engine(hass: HomeAssistant) -> None:
"""Attach a database engine to the recorder."""
instance = recorder.get_instance(hass)
def _mock_setup_recorder_connection():
with instance.engine.connect() as connection:
instance._setup_recorder_connection(
connection._dbapi_connection, MagicMock()
)
await instance.async_add_executor_job(_mock_setup_recorder_connection)


@@ -1,22 +1,26 @@
"""The tests for the recorder filter matching the EntityFilter component."""
+import datetime
import importlib
import sys
+from typing import Any
from unittest.mock import patch
import uuid

from freezegun import freeze_time
import pytest
from sqlalchemy import create_engine, inspect
+from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from homeassistant.components import recorder
-from homeassistant.components.recorder import core, migration, statistics
+from homeassistant.components.recorder import core, db_schema, migration, statistics
from homeassistant.components.recorder.db_schema import (
    Events,
    EventTypes,
    States,
    StatesMeta,
)
+from homeassistant.components.recorder.models import process_timestamp
from homeassistant.components.recorder.queries import select_event_type_ids
from homeassistant.components.recorder.tasks import (
    EntityIDMigrationTask,

@@ -30,7 +34,11 @@ from homeassistant.core import HomeAssistant
import homeassistant.util.dt as dt_util
from homeassistant.util.ulid import bytes_to_ulid, ulid_at_time, ulid_to_bytes

-from .common import async_recorder_block_till_done, async_wait_recording_done
+from .common import (
+    async_attach_db_engine,
+    async_recorder_block_till_done,
+    async_wait_recording_done,
+)

from tests.typing import RecorderInstanceGenerator

@@ -844,3 +852,578 @@ async def test_migrate_null_event_type_ids(
    events_by_type = await instance.async_add_executor_job(_fetch_migrated_events)
    assert len(events_by_type["event_type_one"]) == 2
    assert len(events_by_type[migration._EMPTY_EVENT_TYPE]) == 1000
async def test_stats_timestamp_conversion_is_reentrant(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
) -> None:
"""Test stats migration is reentrant."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
await async_attach_db_engine(hass)
importlib.import_module(SCHEMA_MODULE)
old_db_schema = sys.modules[SCHEMA_MODULE]
now = dt_util.utcnow()
one_year_ago = now - datetime.timedelta(days=365)
six_months_ago = now - datetime.timedelta(days=180)
one_month_ago = now - datetime.timedelta(days=30)
def _do_migration():
migration._migrate_statistics_columns_to_timestamp_removing_duplicates(
hass, instance, instance.get_session, instance.engine
)
def _insert_fake_metadata():
with session_scope(hass=hass) as session:
session.add(
old_db_schema.StatisticsMeta(
id=1000,
statistic_id="test",
source="test",
unit_of_measurement="test",
has_mean=True,
has_sum=True,
name="1",
)
)
def _insert_pre_timestamp_stat(date_time: datetime) -> None:
with session_scope(hass=hass) as session:
session.add(
old_db_schema.StatisticsShortTerm(
metadata_id=1000,
created=date_time,
created_ts=None,
start=date_time,
start_ts=None,
last_reset=date_time,
last_reset_ts=None,
state="1",
)
)
def _insert_post_timestamp_stat(date_time: datetime) -> None:
with session_scope(hass=hass) as session:
session.add(
db_schema.StatisticsShortTerm(
metadata_id=1000,
created=None,
created_ts=date_time.timestamp(),
start=None,
start_ts=date_time.timestamp(),
last_reset=None,
last_reset_ts=date_time.timestamp(),
state="1",
)
)
def _get_all_short_term_stats() -> list[dict[str, Any]]:
with session_scope(hass=hass) as session:
results = []
for result in (
session.query(old_db_schema.StatisticsShortTerm)
.where(old_db_schema.StatisticsShortTerm.metadata_id == 1000)
.all()
):
results.append(
{
field.name: getattr(result, field.name)
for field in old_db_schema.StatisticsShortTerm.__table__.c
}
)
return sorted(results, key=lambda row: row["start_ts"])
# Do not optimize this block, it's intentionally written to interleave
# with the migration
await hass.async_add_executor_job(_insert_fake_metadata)
await async_wait_recording_done(hass)
await hass.async_add_executor_job(_insert_pre_timestamp_stat, one_year_ago)
await async_wait_recording_done(hass)
await hass.async_add_executor_job(_do_migration)
await hass.async_add_executor_job(_insert_post_timestamp_stat, six_months_ago)
await async_wait_recording_done(hass)
await hass.async_add_executor_job(_do_migration)
await hass.async_add_executor_job(_insert_pre_timestamp_stat, one_month_ago)
await async_wait_recording_done(hass)
await hass.async_add_executor_job(_do_migration)
final_result = await hass.async_add_executor_job(_get_all_short_term_stats)
# Normalize timestamps since each engine returns them differently
for row in final_result:
if row["created"] is not None:
row["created"] = process_timestamp(row["created"]).replace(tzinfo=None)
if row["start"] is not None:
row["start"] = process_timestamp(row["start"]).replace(tzinfo=None)
if row["last_reset"] is not None:
row["last_reset"] = process_timestamp(row["last_reset"]).replace(
tzinfo=None
)
assert final_result == [
{
"created": process_timestamp(one_year_ago).replace(tzinfo=None),
"created_ts": one_year_ago.timestamp(),
"id": 1,
"last_reset": process_timestamp(one_year_ago).replace(tzinfo=None),
"last_reset_ts": one_year_ago.timestamp(),
"max": None,
"mean": None,
"metadata_id": 1000,
"min": None,
"start": process_timestamp(one_year_ago).replace(tzinfo=None),
"start_ts": one_year_ago.timestamp(),
"state": 1.0,
"sum": None,
},
{
"created": None,
"created_ts": six_months_ago.timestamp(),
"id": 2,
"last_reset": None,
"last_reset_ts": six_months_ago.timestamp(),
"max": None,
"mean": None,
"metadata_id": 1000,
"min": None,
"start": None,
"start_ts": six_months_ago.timestamp(),
"state": 1.0,
"sum": None,
},
{
"created": process_timestamp(one_month_ago).replace(tzinfo=None),
"created_ts": one_month_ago.timestamp(),
"id": 3,
"last_reset": process_timestamp(one_month_ago).replace(tzinfo=None),
"last_reset_ts": one_month_ago.timestamp(),
"max": None,
"mean": None,
"metadata_id": 1000,
"min": None,
"start": process_timestamp(one_month_ago).replace(tzinfo=None),
"start_ts": one_month_ago.timestamp(),
"state": 1.0,
"sum": None,
},
]
async def test_stats_timestamp_with_one_by_one(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
) -> None:
"""Test stats migration with one by one."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
await async_attach_db_engine(hass)
importlib.import_module(SCHEMA_MODULE)
old_db_schema = sys.modules[SCHEMA_MODULE]
now = dt_util.utcnow()
one_year_ago = now - datetime.timedelta(days=365)
six_months_ago = now - datetime.timedelta(days=180)
one_month_ago = now - datetime.timedelta(days=30)
def _do_migration():
with patch.object(
migration,
"_migrate_statistics_columns_to_timestamp",
side_effect=IntegrityError("test", "test", "test"),
):
migration._migrate_statistics_columns_to_timestamp_removing_duplicates(
hass, instance, instance.get_session, instance.engine
)
def _insert_fake_metadata():
with session_scope(hass=hass) as session:
session.add(
old_db_schema.StatisticsMeta(
id=1000,
statistic_id="test",
source="test",
unit_of_measurement="test",
has_mean=True,
has_sum=True,
name="1",
)
)
def _insert_pre_timestamp_stat(date_time: datetime) -> None:
with session_scope(hass=hass) as session:
session.add_all(
(
old_db_schema.StatisticsShortTerm(
metadata_id=1000,
created=date_time,
created_ts=None,
start=date_time,
start_ts=None,
last_reset=date_time,
last_reset_ts=None,
state="1",
),
old_db_schema.Statistics(
metadata_id=1000,
created=date_time,
created_ts=None,
start=date_time,
start_ts=None,
last_reset=date_time,
last_reset_ts=None,
state="1",
),
)
)
def _insert_post_timestamp_stat(date_time: datetime) -> None:
with session_scope(hass=hass) as session:
session.add_all(
(
db_schema.StatisticsShortTerm(
metadata_id=1000,
created=None,
created_ts=date_time.timestamp(),
start=None,
start_ts=date_time.timestamp(),
last_reset=None,
last_reset_ts=date_time.timestamp(),
state="1",
),
db_schema.Statistics(
metadata_id=1000,
created=None,
created_ts=date_time.timestamp(),
start=None,
start_ts=date_time.timestamp(),
last_reset=None,
last_reset_ts=date_time.timestamp(),
state="1",
),
)
)
def _get_all_stats(table: old_db_schema.StatisticsBase) -> list[dict[str, Any]]:
"""Get all stats from a table."""
with session_scope(hass=hass) as session:
results = []
for result in session.query(table).where(table.metadata_id == 1000).all():
results.append(
{
field.name: getattr(result, field.name)
for field in table.__table__.c
}
)
return sorted(results, key=lambda row: row["start_ts"])
def _insert_and_do_migration():
_insert_fake_metadata()
_insert_pre_timestamp_stat(one_year_ago)
_insert_post_timestamp_stat(six_months_ago)
_insert_pre_timestamp_stat(one_month_ago)
_do_migration()
await hass.async_add_executor_job(_insert_and_do_migration)
final_short_term_result = await hass.async_add_executor_job(
_get_all_stats, old_db_schema.StatisticsShortTerm
)
final_short_term_result = sorted(
final_short_term_result, key=lambda row: row["start_ts"]
)
assert final_short_term_result == [
{
"created": None,
"created_ts": one_year_ago.timestamp(),
"id": 1,
"last_reset": None,
"last_reset_ts": one_year_ago.timestamp(),
"max": None,
"mean": None,
"metadata_id": 1000,
"min": None,
"start": None,
"start_ts": one_year_ago.timestamp(),
"state": 1.0,
"sum": None,
},
{
"created": None,
"created_ts": six_months_ago.timestamp(),
"id": 2,
"last_reset": None,
"last_reset_ts": six_months_ago.timestamp(),
"max": None,
"mean": None,
"metadata_id": 1000,
"min": None,
"start": None,
"start_ts": six_months_ago.timestamp(),
"state": 1.0,
"sum": None,
},
{
"created": None,
"created_ts": one_month_ago.timestamp(),
"id": 3,
"last_reset": None,
"last_reset_ts": one_month_ago.timestamp(),
"max": None,
"mean": None,
"metadata_id": 1000,
"min": None,
"start": None,
"start_ts": one_month_ago.timestamp(),
"state": 1.0,
"sum": None,
},
]
final_result = await hass.async_add_executor_job(
_get_all_stats, old_db_schema.Statistics
)
final_result = sorted(final_result, key=lambda row: row["start_ts"])
assert final_result == [
{
"created": None,
"created_ts": one_year_ago.timestamp(),
"id": 1,
"last_reset": None,
"last_reset_ts": one_year_ago.timestamp(),
"max": None,
"mean": None,
"metadata_id": 1000,
"min": None,
"start": None,
"start_ts": one_year_ago.timestamp(),
"state": 1.0,
"sum": None,
},
{
"created": None,
"created_ts": six_months_ago.timestamp(),
"id": 2,
"last_reset": None,
"last_reset_ts": six_months_ago.timestamp(),
"max": None,
"mean": None,
"metadata_id": 1000,
"min": None,
"start": None,
"start_ts": six_months_ago.timestamp(),
"state": 1.0,
"sum": None,
},
{
"created": None,
"created_ts": one_month_ago.timestamp(),
"id": 3,
"last_reset": None,
"last_reset_ts": one_month_ago.timestamp(),
"max": None,
"mean": None,
"metadata_id": 1000,
"min": None,
"start": None,
"start_ts": one_month_ago.timestamp(),
"state": 1.0,
"sum": None,
},
]
async def test_stats_timestamp_with_one_by_one_removes_duplicates(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
) -> None:
"""Test stats migration with one by one removes duplicates."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
await async_attach_db_engine(hass)
importlib.import_module(SCHEMA_MODULE)
old_db_schema = sys.modules[SCHEMA_MODULE]
now = dt_util.utcnow()
one_year_ago = now - datetime.timedelta(days=365)
six_months_ago = now - datetime.timedelta(days=180)
one_month_ago = now - datetime.timedelta(days=30)
def _do_migration():
with patch.object(
migration,
"_migrate_statistics_columns_to_timestamp",
side_effect=IntegrityError("test", "test", "test"),
), patch.object(
migration,
"migrate_single_statistics_row_to_timestamp",
side_effect=IntegrityError("test", "test", "test"),
):
migration._migrate_statistics_columns_to_timestamp_removing_duplicates(
hass, instance, instance.get_session, instance.engine
)
def _insert_fake_metadata():
with session_scope(hass=hass) as session:
session.add(
old_db_schema.StatisticsMeta(
id=1000,
statistic_id="test",
source="test",
unit_of_measurement="test",
has_mean=True,
has_sum=True,
name="1",
)
)
def _insert_pre_timestamp_stat(date_time: datetime) -> None:
with session_scope(hass=hass) as session:
session.add_all(
(
old_db_schema.StatisticsShortTerm(
metadata_id=1000,
created=date_time,
created_ts=None,
start=date_time,
start_ts=None,
last_reset=date_time,
last_reset_ts=None,
state="1",
),
old_db_schema.Statistics(
metadata_id=1000,
created=date_time,
created_ts=None,
start=date_time,
start_ts=None,
last_reset=date_time,
last_reset_ts=None,
state="1",
),
)
)
def _insert_post_timestamp_stat(date_time: datetime) -> None:
with session_scope(hass=hass) as session:
session.add_all(
(
db_schema.StatisticsShortTerm(
metadata_id=1000,
created=None,
created_ts=date_time.timestamp(),
start=None,
start_ts=date_time.timestamp(),
last_reset=None,
last_reset_ts=date_time.timestamp(),
state="1",
),
db_schema.Statistics(
metadata_id=1000,
created=None,
created_ts=date_time.timestamp(),
start=None,
start_ts=date_time.timestamp(),
last_reset=None,
last_reset_ts=date_time.timestamp(),
state="1",
),
)
)
def _get_all_stats(table: old_db_schema.StatisticsBase) -> list[dict[str, Any]]:
"""Get all stats from a table."""
with session_scope(hass=hass) as session:
results = []
for result in session.query(table).where(table.metadata_id == 1000).all():
results.append(
{
field.name: getattr(result, field.name)
for field in table.__table__.c
}
)
return sorted(results, key=lambda row: row["start_ts"])
def _insert_and_do_migration():
_insert_fake_metadata()
_insert_pre_timestamp_stat(one_year_ago)
_insert_post_timestamp_stat(six_months_ago)
_insert_pre_timestamp_stat(one_month_ago)
_do_migration()
await hass.async_add_executor_job(_insert_and_do_migration)
final_short_term_result = await hass.async_add_executor_job(
_get_all_stats, old_db_schema.StatisticsShortTerm
)
final_short_term_result = sorted(
final_short_term_result, key=lambda row: row["start_ts"]
)
assert final_short_term_result == [
{
"created": None,
"created_ts": one_year_ago.timestamp(),
"id": 1,
"last_reset": None,
"last_reset_ts": one_year_ago.timestamp(),
"max": None,
"mean": None,
"metadata_id": 1000,
"min": None,
"start": None,
"start_ts": one_year_ago.timestamp(),
"state": 1.0,
"sum": None,
},
{
"created": None,
"created_ts": six_months_ago.timestamp(),
"id": 2,
"last_reset": None,
"last_reset_ts": six_months_ago.timestamp(),
"max": None,
"mean": None,
"metadata_id": 1000,
"min": None,
"start": None,
"start_ts": six_months_ago.timestamp(),
"state": 1.0,
"sum": None,
},
{
"created": None,
"created_ts": one_month_ago.timestamp(),
"id": 3,
"last_reset": None,
"last_reset_ts": one_month_ago.timestamp(),
"max": None,
"mean": None,
"metadata_id": 1000,
"min": None,
"start": None,
"start_ts": one_month_ago.timestamp(),
"state": 1.0,
"sum": None,
},
]
# All the duplicates should have been removed but
# the non-duplicates should still be there
final_result = await hass.async_add_executor_job(
_get_all_stats, old_db_schema.Statistics
)
assert final_result == [
{
"created": None,
"created_ts": six_months_ago.timestamp(),
"id": 2,
"last_reset": None,
"last_reset_ts": six_months_ago.timestamp(),
"max": None,
"mean": None,
"metadata_id": 1000,
"min": None,
"start": None,
"start_ts": six_months_ago.timestamp(),
"state": 1.0,
"sum": None,
},
]


@@ -3,7 +3,7 @@
from datetime import datetime, timedelta
import json
import sqlite3
-from unittest.mock import MagicMock, patch
+from unittest.mock import patch

from freezegun import freeze_time
import pytest

@@ -27,6 +27,7 @@ from homeassistant.core import HomeAssistant
from homeassistant.util import dt as dt_util

from .common import (
+    async_attach_db_engine,
    async_recorder_block_till_done,
    async_wait_purge_done,
    async_wait_recording_done,

@@ -64,25 +65,12 @@ def mock_use_sqlite(request):
        yield


-async def _async_attach_db_engine(hass: HomeAssistant) -> None:
-    """Attach a database engine to the recorder."""
-    instance = recorder.get_instance(hass)
-
-    def _mock_setup_recorder_connection():
-        with instance.engine.connect() as connection:
-            instance._setup_recorder_connection(
-                connection._dbapi_connection, MagicMock()
-            )
-
-    await instance.async_add_executor_job(_mock_setup_recorder_connection)
-
-
async def test_purge_old_states(
    async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
    """Test deleting old states."""
    instance = await async_setup_recorder_instance(hass)
-    await _async_attach_db_engine(hass)
+    await async_attach_db_engine(hass)

    await _add_test_states(hass)

@@ -178,7 +166,7 @@ async def test_purge_old_states_encouters_database_corruption(
        return

    await async_setup_recorder_instance(hass)
-    await _async_attach_db_engine(hass)
+    await async_attach_db_engine(hass)

    await _add_test_states(hass)
    await async_wait_recording_done(hass)

@@ -211,7 +199,7 @@ async def test_purge_old_states_encounters_temporary_mysql_error(
) -> None:
    """Test retry on specific mysql operational errors."""
    instance = await async_setup_recorder_instance(hass)
-    await _async_attach_db_engine(hass)
+    await async_attach_db_engine(hass)

    await _add_test_states(hass)
    await async_wait_recording_done(hass)

@@ -243,7 +231,7 @@ async def test_purge_old_states_encounters_operational_error(
) -> None:
    """Test error on operational errors that are not mysql does not retry."""
    await async_setup_recorder_instance(hass)
-    await _async_attach_db_engine(hass)
+    await async_attach_db_engine(hass)

    await _add_test_states(hass)
    await async_wait_recording_done(hass)

@@ -268,7 +256,7 @@ async def test_purge_old_events(
) -> None:
    """Test deleting old events."""
    instance = await async_setup_recorder_instance(hass)
-    await _async_attach_db_engine(hass)
+    await async_attach_db_engine(hass)

    await _add_test_events(hass)

@@ -306,7 +294,7 @@ async def test_purge_old_recorder_runs(
) -> None:
    """Test deleting old recorder runs keeps current run."""
    instance = await async_setup_recorder_instance(hass)
-    await _async_attach_db_engine(hass)
+    await async_attach_db_engine(hass)

    await _add_test_recorder_runs(hass)

@@ -343,7 +331,7 @@ async def test_purge_old_statistics_runs(
) -> None:
    """Test deleting old statistics runs keeps the latest run."""
    instance = await async_setup_recorder_instance(hass)
-    await _async_attach_db_engine(hass)
+    await async_attach_db_engine(hass)

    await _add_test_statistics_runs(hass)

@@ -384,7 +372,7 @@ async def test_purge_method(
    assert run1.start == run2.start

    await async_setup_recorder_instance(hass)
-    await _async_attach_db_engine(hass)
+    await async_attach_db_engine(hass)

    service_data = {"keep_days": 4}
    await _add_test_events(hass)

@@ -522,7 +510,7 @@ async def test_purge_edge_case(
    )

    await async_setup_recorder_instance(hass, None)
-    await _async_attach_db_engine(hass)
+    await async_attach_db_engine(hass)

    await async_wait_purge_done(hass)

@@ -621,7 +609,7 @@ async def test_purge_cutoff_date(
    )

    instance = await async_setup_recorder_instance(hass, None)
-    await _async_attach_db_engine(hass)
+    await async_attach_db_engine(hass)

    await async_wait_purge_done(hass)

@@ -948,7 +936,7 @@ async def test_purge_many_old_events(
) -> None:
    """Test deleting old events."""
    instance = await async_setup_recorder_instance(hass)
-    await _async_attach_db_engine(hass)
+    await async_attach_db_engine(hass)

    old_events_count = 5
    with patch.object(instance, "max_bind_vars", old_events_count), patch.object(

@@ -1001,7 +989,7 @@ async def test_purge_can_mix_legacy_and_new_format(
) -> None:
    """Test purging with legacy and new events."""
    instance = await async_setup_recorder_instance(hass)
-    await _async_attach_db_engine(hass)
+    await async_attach_db_engine(hass)

    await async_wait_recording_done(hass)
    # New databases are no longer created with the legacy events index

@@ -1114,7 +1102,7 @@ async def test_purge_can_mix_legacy_and_new_format_with_detached_state(
        return pytest.skip("This tests disables foreign key checks on SQLite")

    instance = await async_setup_recorder_instance(hass)
-    await _async_attach_db_engine(hass)
+    await async_attach_db_engine(hass)

    await async_wait_recording_done(hass)
    # New databases are no longer created with the legacy events index

@@ -1254,7 +1242,7 @@ async def test_purge_entities_keep_days(
) -> None:
    """Test purging states with an entity filter and keep_days."""
    instance = await async_setup_recorder_instance(hass, {})
-    await _async_attach_db_engine(hass)
+    await async_attach_db_engine(hass)

    await hass.async_block_till_done()
    await async_wait_recording_done(hass)