Guard against db corruption when renaming entities (#112718)

J. Nick Koston 2024-03-08 11:34:07 -10:00 committed by GitHub
parent d868b8d4c5
commit af6f2a516e
8 changed files with 364 additions and 60 deletions


@@ -925,7 +925,7 @@ class Recorder(threading.Thread):
# that is pending before running the task
if TYPE_CHECKING:
assert isinstance(task, RecorderTask)
if not task.commit_before:
if task.commit_before:
self._commit_event_session_or_retry()
return task.run(self)
except exc.DatabaseError as err:
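This one-line inversion is the core recorder fix: the old condition committed the pending session for every task except those that set `commit_before`, so a task that asked for a flush (such as the metadata renames below) could run against uncommitted rows. A minimal sketch of the corrected dispatch, using stand-in classes rather than the real `Recorder`/`RecorderTask`:

```python
# Sketch of the corrected commit_before dispatch; Task and FakeRecorder are
# illustrative stand-ins, not the classes from homeassistant.components.recorder.
from dataclasses import dataclass


@dataclass
class Task:
    """Stand-in for RecorderTask; commit_before requests a flush first."""

    commit_before: bool = True


class FakeRecorder:
    def __init__(self) -> None:
        self.pending_writes = 0  # stand-in for the event session

    def _commit_event_session_or_retry(self) -> None:
        self.pending_writes = 0  # pretend the session committed

    def _process_one_task(self, task: Task) -> None:
        # Fixed: flush pending writes when the task asks for it. The bug was
        # the inverted test, which flushed for every task except those ones.
        if task.commit_before:
            self._commit_event_session_or_retry()
```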


@@ -7,7 +7,7 @@ from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.start import async_at_start
from .core import Recorder
from .util import get_instance, session_scope
from .util import filter_unique_constraint_integrity_error, get_instance, session_scope
_LOGGER = logging.getLogger(__name__)
@@ -62,7 +62,10 @@ def update_states_metadata(
)
return
with session_scope(session=instance.get_session()) as session:
with session_scope(
session=instance.get_session(),
exception_filter=filter_unique_constraint_integrity_error(instance, "state"),
) as session:
if not states_meta_manager.update_metadata(session, entity_id, new_entity_id):
_LOGGER.warning(
"Cannot migrate history for entity_id `%s` to `%s` "


@@ -4,7 +4,6 @@ from __future__ import annotations
from collections import defaultdict
from collections.abc import Callable, Iterable, Sequence
import contextlib
import dataclasses
from datetime import datetime, timedelta
from functools import lru_cache, partial
@@ -16,7 +15,7 @@ from typing import TYPE_CHECKING, Any, Literal, TypedDict, cast
from sqlalchemy import Select, and_, bindparam, func, lambda_stmt, select, text
from sqlalchemy.engine.row import Row
from sqlalchemy.exc import SQLAlchemyError, StatementError
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.lambdas import StatementLambdaElement
import voluptuous as vol
@@ -73,6 +72,7 @@ from .models import (
from .util import (
execute,
execute_stmt_lambda_element,
filter_unique_constraint_integrity_error,
get_instance,
retryable_database_job,
session_scope,
@@ -455,7 +455,9 @@ def compile_missing_statistics(instance: Recorder) -> bool:
with session_scope(
session=instance.get_session(),
exception_filter=_filter_unique_constraint_integrity_error(instance),
exception_filter=filter_unique_constraint_integrity_error(
instance, "statistic"
),
) as session:
# Find the newest statistics run, if any
if last_run := session.query(func.max(StatisticsRuns.start)).scalar():
@@ -487,7 +489,9 @@ def compile_statistics(instance: Recorder, start: datetime, fire_events: bool) -
# Return if we already have 5-minute statistics for the requested period
with session_scope(
session=instance.get_session(),
exception_filter=_filter_unique_constraint_integrity_error(instance),
exception_filter=filter_unique_constraint_integrity_error(
instance, "statistic"
),
) as session:
modified_statistic_ids = _compile_statistics(
instance, session, start, fire_events
@@ -738,7 +742,9 @@ def update_statistics_metadata(
if new_statistic_id is not UNDEFINED and new_statistic_id is not None:
with session_scope(
session=instance.get_session(),
exception_filter=_filter_unique_constraint_integrity_error(instance),
exception_filter=filter_unique_constraint_integrity_error(
instance, "statistic"
),
) as session:
statistics_meta_manager.update_statistic_id(
session, DOMAIN, statistic_id, new_statistic_id
@@ -2247,54 +2253,6 @@ def async_add_external_statistics(
_async_import_statistics(hass, metadata, statistics)
def _filter_unique_constraint_integrity_error(
instance: Recorder,
) -> Callable[[Exception], bool]:
def _filter_unique_constraint_integrity_error(err: Exception) -> bool:
"""Handle unique constraint integrity errors."""
if not isinstance(err, StatementError):
return False
assert instance.engine is not None
dialect_name = instance.engine.dialect.name
ignore = False
if (
dialect_name == SupportedDialect.SQLITE
and "UNIQUE constraint failed" in str(err)
):
ignore = True
if (
dialect_name == SupportedDialect.POSTGRESQL
and err.orig
and hasattr(err.orig, "pgcode")
and err.orig.pgcode == "23505"
):
ignore = True
if (
dialect_name == SupportedDialect.MYSQL
and err.orig
and hasattr(err.orig, "args")
):
with contextlib.suppress(TypeError):
if err.orig.args[0] == 1062:
ignore = True
if ignore:
_LOGGER.warning(
(
"Blocked attempt to insert duplicated statistic rows, please report"
" at %s"
),
"https://github.com/home-assistant/core/issues?q=is%3Aopen+is%3Aissue+label%3A%22integration%3A+recorder%22",
exc_info=err,
)
return ignore
return _filter_unique_constraint_integrity_error
def _import_statistics_with_session(
instance: Recorder,
session: Session,
@@ -2398,7 +2356,9 @@ def import_statistics(
with session_scope(
session=instance.get_session(),
exception_filter=_filter_unique_constraint_integrity_error(instance),
exception_filter=filter_unique_constraint_integrity_error(
instance, "statistic"
),
) as session:
return _import_statistics_with_session(
instance, session, metadata, statistics, table
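All four statistics call sites change the same way: the module-private `_filter_unique_constraint_integrity_error(instance)` is replaced by the shared `filter_unique_constraint_integrity_error(instance, row_type)` from `.util` (added later in this commit), parametrized so the warning names either "statistic" or "state" rows. The resulting call shape, as a fragment:

```python
# Fragment mirroring the updated call sites above; `instance` is a Recorder.
with session_scope(
    session=instance.get_session(),
    exception_filter=filter_unique_constraint_integrity_error(instance, "statistic"),
) as session:
    ...  # writes that may collide with an existing unique row
```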


@@ -308,11 +308,18 @@ class StatisticsMetaManager:
recorder thread.
"""
self._assert_in_recorder_thread()
if self.get(session, new_statistic_id):
_LOGGER.error(
"Cannot rename statistic_id `%s` to `%s` because the new statistic_id is already in use",
old_statistic_id,
new_statistic_id,
)
return
session.query(StatisticsMeta).filter(
(StatisticsMeta.statistic_id == old_statistic_id)
& (StatisticsMeta.source == source)
).update({StatisticsMeta.statistic_id: new_statistic_id})
self._clear_cache([old_statistic_id, new_statistic_id])
self._clear_cache([old_statistic_id])
def delete(self, session: Session, statistic_ids: list[str]) -> None:
"""Clear statistics for a list of statistic_ids.


@@ -3,6 +3,7 @@
from __future__ import annotations
from collections.abc import Callable, Collection, Generator, Iterable, Sequence
import contextlib
from contextlib import contextmanager
from datetime import date, datetime, timedelta
import functools
@@ -22,7 +23,7 @@ import ciso8601
from sqlalchemy import inspect, text
from sqlalchemy.engine import Result, Row
from sqlalchemy.engine.interfaces import DBAPIConnection
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.exc import OperationalError, SQLAlchemyError, StatementError
from sqlalchemy.orm.query import Query
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.lambdas import StatementLambdaElement
@@ -907,3 +908,54 @@ def get_index_by_name(session: Session, table_name: str, index_name: str) -> str
),
None,
)
def filter_unique_constraint_integrity_error(
instance: Recorder, row_type: str
) -> Callable[[Exception], bool]:
"""Create a filter for unique constraint integrity errors."""
def _filter_unique_constraint_integrity_error(err: Exception) -> bool:
"""Handle unique constraint integrity errors."""
if not isinstance(err, StatementError):
return False
assert instance.engine is not None
dialect_name = instance.engine.dialect.name
ignore = False
if (
dialect_name == SupportedDialect.SQLITE
and "UNIQUE constraint failed" in str(err)
):
ignore = True
if (
dialect_name == SupportedDialect.POSTGRESQL
and err.orig
and hasattr(err.orig, "pgcode")
and err.orig.pgcode == "23505"
):
ignore = True
if (
dialect_name == SupportedDialect.MYSQL
and err.orig
and hasattr(err.orig, "args")
):
with contextlib.suppress(TypeError):
if err.orig.args[0] == 1062:
ignore = True
if ignore:
_LOGGER.warning(
(
"Blocked attempt to insert duplicated %s rows, please report"
" at %s"
),
row_type,
"https://github.com/home-assistant/core/issues?q=is%3Aopen+is%3Aissue+label%3A%22integration%3A+recorder%22",
exc_info=err,
)
return ignore
return _filter_unique_constraint_integrity_error
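The relocated helper matches each engine's duplicate-key signature: SQLite embeds "UNIQUE constraint failed" in the error message, PostgreSQL reports SQLSTATE 23505 (unique_violation) via `pgcode`, and MySQL/MariaDB raise errno 1062 (ER_DUP_ENTRY). A Recorder-free restatement of the same dispatch, as a hedged sketch:

```python
# Hedged restatement of the dialect dispatch above, decoupled from Recorder;
# 23505 and 1062 are the engines' documented duplicate-key codes.
from sqlalchemy.exc import StatementError


def is_unique_violation(err: Exception, dialect_name: str) -> bool:
    if not isinstance(err, StatementError):
        return False
    if dialect_name == "sqlite":
        return "UNIQUE constraint failed" in str(err)
    if dialect_name == "postgresql":
        return getattr(err.orig, "pgcode", None) == "23505"
    if dialect_name == "mysql":
        args = getattr(err.orig, "args", None) or ()
        return len(args) > 0 and args[0] == 1062
    return False
```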


@@ -1,11 +1,13 @@
"""The tests for sensor recorder platform."""
from collections.abc import Callable
from unittest.mock import patch
import pytest
from sqlalchemy import select
from sqlalchemy.orm import Session
from homeassistant.components import recorder
from homeassistant.components.recorder import history
from homeassistant.components.recorder.db_schema import StatesMeta
from homeassistant.components.recorder.util import session_scope
@@ -261,4 +263,101 @@ def test_rename_entity_collision(
assert _count_entity_id_in_states_meta(hass, session, "sensor.test99") == 1
assert _count_entity_id_in_states_meta(hass, session, "sensor.test1") == 1
# We should hit the safeguard in the states_meta_manager
assert "the new entity_id is already in use" in caplog.text
# We should not hit the safeguard in the entity_registry
assert "Blocked attempt to insert duplicated state rows" not in caplog.text
def test_rename_entity_collision_without_states_meta_safeguard(
hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture
) -> None:
"""Test states meta is not migrated when there is a collision.
This test disables the safeguard in the states_meta_manager
and relies on the filter_unique_constraint_integrity_error safeguard.
"""
hass = hass_recorder()
setup_component(hass, "sensor", {})
entity_reg = mock_registry(hass)
@callback
def add_entry():
reg_entry = entity_reg.async_get_or_create(
"sensor",
"test",
"unique_0000",
suggested_object_id="test1",
)
assert reg_entry.entity_id == "sensor.test1"
hass.add_job(add_entry)
hass.block_till_done()
zero, four, states = record_states(hass)
hist = history.get_significant_states(
hass, zero, four, list(set(states) | {"sensor.test99", "sensor.test1"})
)
assert_dict_of_states_equal_without_context_and_last_changed(states, hist)
assert len(hist["sensor.test1"]) == 3
hass.states.set("sensor.test99", "collision")
hass.states.remove("sensor.test99")
hass.block_till_done()
wait_recording_done(hass)
# Verify history before collision
hist = history.get_significant_states(
hass, zero, four, list(set(states) | {"sensor.test99", "sensor.test1"})
)
assert len(hist["sensor.test1"]) == 3
assert len(hist["sensor.test99"]) == 2
instance = recorder.get_instance(hass)
# Patch out the safeguard in the states meta manager
# so that we hit the filter_unique_constraint_integrity_error safeguard in the entity_registry
with patch.object(instance.states_meta_manager, "get", return_value=None):
# Rename entity sensor.test1 to sensor.test99
@callback
def rename_entry():
entity_reg.async_update_entity(
"sensor.test1", new_entity_id="sensor.test99"
)
hass.add_job(rename_entry)
wait_recording_done(hass)
# History is not migrated on collision
hist = history.get_significant_states(
hass, zero, four, list(set(states) | {"sensor.test99", "sensor.test1"})
)
assert len(hist["sensor.test1"]) == 3
assert len(hist["sensor.test99"]) == 2
with session_scope(hass=hass) as session:
assert _count_entity_id_in_states_meta(hass, session, "sensor.test99") == 1
hass.states.set("sensor.test99", "post_migrate")
wait_recording_done(hass)
new_hist = history.get_significant_states(
hass,
zero,
dt_util.utcnow(),
list(set(states) | {"sensor.test99", "sensor.test1"}),
)
assert new_hist["sensor.test99"][-1].state == "post_migrate"
assert len(hist["sensor.test99"]) == 2
with session_scope(hass=hass) as session:
assert _count_entity_id_in_states_meta(hass, session, "sensor.test99") == 1
assert _count_entity_id_in_states_meta(hass, session, "sensor.test1") == 1
# We should not hit the safeguard in the states_meta_manager
assert "the new entity_id is already in use" not in caplog.text
# We should hit the safeguard in the entity_registry
assert "Blocked attempt to insert duplicated state rows" in caplog.text


@@ -2490,3 +2490,73 @@ async def test_events_are_recorded_until_final_write(
await hass.async_block_till_done()
assert not instance.engine
async def test_commit_before_commits_pending_writes(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
recorder_db_url: str,
tmp_path: Path,
) -> None:
"""Test commit_before with a non-zero commit interval.
All of our tests run with a commit interval of 0 by
default, so we need to test this with a non-zero commit interval.
"""
config = {
recorder.CONF_DB_URL: recorder_db_url,
recorder.CONF_COMMIT_INTERVAL: 60,
}
recorder_helper.async_initialize_recorder(hass)
hass.create_task(async_setup_recorder_instance(hass, config))
await recorder_helper.async_wait_recorder(hass)
instance = get_instance(hass)
assert instance.commit_interval == 60
verify_states_in_queue_future = hass.loop.create_future()
verify_session_commit_future = hass.loop.create_future()
class VerifyCommitBeforeTask(recorder.tasks.RecorderTask):
"""Task to verify that commit before ran.
If commit_before is true, we should have no pending writes.
"""
commit_before = True
def run(self, instance: Recorder) -> None:
if not instance._event_session_has_pending_writes:
hass.loop.call_soon_threadsafe(
verify_session_commit_future.set_result, None
)
return
hass.loop.call_soon_threadsafe(
verify_session_commit_future.set_exception,
RuntimeError("Session still has pending write"),
)
class VerifyStatesInQueueTask(recorder.tasks.RecorderTask):
"""Task to verify that states are in the queue."""
commit_before = False
def run(self, instance: Recorder) -> None:
if instance._event_session_has_pending_writes:
hass.loop.call_soon_threadsafe(
verify_states_in_queue_future.set_result, None
)
return
hass.loop.call_soon_threadsafe(
verify_states_in_queue_future.set_exception,
RuntimeError("Session has no pending write"),
)
# First insert an event
instance.queue_task(Event("fake_event"))
# Next verify that the event session has pending writes
instance.queue_task(VerifyStatesInQueueTask())
# Finally, verify that the session was committed
instance.queue_task(VerifyCommitBeforeTask())
await verify_states_in_queue_future
await verify_session_commit_future


@@ -454,7 +454,11 @@ def test_statistics_during_period_set_back_compat(
def test_rename_entity_collision(
hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture
) -> None:
"""Test statistics is migrated when entity_id is changed."""
"""Test statistics is migrated when entity_id is changed.
This test relies on the the safeguard in the statistics_meta_manager
and should not hit the filter_unique_constraint_integrity_error safeguard.
"""
hass = hass_recorder()
setup_component(hass, "sensor", {})
@@ -531,8 +535,117 @@ def test_rename_entity_collision(
# Statistics failed to migrate due to the collision
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {"sensor.test1": expected_stats1, "sensor.test2": expected_stats2}
# Verify the safeguard in the statistics meta manager was hit
assert (
"Cannot rename statistic_id `sensor.test1` to `sensor.test99` "
"because the new statistic_id is already in use"
) in caplog.text
# Verify the filter_unique_constraint_integrity_error safeguard was not hit
assert "Blocked attempt to insert duplicated statistic rows" not in caplog.text
def test_rename_entity_collision_states_meta_check_disabled(
hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture
) -> None:
"""Test statistics is migrated when entity_id is changed.
This test disables the safeguard in the statistics_meta_manager
and relies on the filter_unique_constraint_integrity_error safeguard.
"""
hass = hass_recorder()
setup_component(hass, "sensor", {})
entity_reg = mock_registry(hass)
@callback
def add_entry():
reg_entry = entity_reg.async_get_or_create(
"sensor",
"test",
"unique_0000",
suggested_object_id="test1",
)
assert reg_entry.entity_id == "sensor.test1"
hass.add_job(add_entry)
hass.block_till_done()
zero, four, states = record_states(hass)
hist = history.get_significant_states(hass, zero, four, list(states))
assert_dict_of_states_equal_without_context_and_last_changed(states, hist)
for kwargs in ({}, {"statistic_ids": ["sensor.test1"]}):
stats = statistics_during_period(hass, zero, period="5minute", **kwargs)
assert stats == {}
stats = get_last_short_term_statistics(
hass,
0,
"sensor.test1",
True,
{"last_reset", "max", "mean", "min", "state", "sum"},
)
assert stats == {}
do_adhoc_statistics(hass, start=zero)
wait_recording_done(hass)
expected_1 = {
"start": process_timestamp(zero).timestamp(),
"end": process_timestamp(zero + timedelta(minutes=5)).timestamp(),
"mean": pytest.approx(14.915254237288135),
"min": pytest.approx(10.0),
"max": pytest.approx(20.0),
"last_reset": None,
"state": None,
"sum": None,
}
expected_stats1 = [expected_1]
expected_stats2 = [expected_1]
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {"sensor.test1": expected_stats1, "sensor.test2": expected_stats2}
# Insert metadata for sensor.test99
metadata_1 = {
"has_mean": True,
"has_sum": False,
"name": "Total imported energy",
"source": "test",
"statistic_id": "sensor.test99",
"unit_of_measurement": "kWh",
}
with session_scope(hass=hass) as session:
session.add(recorder.db_schema.StatisticsMeta.from_meta(metadata_1))
instance = recorder.get_instance(hass)
# Patch out the safeguard in the statistics meta manager
# so that we hit the filter_unique_constraint_integrity_error safeguard in the statistics
with patch.object(instance.statistics_meta_manager, "get", return_value=None):
# Rename entity sensor.test1 to sensor.test99
@callback
def rename_entry():
entity_reg.async_update_entity(
"sensor.test1", new_entity_id="sensor.test99"
)
hass.add_job(rename_entry)
wait_recording_done(hass)
# Statistics failed to migrate due to the collision
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {"sensor.test1": expected_stats1, "sensor.test2": expected_stats2}
# Verify the filter_unique_constraint_integrity_error safeguard was hit
assert "Blocked attempt to insert duplicated statistic rows" in caplog.text
# Verify the safeguard in the statistics meta manager was not hit
assert (
"Cannot rename statistic_id `sensor.test1` to `sensor.test99` "
"because the new statistic_id is already in use"
) not in caplog.text
def test_statistics_duplicated(
hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture