From 7206f199ff035478cb5c8d7e0b8e0c3d4571cf94 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Thu, 2 Dec 2021 19:43:34 +0000 Subject: [PATCH 01/18] Allow to set CONF_DB_URL This is useful for test which need a custom DB path. --- tests/common.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/common.py b/tests/common.py index 19f0aaec44b..cf06ba4deae 100644 --- a/tests/common.py +++ b/tests/common.py @@ -897,10 +897,10 @@ def init_recorder_component(hass, add_config=None): _LOGGER.info("In-memory recorder successfully started") -async def async_init_recorder_component(hass, add_config=None): +async def async_init_recorder_component(hass, config={}): """Initialize the recorder asynchronously.""" - config = dict(add_config) if add_config else {} - config[recorder.CONF_DB_URL] = "sqlite://" + if recorder.CONF_DB_URL not in config: + config[recorder.CONF_DB_URL] = "sqlite://" with patch("homeassistant.components.recorder.migration.migrate_schema"): assert await async_setup_component( From 6ec5280842ba2c74dab2c237708cc0b59f991420 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Thu, 2 Dec 2021 19:46:03 +0000 Subject: [PATCH 02/18] Introduce write_lock_db helper to lock SQLite database --- homeassistant/components/recorder/util.py | 19 +++++++++++++++++++ tests/components/recorder/test_util.py | 19 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/homeassistant/components/recorder/util.py b/homeassistant/components/recorder/util.py index c63f6abee3a..3900641db63 100644 --- a/homeassistant/components/recorder/util.py +++ b/homeassistant/components/recorder/util.py @@ -457,6 +457,25 @@ def perodic_db_cleanups(instance: Recorder): connection.execute(text("PRAGMA wal_checkpoint(TRUNCATE);")) +@contextmanager +def write_lock_db(instance: Recorder): + """Lock database for writes.""" + + if instance.engine.dialect.name == "sqlite": + with instance.engine.connect() as connection: + # Execute sqlite to create a wal checkpoint + # This is optional but makes sure the backup is going to be minimal + connection.execute(text("PRAGMA wal_checkpoint(TRUNCATE)")) + # Create write lock + _LOGGER.debug("Lock database") + connection.execute(text("BEGIN IMMEDIATE;")) + try: + yield + finally: + _LOGGER.debug("Unlock database") + connection.execute(text("END;")) + + def async_migration_in_progress(hass: HomeAssistant) -> bool: """Determine is a migration is in progress. diff --git a/tests/components/recorder/test_util.py b/tests/components/recorder/test_util.py index 940925c48ca..16c76d01916 100644 --- a/tests/components/recorder/test_util.py +++ b/tests/components/recorder/test_util.py @@ -7,6 +7,7 @@ from unittest.mock import MagicMock, patch import pytest from sqlalchemy import text from sqlalchemy.sql.elements import TextClause +from homeassistant.components import recorder from homeassistant.components.recorder import run_information_with_session, util from homeassistant.components.recorder.const import DATA_INSTANCE, SQLITE_URL_PREFIX @@ -556,3 +557,21 @@ def test_perodic_db_cleanups(hass_recorder): ][0] assert isinstance(text_obj, TextClause) assert str(text_obj) == "PRAGMA wal_checkpoint(TRUNCATE);" + + +async def test_write_lock_db(hass, tmp_path): + """Test database write lock.""" + from sqlalchemy.exc import OperationalError + + # Use file DB, in memory DB cannot do write locks. + config = {recorder.CONF_DB_URL: "sqlite:///" + str(tmp_path / "pytest.db")} + await async_init_recorder_component(hass, config) + await hass.async_block_till_done() + + instance = hass.data[DATA_INSTANCE] + + with util.write_lock_db(instance): + # Database should be locked now, try writing SQL command + with instance.engine.connect() as connection: + with pytest.raises(OperationalError): + connection.execute(text("DROP TABLE events;")) From 4240b126fb705d21eea3059a46374df2356db063 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Wed, 1 Dec 2021 12:55:41 +0000 Subject: [PATCH 03/18] Introduce Websocket API which allows to lock database during backup --- homeassistant/components/recorder/__init__.py | 89 +++++++++++++++++-- .../components/recorder/websocket_api.py | 40 +++++++++ .../components/recorder/test_websocket_api.py | 41 +++++++++ 3 files changed, 161 insertions(+), 9 deletions(-) diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index da3955cb9b8..18a3d5a2088 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -4,6 +4,7 @@ from __future__ import annotations import asyncio from collections.abc import Callable, Iterable import concurrent.futures +from dataclasses import dataclass from datetime import datetime, timedelta import logging import queue @@ -76,6 +77,7 @@ from .util import ( session_scope, setup_connection_for_dialect, validate_or_move_away_sqlite_database, + write_lock_db, ) _LOGGER = logging.getLogger(__name__) @@ -370,6 +372,15 @@ class WaitTask: """An object to insert into the recorder queue to tell it set the _queue_watch event.""" +@dataclass +class DatabaseLockTask: + """An object to insert into the recorder queue to prevent writes to the database.""" + + database_locked: threading.Event + database_unlock: threading.Event + queue_overflow: bool + + class Recorder(threading.Thread): """A threaded recorder class.""" @@ -419,6 +430,7 @@ class Recorder(threading.Thread): self.migration_in_progress = False self._queue_watcher = None self._db_supports_row_number = True + self._database_lock_task: DatabaseLockTask | None = None self.enabled = True @@ -687,6 +699,8 @@ class Recorder(threading.Thread): def _process_one_event_or_recover(self, event): """Process an event, reconnect, or recover a malformed database.""" try: + if self._process_one_task(event): + return self._process_one_event(event) return except exc.DatabaseError as err: @@ -788,34 +802,56 @@ class Recorder(threading.Thread): # Schedule a new statistics task if this one didn't finish self.queue.put(ExternalStatisticsTask(metadata, stats)) - def _process_one_event(self, event): + def _lock_database(self, task: DatabaseLockTask): + with write_lock_db(self): + # Notify that lock is being held, wait until database can be used again. + task.database_locked.set() + while not task.database_unlock.wait(timeout=1): + if self.queue.qsize() > MAX_QUEUE_BACKLOG * 0.9: + _LOGGER.warning( + "Database queue backlog reached more than 90% of maximum queue length. Continue writing. Your Backup might be corrupted." + ) + task.queue_overflow = True + break + _LOGGER.info( + "Database queue backlog reached %d entries during backup.", + self.queue.qsize(), + ) + + def _process_one_task(self, event) -> bool: """Process one event.""" if isinstance(event, PurgeTask): self._run_purge(event.purge_before, event.repack, event.apply_filter) - return + return True if isinstance(event, PurgeEntitiesTask): self._run_purge_entities(event.entity_filter) - return + return True if isinstance(event, PerodicCleanupTask): perodic_db_cleanups(self) - return + return True if isinstance(event, StatisticsTask): self._run_statistics(event.start) - return + return True if isinstance(event, ClearStatisticsTask): statistics.clear_statistics(self, event.statistic_ids) - return + return True if isinstance(event, UpdateStatisticsMetadataTask): statistics.update_statistics_metadata( self, event.statistic_id, event.unit_of_measurement ) - return + return True if isinstance(event, ExternalStatisticsTask): self._run_external_statistics(event.metadata, event.statistics) - return + return True if isinstance(event, WaitTask): self._queue_watch.set() - return + return True + if isinstance(event, DatabaseLockTask): + self._lock_database(event) + return True + return False + + def _process_one_event(self, event): if event.event_type == EVENT_TIME_CHANGED: self._keepalive_count += 1 if self._keepalive_count >= KEEPALIVE_TIME: @@ -982,6 +1018,41 @@ class Recorder(threading.Thread): self.queue.put(WaitTask()) self._queue_watch.wait() + async def lock_database(self) -> bool: + """Lock database so it can be backed up safely.""" + if self._database_lock_task: + _LOGGER.warning("Database already locked.") + return False + + task = DatabaseLockTask(threading.Event(), threading.Event(), False) + self.queue.put(task) + lock_timeout = 30 + if not await self.hass.async_add_executor_job( + task.database_locked.wait, lock_timeout + ): + raise TimeoutError( + f"Could not lock database within {lock_timeout} seconds." + ) + self._database_lock_task = task + return True + + @callback + def unlock_database(self) -> bool: + """Unlock database. + + Returns true if database lock has been held throughout the process. + """ + if not self._database_lock_task: + _LOGGER.warning("Database currently not locked.") + return False + + self._database_lock_task.database_unlock.set() + success = not self._database_lock_task.queue_overflow + + self._database_lock_task = None + + return success + def _setup_connection(self): """Ensure database is ready to fly.""" kwargs = {} diff --git a/homeassistant/components/recorder/websocket_api.py b/homeassistant/components/recorder/websocket_api.py index 5a4f0425919..7ff749c31ff 100644 --- a/homeassistant/components/recorder/websocket_api.py +++ b/homeassistant/components/recorder/websocket_api.py @@ -1,6 +1,7 @@ """The Energy websocket API.""" from __future__ import annotations +import logging from typing import TYPE_CHECKING import voluptuous as vol @@ -15,6 +16,8 @@ from .util import async_migration_in_progress if TYPE_CHECKING: from . import Recorder +_LOGGER: logging.Logger = logging.getLogger(__package__) + @callback def async_setup(hass: HomeAssistant) -> None: @@ -23,6 +26,8 @@ def async_setup(hass: HomeAssistant) -> None: websocket_api.async_register_command(hass, ws_clear_statistics) websocket_api.async_register_command(hass, ws_update_statistics_metadata) websocket_api.async_register_command(hass, ws_info) + websocket_api.async_register_command(hass, ws_backup_start) + websocket_api.async_register_command(hass, ws_backup_end) @websocket_api.websocket_command( @@ -106,3 +111,38 @@ def ws_info( "thread_running": thread_alive, } connection.send_result(msg["id"], recorder_info) + + +@websocket_api.require_admin +@websocket_api.websocket_command({vol.Required("type"): "backup/start"}) +@websocket_api.async_response +async def ws_backup_start( + hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict +) -> None: + """Backup start notification.""" + + _LOGGER.info("Received backup start notification. Locking database for writes.") + instance: Recorder = hass.data[DATA_INSTANCE] + try: + await instance.lock_database() + except TimeoutError as err: + connection.send_error(msg["id"], "timeout_error", str(err)) + return + connection.send_result(msg["id"]) + + +@websocket_api.require_admin +@websocket_api.websocket_command({vol.Required("type"): "backup/end"}) +@websocket_api.async_response +async def ws_backup_end( + hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict +) -> None: + """Backup end notification.""" + + instance: Recorder = hass.data[DATA_INSTANCE] + _LOGGER.info("Received end of backup, releasing write lock.") + if not instance.unlock_database(): + connection.send_error( + msg["id"], "database_unlock_failed", "Failed to unlock database." + ) + connection.send_result(msg["id"]) diff --git a/tests/components/recorder/test_websocket_api.py b/tests/components/recorder/test_websocket_api.py index 7a45dea0379..6a495bcf29f 100644 --- a/tests/components/recorder/test_websocket_api.py +++ b/tests/components/recorder/test_websocket_api.py @@ -358,3 +358,44 @@ async def test_recorder_info_migration_queue_exhausted(hass, hass_ws_client): assert response["result"]["migration_in_progress"] is False assert response["result"]["recording"] is True assert response["result"]["thread_running"] is True + + +async def test_backup_start_no_recorder(hass, hass_ws_client): + """Test getting backup start when recorder is not present.""" + client = await hass_ws_client() + + await client.send_json({"id": 1, "type": "backup/start"}) + response = await client.receive_json() + assert not response["success"] + assert response["error"]["code"] == "unknown_command" + + +async def test_backup_end(hass, hass_ws_client): + """Test backup start.""" + client = await hass_ws_client() + await async_init_recorder_component(hass) + + # Ensure there are no queued events + await async_wait_recording_done_without_instance(hass) + + await client.send_json({"id": 1, "type": "backup/start"}) + response = await client.receive_json() + assert response["success"] + + await client.send_json({"id": 2, "type": "backup/end"}) + response = await client.receive_json() + assert response["success"] + + +async def test_backup_end_without_start(hass, hass_ws_client): + """Test backup start.""" + client = await hass_ws_client() + await async_init_recorder_component(hass) + + # Ensure there are no queued events + await async_wait_recording_done_without_instance(hass) + + await client.send_json({"id": 1, "type": "backup/end"}) + response = await client.receive_json() + assert not response["success"] + assert response["error"]["code"] == "database_unlock_failed" From ed5b9a9ada7d66731cf0b5561867f99289b7a893 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Thu, 2 Dec 2021 22:01:39 +0000 Subject: [PATCH 04/18] Fix isort --- tests/components/recorder/test_util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/components/recorder/test_util.py b/tests/components/recorder/test_util.py index 16c76d01916..fa449aefefc 100644 --- a/tests/components/recorder/test_util.py +++ b/tests/components/recorder/test_util.py @@ -7,8 +7,8 @@ from unittest.mock import MagicMock, patch import pytest from sqlalchemy import text from sqlalchemy.sql.elements import TextClause -from homeassistant.components import recorder +from homeassistant.components import recorder from homeassistant.components.recorder import run_information_with_session, util from homeassistant.components.recorder.const import DATA_INSTANCE, SQLITE_URL_PREFIX from homeassistant.components.recorder.models import RecorderRuns From eee53755dd3cd9efe74fab30b8fe752c3322dba2 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Fri, 3 Dec 2021 10:39:03 +0000 Subject: [PATCH 05/18] Avoid mutable default arguments --- tests/common.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/common.py b/tests/common.py index cf06ba4deae..3c485b1a774 100644 --- a/tests/common.py +++ b/tests/common.py @@ -897,8 +897,9 @@ def init_recorder_component(hass, add_config=None): _LOGGER.info("In-memory recorder successfully started") -async def async_init_recorder_component(hass, config={}): +async def async_init_recorder_component(hass, add_config=None): """Initialize the recorder asynchronously.""" + config = add_config or {} if recorder.CONF_DB_URL not in config: config[recorder.CONF_DB_URL] = "sqlite://" From 08cafc4b4c4ef4724426917582937457aafa275a Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Fri, 3 Dec 2021 10:44:46 +0000 Subject: [PATCH 06/18] Address pylint issues --- homeassistant/components/recorder/__init__.py | 8 ++++---- homeassistant/components/recorder/websocket_api.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 18a3d5a2088..8883ee7a97a 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -809,12 +809,12 @@ class Recorder(threading.Thread): while not task.database_unlock.wait(timeout=1): if self.queue.qsize() > MAX_QUEUE_BACKLOG * 0.9: _LOGGER.warning( - "Database queue backlog reached more than 90% of maximum queue length. Continue writing. Your Backup might be corrupted." + "Database queue backlog reached more than 90% of maximum queue length. Continue writing to database. Your Backup might be corrupted" ) task.queue_overflow = True break _LOGGER.info( - "Database queue backlog reached %d entries during backup.", + "Database queue backlog reached %d entries during backup", self.queue.qsize(), ) @@ -1021,7 +1021,7 @@ class Recorder(threading.Thread): async def lock_database(self) -> bool: """Lock database so it can be backed up safely.""" if self._database_lock_task: - _LOGGER.warning("Database already locked.") + _LOGGER.warning("Database already locked") return False task = DatabaseLockTask(threading.Event(), threading.Event(), False) @@ -1043,7 +1043,7 @@ class Recorder(threading.Thread): Returns true if database lock has been held throughout the process. """ if not self._database_lock_task: - _LOGGER.warning("Database currently not locked.") + _LOGGER.warning("Database currently not locked") return False self._database_lock_task.database_unlock.set() diff --git a/homeassistant/components/recorder/websocket_api.py b/homeassistant/components/recorder/websocket_api.py index 7ff749c31ff..f6d4d57a7e5 100644 --- a/homeassistant/components/recorder/websocket_api.py +++ b/homeassistant/components/recorder/websocket_api.py @@ -121,7 +121,7 @@ async def ws_backup_start( ) -> None: """Backup start notification.""" - _LOGGER.info("Received backup start notification. Locking database for writes.") + _LOGGER.info("Backup start notification, locking database for writes") instance: Recorder = hass.data[DATA_INSTANCE] try: await instance.lock_database() @@ -140,7 +140,7 @@ async def ws_backup_end( """Backup end notification.""" instance: Recorder = hass.data[DATA_INSTANCE] - _LOGGER.info("Received end of backup, releasing write lock.") + _LOGGER.info("Backup end notification, releasing write lock") if not instance.unlock_database(): connection.send_error( msg["id"], "database_unlock_failed", "Failed to unlock database." From 9eed4f5b0f0eedc8396d584c1e50f877ef344b20 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 2 Dec 2021 23:24:01 -1000 Subject: [PATCH 07/18] Avoid holding executor thread --- homeassistant/components/recorder/__init__.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 8883ee7a97a..28d78772eee 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -376,7 +376,7 @@ class WaitTask: class DatabaseLockTask: """An object to insert into the recorder queue to prevent writes to the database.""" - database_locked: threading.Event + database_locked: asyncio.Event database_unlock: threading.Event queue_overflow: bool @@ -802,10 +802,14 @@ class Recorder(threading.Thread): # Schedule a new statistics task if this one didn't finish self.queue.put(ExternalStatisticsTask(metadata, stats)) + @callback + def _async_set_database_locked(self, task: DatabaseLockTask): + task.database_locked.set() + def _lock_database(self, task: DatabaseLockTask): with write_lock_db(self): # Notify that lock is being held, wait until database can be used again. - task.database_locked.set() + self.hass.add_job(self._async_set_database_locked, task) while not task.database_unlock.wait(timeout=1): if self.queue.qsize() > MAX_QUEUE_BACKLOG * 0.9: _LOGGER.warning( @@ -1024,12 +1028,13 @@ class Recorder(threading.Thread): _LOGGER.warning("Database already locked") return False - task = DatabaseLockTask(threading.Event(), threading.Event(), False) + database_locked = asyncio.Event() + task = DatabaseLockTask(database_locked, threading.Event(), False) self.queue.put(task) lock_timeout = 30 - if not await self.hass.async_add_executor_job( - task.database_locked.wait, lock_timeout - ): + try: + await asyncio.wait_for(database_locked.wait(), timeout=lock_timeout) + except asyncio.TimeoutError: raise TimeoutError( f"Could not lock database within {lock_timeout} seconds." ) From b4c6cf8e9db4ca07faac280f3a829c621916b56f Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Fri, 3 Dec 2021 13:03:15 +0000 Subject: [PATCH 08/18] Set unlock event in case timeout occures This makes sure the database is left unlocked even in case of a race condition. --- homeassistant/components/recorder/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 28d78772eee..49e3c39a38d 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -1022,7 +1022,7 @@ class Recorder(threading.Thread): self.queue.put(WaitTask()) self._queue_watch.wait() - async def lock_database(self) -> bool: + async def lock_database(self, lock_timeout=30) -> bool: """Lock database so it can be backed up safely.""" if self._database_lock_task: _LOGGER.warning("Database already locked") @@ -1031,10 +1031,10 @@ class Recorder(threading.Thread): database_locked = asyncio.Event() task = DatabaseLockTask(database_locked, threading.Event(), False) self.queue.put(task) - lock_timeout = 30 try: await asyncio.wait_for(database_locked.wait(), timeout=lock_timeout) except asyncio.TimeoutError: + task.database_unlock.set() raise TimeoutError( f"Could not lock database within {lock_timeout} seconds." ) From 34b13172f9fda2fb8c2d1b46ad465116e060d46e Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Fri, 3 Dec 2021 13:05:06 +0000 Subject: [PATCH 09/18] Add more unit tests --- tests/components/recorder/test_init.py | 44 ++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index e41a0da34ba..bf2746da771 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -1,5 +1,6 @@ """The tests for the Recorder component.""" # pylint: disable=protected-access +import asyncio from datetime import datetime, timedelta import sqlite3 from unittest.mock import patch @@ -1134,3 +1135,46 @@ def test_entity_id_filter(hass_recorder): db_events = list(session.query(Events).filter_by(event_type="hello")) # Keep referring idx + 1, as no new events are being added assert len(db_events) == idx + 1, data + + +async def test_database_lock_and_unlock(hass: HomeAssistant, tmp_path): + """Test writing events during lock getting written after unlocking.""" + # Use file DB, in memory DB cannot do write locks. + config = {recorder.CONF_DB_URL: "sqlite:///" + str(tmp_path / "pytest.db")} + await async_init_recorder_component(hass, config) + await hass.async_block_till_done() + + instance: Recorder = hass.data[DATA_INSTANCE] + + await instance.lock_database() + + event_type = "EVENT_TEST" + event_data = {"test_attr": 5, "test_attr_10": "nice"} + hass.bus.fire(event_type, event_data) + task = asyncio.create_task(async_wait_recording_done(hass, instance)) + + # Recording can't be finished while lock is held + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(asyncio.shield(task), timeout=5) + + with session_scope(hass=hass) as session: + db_events = list(session.query(Events).filter_by(event_type=event_type)) + assert len(db_events) == 0 + + assert instance.unlock_database() + + await task + with session_scope(hass=hass) as session: + db_events = list(session.query(Events).filter_by(event_type=event_type)) + assert len(db_events) == 1 + + +async def test_database_lock_timeout(hass): + """Test locking database timeout when recorder stopped.""" + await async_init_recorder_component(hass) + hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP) + await hass.async_block_till_done() + + instance: Recorder = hass.data[DATA_INSTANCE] + with pytest.raises(TimeoutError): + await instance.lock_database(lock_timeout=1) From 787aaa4a37e93c74bc8293afa17dd4ccbff72f3d Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Fri, 3 Dec 2021 15:16:44 +0000 Subject: [PATCH 10/18] Address new pylint errors --- homeassistant/components/recorder/__init__.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 49e3c39a38d..4085ee228f4 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -802,14 +802,14 @@ class Recorder(threading.Thread): # Schedule a new statistics task if this one didn't finish self.queue.put(ExternalStatisticsTask(metadata, stats)) - @callback - def _async_set_database_locked(self, task: DatabaseLockTask): - task.database_locked.set() - def _lock_database(self, task: DatabaseLockTask): + @callback + def _async_set_database_locked(task: DatabaseLockTask): + task.database_locked.set() + with write_lock_db(self): # Notify that lock is being held, wait until database can be used again. - self.hass.add_job(self._async_set_database_locked, task) + self.hass.add_job(_async_set_database_locked, task) while not task.database_unlock.wait(timeout=1): if self.queue.qsize() > MAX_QUEUE_BACKLOG * 0.9: _LOGGER.warning( @@ -1033,11 +1033,11 @@ class Recorder(threading.Thread): self.queue.put(task) try: await asyncio.wait_for(database_locked.wait(), timeout=lock_timeout) - except asyncio.TimeoutError: + except asyncio.TimeoutError as err: task.database_unlock.set() raise TimeoutError( f"Could not lock database within {lock_timeout} seconds." - ) + ) from err self._database_lock_task = task return True From 8dd9a118a51f9f21e80cdb7fa5991ebb88e5a05d Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Mon, 6 Dec 2021 10:27:47 +0000 Subject: [PATCH 11/18] Lower timeout to speedup tests --- tests/components/recorder/test_init.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index bf2746da771..c98ba8cf32a 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -1155,7 +1155,7 @@ async def test_database_lock_and_unlock(hass: HomeAssistant, tmp_path): # Recording can't be finished while lock is held with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(asyncio.shield(task), timeout=5) + await asyncio.wait_for(asyncio.shield(task), timeout=1) with session_scope(hass=hass) as session: db_events = list(session.query(Events).filter_by(event_type=event_type)) @@ -1177,4 +1177,4 @@ async def test_database_lock_timeout(hass): instance: Recorder = hass.data[DATA_INSTANCE] with pytest.raises(TimeoutError): - await instance.lock_database(lock_timeout=1) + await instance.lock_database(lock_timeout=0.1) From 14347df22b9f820321baf9e710657ff071c4c87b Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Mon, 6 Dec 2021 10:28:21 +0000 Subject: [PATCH 12/18] Introduce queue overflow test --- homeassistant/components/recorder/__init__.py | 4 ++- tests/components/recorder/test_init.py | 29 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 4085ee228f4..d0f615e0332 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -125,6 +125,8 @@ KEEPALIVE_TIME = 30 # States and Events objects EXPIRE_AFTER_COMMITS = 120 +DB_LOCK_QUEUE_CHECK_TIMEOUT = 1 + CONF_AUTO_PURGE = "auto_purge" CONF_DB_URL = "db_url" CONF_DB_MAX_RETRIES = "db_max_retries" @@ -810,7 +812,7 @@ class Recorder(threading.Thread): with write_lock_db(self): # Notify that lock is being held, wait until database can be used again. self.hass.add_job(_async_set_database_locked, task) - while not task.database_unlock.wait(timeout=1): + while not task.database_unlock.wait(timeout=DB_LOCK_QUEUE_CHECK_TIMEOUT): if self.queue.qsize() > MAX_QUEUE_BACKLOG * 0.9: _LOGGER.warning( "Database queue backlog reached more than 90% of maximum queue length. Continue writing to database. Your Backup might be corrupted" diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index c98ba8cf32a..d1e63941aed 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -1169,6 +1169,35 @@ async def test_database_lock_and_unlock(hass: HomeAssistant, tmp_path): assert len(db_events) == 1 +async def test_database_lock_and_overflow(hass: HomeAssistant, tmp_path): + """Test writing events during lock leading to overflow the queue causes the database to unlock.""" + # Use file DB, in memory DB cannot do write locks. + config = {recorder.CONF_DB_URL: "sqlite:///" + str(tmp_path / "pytest.db")} + await async_init_recorder_component(hass, config) + await hass.async_block_till_done() + + instance: Recorder = hass.data[DATA_INSTANCE] + + with patch.object(recorder, "MAX_QUEUE_BACKLOG", 1), patch.object( + recorder, "DB_LOCK_QUEUE_CHECK_TIMEOUT", 0.1 + ): + await instance.lock_database() + + event_type = "EVENT_TEST" + event_data = {"test_attr": 5, "test_attr_10": "nice"} + hass.bus.fire(event_type, event_data) + + # Check that this causes the queue to overflow and write succeeds + # even before unlocking. + await async_wait_recording_done(hass, instance) + + with session_scope(hass=hass) as session: + db_events = list(session.query(Events).filter_by(event_type=event_type)) + assert len(db_events) == 1 + + assert not instance.unlock_database() + + async def test_database_lock_timeout(hass): """Test locking database timeout when recorder stopped.""" await async_init_recorder_component(hass) From d9d6249750f112a773cfc4d4132ae8711acfed87 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Mon, 6 Dec 2021 11:13:13 +0000 Subject: [PATCH 13/18] Unlock database if necessary This makes sure that the test runs through in case locking actually succeeds (and the test fails). --- tests/components/recorder/test_init.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index d1e63941aed..2410b5f4d86 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -1205,5 +1205,8 @@ async def test_database_lock_timeout(hass): await hass.async_block_till_done() instance: Recorder = hass.data[DATA_INSTANCE] - with pytest.raises(TimeoutError): - await instance.lock_database(lock_timeout=0.1) + try: + with pytest.raises(TimeoutError): + await instance.lock_database(lock_timeout=0.1) + finally: + instance.unlock_database() From f07c6e097065047c4a2f5402c2511674543f679f Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Mon, 6 Dec 2021 11:24:38 +0000 Subject: [PATCH 14/18] Make DB_LOCK_TIMEOUT a global There is no good reason for this to be an argument. The recorder needs to pick a sensible value. --- homeassistant/components/recorder/__init__.py | 7 ++++--- tests/components/recorder/test_init.py | 11 ++++++----- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index d0f615e0332..cdae61e8c7f 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -125,6 +125,7 @@ KEEPALIVE_TIME = 30 # States and Events objects EXPIRE_AFTER_COMMITS = 120 +DB_LOCK_TIMEOUT = 30 DB_LOCK_QUEUE_CHECK_TIMEOUT = 1 CONF_AUTO_PURGE = "auto_purge" @@ -1024,7 +1025,7 @@ class Recorder(threading.Thread): self.queue.put(WaitTask()) self._queue_watch.wait() - async def lock_database(self, lock_timeout=30) -> bool: + async def lock_database(self) -> bool: """Lock database so it can be backed up safely.""" if self._database_lock_task: _LOGGER.warning("Database already locked") @@ -1034,11 +1035,11 @@ class Recorder(threading.Thread): task = DatabaseLockTask(database_locked, threading.Event(), False) self.queue.put(task) try: - await asyncio.wait_for(database_locked.wait(), timeout=lock_timeout) + await asyncio.wait_for(database_locked.wait(), timeout=DB_LOCK_TIMEOUT) except asyncio.TimeoutError as err: task.database_unlock.set() raise TimeoutError( - f"Could not lock database within {lock_timeout} seconds." + f"Could not lock database within {DB_LOCK_TIMEOUT} seconds." ) from err self._database_lock_task = task return True diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index 2410b5f4d86..3d9ea63c241 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -1205,8 +1205,9 @@ async def test_database_lock_timeout(hass): await hass.async_block_till_done() instance: Recorder = hass.data[DATA_INSTANCE] - try: - with pytest.raises(TimeoutError): - await instance.lock_database(lock_timeout=0.1) - finally: - instance.unlock_database() + with patch.object(recorder, "DB_LOCK_TIMEOUT", 0.1): + try: + with pytest.raises(TimeoutError): + await instance.lock_database() + finally: + instance.unlock_database() From ab01f7c6cb72b5e12b0c5280bb6b267137cdb0e1 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Mon, 6 Dec 2021 11:49:06 +0000 Subject: [PATCH 15/18] Add Websocket Timeout test --- .../components/recorder/test_websocket_api.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/components/recorder/test_websocket_api.py b/tests/components/recorder/test_websocket_api.py index 6a495bcf29f..994d1c677af 100644 --- a/tests/components/recorder/test_websocket_api.py +++ b/tests/components/recorder/test_websocket_api.py @@ -370,6 +370,24 @@ async def test_backup_start_no_recorder(hass, hass_ws_client): assert response["error"]["code"] == "unknown_command" +async def test_backup_start_timeout(hass, hass_ws_client): + """Test getting backup start when recorder is not present.""" + client = await hass_ws_client() + await async_init_recorder_component(hass) + + # Ensure there are no queued events + await async_wait_recording_done_without_instance(hass) + + with patch.object(recorder, "DB_LOCK_TIMEOUT", 0): + try: + await client.send_json({"id": 1, "type": "backup/start"}) + response = await client.receive_json() + assert not response["success"] + assert response["error"]["code"] == "timeout_error" + finally: + await client.send_json({"id": 2, "type": "backup/end"}) + + async def test_backup_end(hass, hass_ws_client): """Test backup start.""" client = await hass_ws_client() From d62c06ffb5f5dcec86e59c38653a0a8b5a90356b Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Mon, 6 Dec 2021 15:46:47 +0000 Subject: [PATCH 16/18] Test lock_database() return --- tests/components/recorder/test_init.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index 3d9ea63c241..7d7c3f27fb6 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -1146,7 +1146,9 @@ async def test_database_lock_and_unlock(hass: HomeAssistant, tmp_path): instance: Recorder = hass.data[DATA_INSTANCE] - await instance.lock_database() + assert await instance.lock_database() + + assert not await instance.lock_database() event_type = "EVENT_TEST" event_data = {"test_attr": 5, "test_attr_10": "nice"} From d1ddf225b4d0b68381cffd13236d45f9e49cf879 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 7 Dec 2021 12:07:54 +0100 Subject: [PATCH 17/18] Update homeassistant/components/recorder/__init__.py Co-authored-by: Erik Montnemery --- homeassistant/components/recorder/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index cdae61e8c7f..9f91ee55f8c 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -816,7 +816,10 @@ class Recorder(threading.Thread): while not task.database_unlock.wait(timeout=DB_LOCK_QUEUE_CHECK_TIMEOUT): if self.queue.qsize() > MAX_QUEUE_BACKLOG * 0.9: _LOGGER.warning( - "Database queue backlog reached more than 90% of maximum queue length. Continue writing to database. Your Backup might be corrupted" + "Database queue backlog reached more than 90% of maximum queue " + "length while waiting for backup to finish; recorder will now " + "resume writing to database. The backup can not be trusted and " + "must be restarted" ) task.queue_overflow = True break From 2c9ee54977022aef7493562a24d3cd6eee485a45 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 7 Dec 2021 11:15:54 +0000 Subject: [PATCH 18/18] Replace tabs with spaces --- homeassistant/components/recorder/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 9f91ee55f8c..8a907a8d9fa 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -817,9 +817,9 @@ class Recorder(threading.Thread): if self.queue.qsize() > MAX_QUEUE_BACKLOG * 0.9: _LOGGER.warning( "Database queue backlog reached more than 90% of maximum queue " - "length while waiting for backup to finish; recorder will now " - "resume writing to database. The backup can not be trusted and " - "must be restarted" + "length while waiting for backup to finish; recorder will now " + "resume writing to database. The backup can not be trusted and " + "must be restarted" ) task.queue_overflow = True break