Drop statistic_id and source columns from statistics table (#52417)

* Drop statistic_id and source columns from statistics table

* Remove useless double drop of statistics table

* Update homeassistant/components/recorder/models.py

Co-authored-by: Franck Nijhof <git@frenck.dev>

* black

Co-authored-by: Franck Nijhof <git@frenck.dev>
Erik Montnemery, 2021-07-02 13:17:00 +02:00 (committed by GitHub)
parent d339e3bd8c
commit 24ae05b734
4 changed files with 107 additions and 73 deletions
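
The net effect on the schema: statistics rows previously carried their own statistic_id and source strings; after this commit those live only in statistics_meta, and each statistics row points at them through a metadata_id foreign key. A minimal read-side sketch of the resulting lookup, assuming a plain SQLAlchemy session and the models shown below (the helper name stats_for is illustrative, not part of the commit):

    # Sketch: resolve statistics rows for one statistic_id through the
    # new metadata_id foreign key. `session` is an ordinary SQLAlchemy session.
    def stats_for(session, statistic_id):
        meta = (
            session.query(StatisticsMeta)
            .filter(StatisticsMeta.statistic_id == statistic_id)
            .one_or_none()
        )
        if meta is None:
            return []
        return (
            session.query(Statistics)
            .filter(Statistics.metadata_id == meta.id)
            .order_by(Statistics.start)
            .all()
        )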

homeassistant/components/recorder/migration.py

@@ -11,7 +11,14 @@ from sqlalchemy.exc import (
 )
 from sqlalchemy.schema import AddConstraint, DropConstraint
 
-from .models import SCHEMA_VERSION, TABLE_STATES, Base, SchemaChanges, Statistics
+from .models import (
+    SCHEMA_VERSION,
+    TABLE_STATES,
+    Base,
+    SchemaChanges,
+    Statistics,
+    StatisticsMeta,
+)
 from .util import session_scope
 
 _LOGGER = logging.getLogger(__name__)
@@ -453,10 +460,15 @@ def _apply_update(engine, session, new_version, old_version):
             connection, engine, TABLE_STATES, ["old_state_id"]
         )
     elif new_version == 17:
+        # This dropped the statistics table, done again in version 18.
+        pass
+    elif new_version == 18:
         if sqlalchemy.inspect(engine).has_table(Statistics.__tablename__):
-            # Recreate the statistics table
+            # Recreate the statistics and statisticsmeta tables
             Statistics.__table__.drop(engine)
             Statistics.__table__.create(engine)
+            StatisticsMeta.__table__.drop(engine)
+            StatisticsMeta.__table__.create(engine)
     else:
         raise ValueError(f"No schema migration defined for version {new_version}")
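
For orientation, a simplified and hypothetical sketch of the driver loop that would call _apply_update once per version bump; the real recorder migration code wraps this in logging, session management and error handling, so treat it only as a reading aid:

    # Hypothetical sketch, not the actual migrate_schema implementation:
    # apply each pending version bump in order and record the new version.
    def _migrate_to_current(engine, session, current_version):
        for version in range(current_version + 1, SCHEMA_VERSION + 1):
            _apply_update(engine, session, version, current_version)
            session.add(SchemaChanges(schema_version=version))
            session.commit()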

homeassistant/components/recorder/models.py

@@ -36,7 +36,7 @@ import homeassistant.util.dt as dt_util
 # pylint: disable=invalid-name
 Base = declarative_base()
 
-SCHEMA_VERSION = 17
+SCHEMA_VERSION = 18
 
 _LOGGER = logging.getLogger(__name__)
@@ -224,8 +224,11 @@ class Statistics(Base):  # type: ignore
     __tablename__ = TABLE_STATISTICS
     id = Column(Integer, primary_key=True)
     created = Column(DATETIME_TYPE, default=dt_util.utcnow)
-    source = Column(String(32))
-    statistic_id = Column(String(255))
+    metadata_id = Column(
+        Integer,
+        ForeignKey(f"{TABLE_STATISTICS_META}.id", ondelete="CASCADE"),
+        index=True,
+    )
     start = Column(DATETIME_TYPE, index=True)
     mean = Column(Float())
     min = Column(Float())
@@ -236,15 +239,14 @@ class Statistics(Base):  # type: ignore
     __table_args__ = (
         # Used for fetching statistics for a certain entity at a specific time
-        Index("ix_statistics_statistic_id_start", "statistic_id", "start"),
+        Index("ix_statistics_statistic_id_start", "metadata_id", "start"),
     )
 
     @staticmethod
-    def from_stats(source, statistic_id, start, stats):
+    def from_stats(metadata_id, start, stats):
         """Create object from a statistics."""
         return Statistics(
-            source=source,
-            statistic_id=statistic_id,
+            metadata_id=metadata_id,
             start=start,
             **stats,
         )
@@ -258,17 +260,22 @@ class StatisticsMeta(Base):  # type: ignore
         "mysql_collate": "utf8mb4_unicode_ci",
     }
     __tablename__ = TABLE_STATISTICS_META
-    statistic_id = Column(String(255), primary_key=True)
+    id = Column(Integer, primary_key=True)
+    statistic_id = Column(String(255), index=True)
     source = Column(String(32))
     unit_of_measurement = Column(String(255))
+    has_mean = Column(Boolean)
+    has_sum = Column(Boolean)
 
     @staticmethod
-    def from_meta(source, statistic_id, unit_of_measurement):
+    def from_meta(source, statistic_id, unit_of_measurement, has_mean, has_sum):
         """Create object from meta data."""
         return StatisticsMeta(
             source=source,
             statistic_id=statistic_id,
             unit_of_measurement=unit_of_measurement,
+            has_mean=has_mean,
+            has_sum=has_sum,
         )
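
Taken together, the two models now write in two steps: create or look up the StatisticsMeta row, then point Statistics rows at its id. An illustrative sketch assuming an open session and a UTC `start` datetime; the entity id and values are invented:

    # Illustrative two-step insert using the updated helpers.
    meta = StatisticsMeta.from_meta(
        source="recorder",
        statistic_id="sensor.outdoor_temperature",  # made-up entity
        unit_of_measurement="°C",
        has_mean=True,
        has_sum=False,
    )
    session.add(meta)
    session.flush()  # populates meta.id

    session.add(
        Statistics.from_stats(
            metadata_id=meta.id,
            start=start,  # UTC start of the hour being compiled
            stats={"mean": 21.3, "min": 20.9, "max": 21.8},
        )
    )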

homeassistant/components/recorder/statistics.py

@@ -23,7 +23,7 @@ if TYPE_CHECKING:
     from . import Recorder
 
 QUERY_STATISTICS = [
-    Statistics.statistic_id,
+    Statistics.metadata_id,
     Statistics.start,
     Statistics.mean,
    Statistics.min,
@@ -33,11 +33,8 @@ QUERY_STATISTICS = [
     Statistics.sum,
 ]
 
-QUERY_STATISTIC_IDS = [
-    Statistics.statistic_id,
-]
-
 QUERY_STATISTIC_META = [
+    StatisticsMeta.id,
     StatisticsMeta.statistic_id,
     StatisticsMeta.unit_of_measurement,
 ]
@@ -76,16 +73,39 @@ def get_start_time() -> datetime.datetime:
     return start
 
 
+def _get_metadata_ids(hass, session, statistic_ids):
+    """Resolve metadata_id for a list of statistic_ids."""
+    baked_query = hass.data[STATISTICS_META_BAKERY](
+        lambda session: session.query(*QUERY_STATISTIC_META)
+    )
+    baked_query += lambda q: q.filter(
+        StatisticsMeta.statistic_id.in_(bindparam("statistic_ids"))
+    )
+
+    result = execute(baked_query(session).params(statistic_ids=statistic_ids))
+
+    return [id for id, _, _ in result]
+
+
+def _get_or_add_metadata_id(hass, session, statistic_id, metadata):
+    """Get metadata_id for a statistic_id, add if it doesn't exist."""
+    metadata_id = _get_metadata_ids(hass, session, [statistic_id])
+    if not metadata_id:
+        unit = metadata["unit_of_measurement"]
+        has_mean = metadata["has_mean"]
+        has_sum = metadata["has_sum"]
+        session.add(
+            StatisticsMeta.from_meta(DOMAIN, statistic_id, unit, has_mean, has_sum)
+        )
+        metadata_id = _get_metadata_ids(hass, session, [statistic_id])
+    return metadata_id[0]
+
+
 @retryable_database_job("statistics")
 def compile_statistics(instance: Recorder, start: datetime.datetime) -> bool:
     """Compile statistics."""
     start = dt_util.as_utc(start)
     end = start + timedelta(hours=1)
-    _LOGGER.debug(
-        "Compiling statistics for %s-%s",
-        start,
-        end,
-    )
+    _LOGGER.debug("Compiling statistics for %s-%s", start, end)
     platform_stats = []
     for domain, platform in instance.hass.data[DOMAIN].items():
         if not hasattr(platform, "compile_statistics"):
@@ -98,29 +118,22 @@ def compile_statistics(instance: Recorder, start: datetime.datetime) -> bool:
 
     with session_scope(session=instance.get_session()) as session:  # type: ignore
         for stats in platform_stats:
             for entity_id, stat in stats.items():
-                session.add(
-                    Statistics.from_stats(DOMAIN, entity_id, start, stat["stat"])
+                metadata_id = _get_or_add_metadata_id(
+                    instance.hass, session, entity_id, stat["meta"]
                 )
-                exists = session.query(
-                    session.query(StatisticsMeta)
-                    .filter_by(statistic_id=entity_id)
-                    .exists()
-                ).scalar()
-                if not exists:
-                    unit = stat["meta"]["unit_of_measurement"]
-                    session.add(StatisticsMeta.from_meta(DOMAIN, entity_id, unit))
+                session.add(Statistics.from_stats(metadata_id, start, stat["stat"]))
 
     return True
 
 
-def _get_meta_data(hass, session, statistic_ids):
+def _get_meta_data(hass, session, statistic_ids, statistic_type):
     """Fetch meta data."""
 
-    def _meta(metas, wanted_statistic_id):
-        meta = {"statistic_id": wanted_statistic_id, "unit_of_measurement": None}
-        for statistic_id, unit in metas:
-            if statistic_id == wanted_statistic_id:
-                meta["unit_of_measurement"] = unit
+    def _meta(metas, wanted_metadata_id):
+        meta = None
+        for metadata_id, statistic_id, unit in metas:
+            if metadata_id == wanted_metadata_id:
+                meta = {"unit_of_measurement": unit, "statistic_id": statistic_id}
         return meta
 
     baked_query = hass.data[STATISTICS_META_BAKERY](
@@ -130,13 +143,14 @@ def _get_meta_data(hass, session, statistic_ids, statistic_type):
     baked_query += lambda q: q.filter(
         StatisticsMeta.statistic_id.in_(bindparam("statistic_ids"))
     )
+    if statistic_type == "mean":
+        baked_query += lambda q: q.filter(StatisticsMeta.has_mean.isnot(False))
+    if statistic_type == "sum":
+        baked_query += lambda q: q.filter(StatisticsMeta.has_sum.isnot(False))
 
     result = execute(baked_query(session).params(statistic_ids=statistic_ids))
 
-    if statistic_ids is None:
-        statistic_ids = [statistic_id[0] for statistic_id in result]
-
-    return {id: _meta(result, id) for id in statistic_ids}
+    metadata_ids = [metadata[0] for metadata in result]
+    return {id: _meta(result, id) for id in metadata_ids}
 
 
 def _configured_unit(unit: str, units) -> str:
@@ -152,24 +166,11 @@ def list_statistic_ids(hass, statistic_type=None):
     """Return statistic_ids and meta data."""
     units = hass.config.units
     with session_scope(hass=hass) as session:
-        baked_query = hass.data[STATISTICS_BAKERY](
-            lambda session: session.query(*QUERY_STATISTIC_IDS).distinct()
-        )
+        meta_data = _get_meta_data(hass, session, None, statistic_type)
 
-        if statistic_type == "mean":
-            baked_query += lambda q: q.filter(Statistics.mean.isnot(None))
-        if statistic_type == "sum":
-            baked_query += lambda q: q.filter(Statistics.sum.isnot(None))
-
-        baked_query += lambda q: q.order_by(Statistics.statistic_id)
-
-        result = execute(baked_query(session))
-
-        statistic_ids = [statistic_id[0] for statistic_id in result]
-        meta_data = _get_meta_data(hass, session, statistic_ids)
-
-        for item in meta_data.values():
-            unit = _configured_unit(item["unit_of_measurement"], units)
-            item["unit_of_measurement"] = unit
+        for meta in meta_data.values():
+            unit = _configured_unit(meta["unit_of_measurement"], units)
+            meta["unit_of_measurement"] = unit
 
     return list(meta_data.values())
@@ -186,20 +187,24 @@ def statistics_during_period(hass, start_time, end_time=None, statistic_ids=None):
     if end_time is not None:
         baked_query += lambda q: q.filter(Statistics.start < bindparam("end_time"))
 
+    metadata_ids = None
     if statistic_ids is not None:
         baked_query += lambda q: q.filter(
-            Statistics.statistic_id.in_(bindparam("statistic_ids"))
+            Statistics.metadata_id.in_(bindparam("metadata_ids"))
         )
         statistic_ids = [statistic_id.lower() for statistic_id in statistic_ids]
+        metadata_ids = _get_metadata_ids(hass, session, statistic_ids)
+        if not metadata_ids:
+            return {}
 
-    baked_query += lambda q: q.order_by(Statistics.statistic_id, Statistics.start)
+    baked_query += lambda q: q.order_by(Statistics.metadata_id, Statistics.start)
 
     stats = execute(
         baked_query(session).params(
-            start_time=start_time, end_time=end_time, statistic_ids=statistic_ids
+            start_time=start_time, end_time=end_time, metadata_ids=metadata_ids
         )
     )
 
-    meta_data = _get_meta_data(hass, session, statistic_ids)
+    meta_data = _get_meta_data(hass, session, statistic_ids, None)
     return _sorted_statistics_to_dict(hass, stats, statistic_ids, meta_data)
@@ -210,23 +215,28 @@ def get_last_statistics(hass, number_of_stats, statistic_id=None):
         lambda session: session.query(*QUERY_STATISTICS)
     )
 
+    metadata_id = None
     if statistic_id is not None:
-        baked_query += lambda q: q.filter_by(statistic_id=bindparam("statistic_id"))
+        baked_query += lambda q: q.filter_by(metadata_id=bindparam("metadata_id"))
+        metadata_ids = _get_metadata_ids(hass, session, [statistic_id])
+        if not metadata_ids:
+            return {}
+        metadata_id = metadata_ids[0]
 
     baked_query += lambda q: q.order_by(
-        Statistics.statistic_id, Statistics.start.desc()
+        Statistics.metadata_id, Statistics.start.desc()
     )
 
     baked_query += lambda q: q.limit(bindparam("number_of_stats"))
 
     stats = execute(
         baked_query(session).params(
-            number_of_stats=number_of_stats, statistic_id=statistic_id
+            number_of_stats=number_of_stats, metadata_id=metadata_id
         )
     )
 
     statistic_ids = [statistic_id] if statistic_id is not None else None
-    meta_data = _get_meta_data(hass, session, statistic_ids)
+    meta_data = _get_meta_data(hass, session, statistic_ids, None)
     return _sorted_statistics_to_dict(hass, stats, statistic_ids, meta_data)
@@ -249,13 +259,14 @@ def _sorted_statistics_to_dict(
     _process_timestamp_to_utc_isoformat = process_timestamp_to_utc_isoformat
 
     # Append all statistic entries, and do unit conversion
-    for ent_id, group in groupby(stats, lambda state: state.statistic_id):
-        unit = meta_data[ent_id]["unit_of_measurement"]
+    for meta_id, group in groupby(stats, lambda state: state.metadata_id):
+        unit = meta_data[meta_id]["unit_of_measurement"]
+        statistic_id = meta_data[meta_id]["statistic_id"]
         convert = UNIT_CONVERSIONS.get(unit, lambda x, units: x)
-        ent_results = result[ent_id]
+        ent_results = result[meta_id]
         ent_results.extend(
             {
-                "statistic_id": db_state.statistic_id,
+                "statistic_id": statistic_id,
                 "start": _process_timestamp_to_utc_isoformat(db_state.start),
                 "mean": convert(db_state.mean, units),
                 "min": convert(db_state.min, units),
@@ -268,4 +279,4 @@ def _sorted_statistics_to_dict(
     )
 
     # Filter out the empty lists if some states had 0 results.
-    return {key: val for key, val in result.items() if val}
+    return {meta_data[key]["statistic_id"]: val for key, val in result.items() if val}
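
The design point in this file: queries group and filter on metadata_id internally, but the public API still speaks statistic_ids, with the final dict re-keyed through meta_data[key]["statistic_id"]. A hypothetical call and result shape (entity id and numbers invented):

    # Hypothetical call; callers still pass and receive statistic_ids.
    stats = statistics_during_period(
        hass, start_time, statistic_ids=["sensor.outdoor_temperature"]
    )
    # Possible result shape:
    # {
    #     "sensor.outdoor_temperature": [
    #         {
    #             "statistic_id": "sensor.outdoor_temperature",
    #             "start": "2021-07-01T10:00:00+00:00",
    #             "mean": 21.3,
    #             "min": 20.9,
    #             "max": 21.8,
    #         },
    #     ]
    # }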

homeassistant/components/sensor/recorder.py

@@ -224,7 +224,11 @@ def compile_statistics(
         result[entity_id] = {}
 
         # Set meta data
-        result[entity_id]["meta"] = {"unit_of_measurement": unit}
+        result[entity_id]["meta"] = {
+            "unit_of_measurement": unit,
+            "has_mean": "mean" in wanted_statistics,
+            "has_sum": "sum" in wanted_statistics,
+        }
 
         # Make calculations
         stat: dict = {}
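
With this, the sensor platform declares up front which statistics each entity will provide, and has_mean/has_sum later drive the query filtering in _get_meta_data. A sketch of the per-entity meta dict this loop produces, for an invented cumulative energy sensor that only has a sum:

    # Sketch of the meta dict handed to the recorder (values invented):
    result["sensor.energy_meter"] = {
        "meta": {
            "unit_of_measurement": "kWh",
            "has_mean": False,
            "has_sum": True,
        },
        # "stat" is filled in by the calculations that follow.
    }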