Add Estimated Database Size to the recorder system health (#71463)

This commit is contained in:
J. Nick Koston 2022-05-07 23:02:54 -05:00 committed by GitHub
parent 49d13b9981
commit a8aa0e1cca
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 200 additions and 67 deletions

View file

@ -4,6 +4,7 @@ from functools import partial
import json
from typing import Final
from homeassistant.backports.enum import StrEnum
from homeassistant.const import ATTR_ATTRIBUTION, ATTR_RESTORED, ATTR_SUPPORTED_FEATURES
from homeassistant.helpers.json import JSONEncoder
@ -37,3 +38,11 @@ KEEPALIVE_TIME = 30
EXCLUDE_ATTRIBUTES = f"{DOMAIN}_exclude_attributes_by_domain"
class SupportedDialect(StrEnum):
"""Supported dialects."""
SQLITE = "sqlite"
MYSQL = "mysql"
POSTGRESQL = "postgresql"

View file

@ -3,6 +3,7 @@ from __future__ import annotations
import asyncio
from collections.abc import Callable, Iterable
import contextlib
from datetime import datetime, timedelta
import logging
import queue
@ -41,6 +42,7 @@ from .const import (
KEEPALIVE_TIME,
MAX_QUEUE_BACKLOG,
SQLITE_URL_PREFIX,
SupportedDialect,
)
from .executor import DBInterruptibleThreadPoolExecutor
from .models import (
@ -193,6 +195,13 @@ class Recorder(threading.Thread):
"""Return the number of items in the recorder backlog."""
return self._queue.qsize()
@property
def dialect_name(self) -> SupportedDialect | None:
"""Return the dialect the recorder uses."""
with contextlib.suppress(ValueError):
return SupportedDialect(self.engine.dialect.name) if self.engine else None
return None
@property
def _using_file_sqlite(self) -> bool:
"""Short version to check if we are using sqlite3 as a file."""
@ -459,11 +468,6 @@ class Recorder(threading.Thread):
"""Schedule external statistics."""
self.queue_task(ExternalStatisticsTask(metadata, stats))
@callback
def using_sqlite(self) -> bool:
"""Return if recorder uses sqlite as the engine."""
return bool(self.engine and self.engine.dialect.name == "sqlite")
@callback
def _async_setup_periodic_tasks(self) -> None:
"""Prepare periodic tasks."""
@ -473,7 +477,7 @@ class Recorder(threading.Thread):
# If the db is using a socket connection, we need to keep alive
# to prevent errors from unexpected disconnects
if not self.using_sqlite():
if self.dialect_name != SupportedDialect.SQLITE:
self._keep_alive_listener = async_track_time_interval(
self.hass, self._async_keep_alive, timedelta(seconds=KEEPALIVE_TIME)
)
@ -939,7 +943,7 @@ class Recorder(threading.Thread):
async def lock_database(self) -> bool:
"""Lock database so it can be backed up safely."""
if not self.using_sqlite():
if self.dialect_name != SupportedDialect.SQLITE:
_LOGGER.debug(
"Not a SQLite database or not connected, locking not necessary"
)
@ -968,7 +972,7 @@ class Recorder(threading.Thread):
Returns true if database lock has been held throughout the process.
"""
if not self.using_sqlite():
if self.dialect_name != SupportedDialect.SQLITE:
_LOGGER.debug(
"Not a SQLite database or not connected, unlocking not necessary"
)

View file

@ -21,6 +21,7 @@ from sqlalchemy.sql.expression import true
from homeassistant.core import HomeAssistant
from .const import SupportedDialect
from .models import (
SCHEMA_VERSION,
TABLE_STATES,
@ -263,7 +264,7 @@ def _modify_columns(
columns_def: list[str],
) -> None:
"""Modify columns in a table."""
if engine.dialect.name == "sqlite":
if engine.dialect.name == SupportedDialect.SQLITE:
_LOGGER.debug(
"Skipping to modify columns %s in table %s; "
"Modifying column length in SQLite is unnecessary, "
@ -281,7 +282,7 @@ def _modify_columns(
table_name,
)
if engine.dialect.name == "postgresql":
if engine.dialect.name == SupportedDialect.POSTGRESQL:
columns_def = [
"ALTER {column} TYPE {type}".format(
**dict(zip(["column", "type"], col_def.split(" ", 1)))
@ -408,7 +409,7 @@ def _apply_update( # noqa: C901
) -> None:
"""Perform operations to bring schema up to date."""
dialect = engine.dialect.name
big_int = "INTEGER(20)" if dialect == "mysql" else "INTEGER"
big_int = "INTEGER(20)" if dialect == SupportedDialect.MYSQL else "INTEGER"
if new_version == 1:
_create_index(session_maker, "events", "ix_events_time_fired")
@ -487,11 +488,11 @@ def _apply_update( # noqa: C901
_create_index(session_maker, "states", "ix_states_old_state_id")
_update_states_table_with_foreign_key_options(session_maker, engine)
elif new_version == 12:
if engine.dialect.name == "mysql":
if engine.dialect.name == SupportedDialect.MYSQL:
_modify_columns(session_maker, engine, "events", ["event_data LONGTEXT"])
_modify_columns(session_maker, engine, "states", ["attributes LONGTEXT"])
elif new_version == 13:
if engine.dialect.name == "mysql":
if engine.dialect.name == SupportedDialect.MYSQL:
_modify_columns(
session_maker,
engine,
@ -545,7 +546,7 @@ def _apply_update( # noqa: C901
session.add(StatisticsRuns(start=get_start_time()))
elif new_version == 20:
# This changed the precision of statistics from float to double
if engine.dialect.name in ["mysql", "postgresql"]:
if engine.dialect.name in [SupportedDialect.MYSQL, SupportedDialect.POSTGRESQL]:
_modify_columns(
session_maker,
engine,
@ -560,7 +561,7 @@ def _apply_update( # noqa: C901
)
elif new_version == 21:
# Try to change the character set of the statistic_meta table
if engine.dialect.name == "mysql":
if engine.dialect.name == SupportedDialect.MYSQL:
for table in ("events", "states", "statistics_meta"):
_LOGGER.warning(
"Updating character set and collation of table %s to utf8mb4. "

View file

@ -13,7 +13,7 @@ from sqlalchemy.sql.expression import distinct
from homeassistant.const import EVENT_STATE_CHANGED
from .const import MAX_ROWS_TO_PURGE
from .const import MAX_ROWS_TO_PURGE, SupportedDialect
from .models import (
EventData,
Events,
@ -45,7 +45,7 @@ def purge_old_data(
"Purging states and events before target %s",
purge_before.isoformat(sep=" ", timespec="seconds"),
)
using_sqlite = instance.using_sqlite()
using_sqlite = instance.dialect_name == SupportedDialect.SQLITE
with session_scope(session=instance.get_session()) as session:
# Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
@ -425,7 +425,7 @@ def _purge_old_recorder_runs(
def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
"""Remove filtered states and events that shouldn't be in the database."""
_LOGGER.debug("Cleanup filtered data")
using_sqlite = instance.using_sqlite()
using_sqlite = instance.dialect_name == SupportedDialect.SQLITE
# Check if excluded entity_ids are in database
excluded_entity_ids: list[str] = [
@ -484,7 +484,7 @@ def _purge_filtered_events(
instance: Recorder, session: Session, excluded_event_types: list[str]
) -> None:
"""Remove filtered events and linked states."""
using_sqlite = instance.using_sqlite()
using_sqlite = instance.dialect_name == SupportedDialect.SQLITE
event_ids, data_ids = zip(
*(
session.query(Events.event_id, Events.data_id)
@ -514,7 +514,7 @@ def _purge_filtered_events(
@retryable_database_job("purge")
def purge_entity_data(instance: Recorder, entity_filter: Callable[[str], bool]) -> bool:
"""Purge states and events of specified entities."""
using_sqlite = instance.using_sqlite()
using_sqlite = instance.dialect_name == SupportedDialect.SQLITE
with session_scope(session=instance.get_session()) as session:
selected_entity_ids: list[str] = [
entity_id

View file

@ -6,6 +6,8 @@ from typing import TYPE_CHECKING
from sqlalchemy import text
from .const import SupportedDialect
if TYPE_CHECKING:
from . import Recorder
@ -18,7 +20,7 @@ def repack_database(instance: Recorder) -> None:
dialect_name = instance.engine.dialect.name
# Execute sqlite command to free up space on disk
if dialect_name == "sqlite":
if dialect_name == SupportedDialect.SQLITE:
_LOGGER.debug("Vacuuming SQL DB to free space")
with instance.engine.connect() as conn:
conn.execute(text("VACUUM"))
@ -26,7 +28,7 @@ def repack_database(instance: Recorder) -> None:
return
# Execute postgresql vacuum command to free up space on disk
if dialect_name == "postgresql":
if dialect_name == SupportedDialect.POSTGRESQL:
_LOGGER.debug("Vacuuming SQL DB to free space")
with instance.engine.connect().execution_options(
isolation_level="AUTOCOMMIT"
@ -36,7 +38,7 @@ def repack_database(instance: Recorder) -> None:
return
# Optimize mysql / mariadb tables to free up space on disk
if dialect_name == "mysql":
if dialect_name == SupportedDialect.MYSQL:
_LOGGER.debug("Optimizing SQL DB to free space")
with instance.engine.connect() as conn:
conn.execute(text("OPTIMIZE TABLE states, events, recorder_runs"))

View file

@ -38,7 +38,7 @@ import homeassistant.util.temperature as temperature_util
from homeassistant.util.unit_system import UnitSystem
import homeassistant.util.volume as volume_util
from .const import DATA_INSTANCE, DOMAIN, MAX_ROWS_TO_PURGE
from .const import DATA_INSTANCE, DOMAIN, MAX_ROWS_TO_PURGE, SupportedDialect
from .models import (
StatisticData,
StatisticMetaData,
@ -1342,10 +1342,13 @@ def _filter_unique_constraint_integrity_error(
dialect_name = instance.engine.dialect.name
ignore = False
if dialect_name == "sqlite" and "UNIQUE constraint failed" in str(err):
if (
dialect_name == SupportedDialect.SQLITE
and "UNIQUE constraint failed" in str(err)
):
ignore = True
if (
dialect_name == "postgresql"
dialect_name == SupportedDialect.POSTGRESQL
and hasattr(err.orig, "pgcode")
and err.orig.pgcode == "23505"
):

View file

@ -2,7 +2,8 @@
"system_health": {
"info": {
"oldest_recorder_run": "Oldest Run Start Time",
"current_recorder_run": "Current Run Start Time"
"current_recorder_run": "Current Run Start Time",
"estimated_db_size": "Estimated Database Size (MiB)"
}
}
}

View file

@ -1,26 +0,0 @@
"""Provide info to system health."""
from typing import Any
from homeassistant.components import system_health
from homeassistant.core import HomeAssistant, callback
from . import get_instance
@callback
def async_register(
hass: HomeAssistant, register: system_health.SystemHealthRegistration
) -> None:
"""Register system health callbacks."""
register.async_register_info(system_health_info)
async def system_health_info(hass: HomeAssistant) -> dict[str, Any]:
"""Get info for the info page."""
instance = get_instance(hass)
run_history = instance.run_history
return {
"oldest_recorder_run": run_history.first.start,
"current_recorder_run": run_history.current.start,
}

View file

@ -0,0 +1,60 @@
"""Provide info to system health."""
from __future__ import annotations
from typing import Any
from yarl import URL
from homeassistant.components import system_health
from homeassistant.components.recorder.core import Recorder
from homeassistant.components.recorder.util import session_scope
from homeassistant.core import HomeAssistant, callback
from .. import get_instance
from ..const import SupportedDialect
from .mysql import db_size_bytes as mysql_db_size_bytes
from .postgresql import db_size_bytes as postgresql_db_size_bytes
from .sqlite import db_size_bytes as sqlite_db_size_bytes
DIALECT_TO_GET_SIZE = {
SupportedDialect.SQLITE: sqlite_db_size_bytes,
SupportedDialect.MYSQL: mysql_db_size_bytes,
SupportedDialect.POSTGRESQL: postgresql_db_size_bytes,
}
@callback
def async_register(
hass: HomeAssistant, register: system_health.SystemHealthRegistration
) -> None:
"""Register system health callbacks."""
register.async_register_info(system_health_info)
def _get_db_stats(instance: Recorder, database_name: str) -> dict[str, Any]:
"""Get the stats about the database."""
db_stats: dict[str, Any] = {}
with session_scope(session=instance.get_session()) as session:
if (
(dialect_name := instance.dialect_name)
and (get_size := DIALECT_TO_GET_SIZE.get(dialect_name))
and (db_bytes := get_size(session, database_name))
):
db_stats["estimated_db_size"] = f"{db_bytes/1024/1024:.2f} MiB"
return db_stats
async def system_health_info(hass: HomeAssistant) -> dict[str, Any]:
"""Get info for the info page."""
instance = get_instance(hass)
run_history = instance.run_history
database_name = URL(instance.db_url).path.lstrip("/")
db_stats: dict[str, Any] = {}
if instance.async_db_ready.done():
db_stats = await instance.async_add_executor_job(
_get_db_stats, instance, database_name
)
return {
"oldest_recorder_run": run_history.first.start,
"current_recorder_run": run_history.current.start,
} | db_stats

View file

@ -0,0 +1,19 @@
"""Provide info to system health for mysql."""
from __future__ import annotations
from sqlalchemy import text
from sqlalchemy.orm.session import Session
def db_size_bytes(session: Session, database_name: str) -> float:
"""Get the mysql database size."""
return float(
session.execute(
text(
"SELECT ROUND(SUM(DATA_LENGTH + INDEX_LENGTH), 2) "
"FROM information_schema.TABLES WHERE "
"TABLE_SCHEMA=:database_name"
),
{"database_name": database_name},
).first()[0]
)

View file

@ -0,0 +1,15 @@
"""Provide info to system health for postgresql."""
from __future__ import annotations
from sqlalchemy import text
from sqlalchemy.orm.session import Session
def db_size_bytes(session: Session, database_name: str) -> float:
"""Get the mysql database size."""
return float(
session.execute(
text("select pg_database_size(:database_name);"),
{"database_name": database_name},
).first()[0]
)

View file

@ -0,0 +1,17 @@
"""Provide info to system health for sqlite."""
from __future__ import annotations
from sqlalchemy import text
from sqlalchemy.orm.session import Session
def db_size_bytes(session: Session, database_name: str) -> float:
"""Get the mysql database size."""
return float(
session.execute(
text(
"SELECT page_count * page_size as size "
"FROM pragma_page_count(), pragma_page_size();"
)
).first()[0]
)

View file

@ -2,7 +2,8 @@
"system_health": {
"info": {
"current_recorder_run": "Current Run Start Time",
"oldest_recorder_run": "Oldest Run Start Time"
"oldest_recorder_run": "Oldest Run Start Time",
"estimated_db_size": "Estimated Database Size"
}
}
}

View file

@ -25,7 +25,7 @@ from typing_extensions import Concatenate, ParamSpec
from homeassistant.core import HomeAssistant
import homeassistant.util.dt as dt_util
from .const import DATA_INSTANCE, SQLITE_URL_PREFIX
from .const import DATA_INSTANCE, SQLITE_URL_PREFIX, SupportedDialect
from .models import (
ALL_TABLES,
TABLE_RECORDER_RUNS,
@ -353,7 +353,7 @@ def setup_connection_for_dialect(
# Returns False if the the connection needs to be setup
# on the next connection, returns True if the connection
# never needs to be setup again.
if dialect_name == "sqlite":
if dialect_name == SupportedDialect.SQLITE:
if first_connection:
old_isolation = dbapi_connection.isolation_level
dbapi_connection.isolation_level = None
@ -381,7 +381,7 @@ def setup_connection_for_dialect(
# enable support for foreign keys
execute_on_connection(dbapi_connection, "PRAGMA foreign_keys=ON")
elif dialect_name == "mysql":
elif dialect_name == SupportedDialect.MYSQL:
execute_on_connection(dbapi_connection, "SET session wait_timeout=28800")
if first_connection:
result = query_on_connection(dbapi_connection, "SELECT VERSION()")
@ -408,7 +408,7 @@ def setup_connection_for_dialect(
version or version_string, "MySQL", MIN_VERSION_MYSQL
)
elif dialect_name == "postgresql":
elif dialect_name == SupportedDialect.POSTGRESQL:
if first_connection:
# server_version_num was added in 2006
result = query_on_connection(dbapi_connection, "SHOW server_version")
@ -455,7 +455,7 @@ def retryable_database_job(
except OperationalError as err:
assert instance.engine is not None
if (
instance.engine.dialect.name == "mysql"
instance.engine.dialect.name == SupportedDialect.MYSQL
and err.orig.args[0] in RETRYABLE_MYSQL_ERRORS
):
_LOGGER.info(
@ -481,7 +481,7 @@ def periodic_db_cleanups(instance: Recorder) -> None:
These cleanups will happen nightly or after any purge.
"""
assert instance.engine is not None
if instance.engine.dialect.name == "sqlite":
if instance.engine.dialect.name == SupportedDialect.SQLITE:
# Execute sqlite to create a wal checkpoint and free up disk space
_LOGGER.debug("WAL checkpoint")
with instance.engine.connect() as connection:

View file

@ -1440,9 +1440,7 @@ async def test_database_connection_keep_alive(
caplog: pytest.LogCaptureFixture,
):
"""Test we keep alive socket based dialects."""
with patch(
"homeassistant.components.recorder.Recorder.using_sqlite", return_value=False
):
with patch("homeassistant.components.recorder.Recorder.dialect_name"):
instance = await async_setup_recorder_instance(hass)
# We have to mock this since we don't have a mock
# MySQL server available in tests.

View file

@ -9,7 +9,7 @@ from sqlalchemy.exc import DatabaseError, OperationalError
from sqlalchemy.orm.session import Session
from homeassistant.components import recorder
from homeassistant.components.recorder.const import MAX_ROWS_TO_PURGE
from homeassistant.components.recorder.const import MAX_ROWS_TO_PURGE, SupportedDialect
from homeassistant.components.recorder.models import (
Events,
RecorderRuns,
@ -43,8 +43,10 @@ from tests.common import SetupRecorderInstanceT
def mock_use_sqlite(request):
"""Pytest fixture to switch purge method."""
with patch(
"homeassistant.components.recorder.Recorder.using_sqlite",
return_value=request.param,
"homeassistant.components.recorder.core.Recorder.dialect_name",
return_value=SupportedDialect.SQLITE
if request.param
else SupportedDialect.MYSQL,
):
yield

View file

@ -1,8 +1,11 @@
"""Test recorder system health."""
from unittest.mock import patch
from unittest.mock import ANY, Mock, patch
import pytest
from homeassistant.components.recorder import get_instance
from homeassistant.components.recorder.const import SupportedDialect
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
@ -20,6 +23,29 @@ async def test_recorder_system_health(hass, recorder_mock):
assert info == {
"current_recorder_run": instance.run_history.current.start,
"oldest_recorder_run": instance.run_history.first.start,
"estimated_db_size": ANY,
}
@pytest.mark.parametrize(
"dialect_name", [SupportedDialect.MYSQL, SupportedDialect.POSTGRESQL]
)
async def test_recorder_system_health_alternate_dbms(hass, recorder_mock, dialect_name):
"""Test recorder system health."""
assert await async_setup_component(hass, "system_health", {})
await async_wait_recording_done(hass)
with patch(
"homeassistant.components.recorder.core.Recorder.dialect_name", dialect_name
), patch(
"sqlalchemy.orm.session.Session.execute",
return_value=Mock(first=Mock(return_value=("1048576",))),
):
info = await get_system_health_info(hass, "recorder")
instance = get_instance(hass)
assert info == {
"current_recorder_run": instance.run_history.current.start,
"oldest_recorder_run": instance.run_history.first.start,
"estimated_db_size": "1.00 MiB",
}
@ -35,4 +61,5 @@ async def test_recorder_system_health_crashed_recorder_runs_table(
assert info == {
"current_recorder_run": instance.run_history.current.start,
"oldest_recorder_run": instance.run_history.current.start,
"estimated_db_size": ANY,
}