Add support for external statistics (#56607)

* Support external statistics

* Update tests

* Lint

* Adjust code after rebase

* Separate external statistic_id with :, add name to metadata

* Adjust tests

* Simplify get_metadata_with_session

* Address review comments

* Allow updating external statistics

* Validate input

* Adjust tests after rebase

* Pylint

* Adjust tests

* Improve test coverage
This commit is contained in:
Erik Montnemery 2021-10-26 10:26:50 +02:00 committed by GitHub
parent 2f346a8048
commit f594bc353b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 634 additions and 111 deletions

View file

@ -2,7 +2,7 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable
from collections.abc import Callable, Iterable
import concurrent.futures
from datetime import datetime, timedelta
import logging
@ -366,6 +366,13 @@ class StatisticsTask(NamedTuple):
start: datetime
class ExternalStatisticsTask(NamedTuple):
"""An object to insert into the recorder queue to run an external statistics task."""
metadata: dict
statistics: Iterable[dict]
class WaitTask:
"""An object to insert into the recorder queue to tell it set the _queue_watch event."""
@ -597,6 +604,11 @@ class Recorder(threading.Thread):
"""Update statistics metadata for a statistic_id."""
self.queue.put(UpdateStatisticsMetadataTask(statistic_id, unit_of_measurement))
@callback
def async_external_statistics(self, metadata, stats):
"""Schedule external statistics."""
self.queue.put(ExternalStatisticsTask(metadata, stats))
@callback
def _async_setup_periodic_tasks(self):
"""Prepare periodic tasks."""
@ -776,6 +788,13 @@ class Recorder(threading.Thread):
# Schedule a new statistics task if this one didn't finish
self.queue.put(StatisticsTask(start))
def _run_external_statistics(self, metadata, stats):
"""Run statistics task."""
if statistics.add_external_statistics(self, metadata, stats):
return
# Schedule a new statistics task if this one didn't finish
self.queue.put(StatisticsTask(metadata, stats))
def _process_one_event(self, event):
"""Process one event."""
if isinstance(event, PurgeTask):
@ -798,6 +817,9 @@ class Recorder(threading.Thread):
self, event.statistic_id, event.unit_of_measurement
)
return
if isinstance(event, ExternalStatisticsTask):
self._run_external_statistics(event.metadata, event.statistics)
return
if isinstance(event, WaitTask):
self._queue_watch.set()
return

View file

@ -579,6 +579,10 @@ def _apply_update(instance, session, new_version, old_version): # noqa: C901
sum=last_statistic.sum,
)
)
elif new_version == 23:
# Add name column to StatisticsMeta
_add_columns(session, "statistics_meta", ["name VARCHAR(255)"])
else:
raise ValueError(f"No schema migration defined for version {new_version}")

View file

@ -1,7 +1,6 @@
"""Models for SQLAlchemy."""
from __future__ import annotations
from collections.abc import Iterable
from datetime import datetime, timedelta
import json
import logging
@ -41,7 +40,7 @@ import homeassistant.util.dt as dt_util
# pylint: disable=invalid-name
Base = declarative_base()
SCHEMA_VERSION = 22
SCHEMA_VERSION = 23
_LOGGER = logging.getLogger(__name__)
@ -231,7 +230,7 @@ class StatisticResult(TypedDict):
"""
meta: StatisticMetaData
stat: Iterable[StatisticData]
stat: StatisticData
class StatisticDataBase(TypedDict):
@ -310,10 +309,12 @@ class StatisticsShortTerm(Base, StatisticsBase): # type: ignore
class StatisticMetaData(TypedDict):
"""Statistic meta data class."""
statistic_id: str
unit_of_measurement: str | None
has_mean: bool
has_sum: bool
name: str | None
source: str
statistic_id: str
unit_of_measurement: str | None
class StatisticsMeta(Base): # type: ignore
@ -329,23 +330,12 @@ class StatisticsMeta(Base): # type: ignore
unit_of_measurement = Column(String(255))
has_mean = Column(Boolean)
has_sum = Column(Boolean)
name = Column(String(255))
@staticmethod
def from_meta(
source: str,
statistic_id: str,
unit_of_measurement: str | None,
has_mean: bool,
has_sum: bool,
) -> StatisticsMeta:
def from_meta(meta: StatisticMetaData) -> StatisticsMeta:
"""Create object from meta data."""
return StatisticsMeta(
source=source,
statistic_id=statistic_id,
unit_of_measurement=unit_of_measurement,
has_mean=has_mean,
has_sum=has_sum,
)
return StatisticsMeta(**meta)
class RecorderRuns(Base): # type: ignore

View file

@ -7,6 +7,7 @@ import dataclasses
from datetime import datetime, timedelta
from itertools import chain, groupby
import logging
import re
from statistics import mean
from typing import TYPE_CHECKING, Any, Literal
@ -23,6 +24,7 @@ from homeassistant.const import (
VOLUME_CUBIC_METERS,
)
from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import entity_registry
import homeassistant.util.dt as dt_util
import homeassistant.util.pressure as pressure_util
@ -30,7 +32,7 @@ import homeassistant.util.temperature as temperature_util
from homeassistant.util.unit_system import UnitSystem
import homeassistant.util.volume as volume_util
from .const import DOMAIN
from .const import DATA_INSTANCE, DOMAIN
from .models import (
StatisticData,
StatisticMetaData,
@ -100,9 +102,11 @@ QUERY_STATISTICS_SUMMARY_SUM_LEGACY = [
QUERY_STATISTIC_META = [
StatisticsMeta.id,
StatisticsMeta.statistic_id,
StatisticsMeta.source,
StatisticsMeta.unit_of_measurement,
StatisticsMeta.has_mean,
StatisticsMeta.has_sum,
StatisticsMeta.name,
]
QUERY_STATISTIC_META_ID = [
@ -138,6 +142,22 @@ UNIT_CONVERSIONS = {
_LOGGER = logging.getLogger(__name__)
def split_statistic_id(entity_id: str) -> list[str]:
"""Split a state entity ID into domain and object ID."""
return entity_id.split(":", 1)
VALID_STATISTIC_ID = re.compile(r"^(?!.+__)(?!_)[\da-z_]+(?<!_):(?!_)[\da-z_]+(?<!_)$")
def valid_statistic_id(statistic_id: str) -> bool:
"""Test if a statistic ID is a valid format.
Format: <domain>:<statistic> where both are slugs.
"""
return VALID_STATISTIC_ID.match(statistic_id) is not None
@dataclasses.dataclass
class ValidationIssue:
"""Error or warning message."""
@ -208,10 +228,7 @@ def _update_or_add_metadata(
hass, session, statistic_ids=[statistic_id]
)
if not old_metadata_dict:
unit = new_metadata["unit_of_measurement"]
has_mean = new_metadata["has_mean"]
has_sum = new_metadata["has_sum"]
meta = StatisticsMeta.from_meta(DOMAIN, statistic_id, unit, has_mean, has_sum)
meta = StatisticsMeta.from_meta(new_metadata)
session.add(meta)
session.flush() # Flush to get the metadata id assigned
_LOGGER.debug(
@ -397,15 +414,12 @@ def compile_statistics(instance: Recorder, start: datetime) -> bool:
with session_scope(session=instance.get_session()) as session: # type: ignore
for stats in platform_stats:
metadata_id = _update_or_add_metadata(instance.hass, session, stats["meta"])
for stat in stats["stat"]:
try:
session.add(StatisticsShortTerm.from_stats(metadata_id, stat))
except SQLAlchemyError:
_LOGGER.exception(
"Unexpected exception when inserting statistics %s:%s ",
metadata_id,
stats,
)
_insert_statistics(
session,
StatisticsShortTerm,
metadata_id,
stats["stat"],
)
if start.minute == 55:
# A full hour is ready, summarize it
@ -416,6 +430,50 @@ def compile_statistics(instance: Recorder, start: datetime) -> bool:
return True
def _insert_statistics(
session: scoped_session,
table: type[Statistics | StatisticsShortTerm],
metadata_id: int,
statistic: StatisticData,
) -> None:
"""Insert statistics in the database."""
try:
session.add(table.from_stats(metadata_id, statistic))
except SQLAlchemyError:
_LOGGER.exception(
"Unexpected exception when inserting statistics %s:%s ",
metadata_id,
statistic,
)
def _update_statistics(
session: scoped_session,
table: type[Statistics | StatisticsShortTerm],
stat_id: int,
statistic: StatisticData,
) -> None:
"""Insert statistics in the database."""
try:
session.query(table).filter_by(id=stat_id).update(
{
table.mean: statistic["mean"],
table.min: statistic["min"],
table.max: statistic["max"],
table.last_reset: statistic["last_reset"],
table.state: statistic["state"],
table.sum: statistic["sum"],
},
synchronize_session=False,
)
except SQLAlchemyError:
_LOGGER.exception(
"Unexpected exception when updating statistics %s:%s ",
id,
statistic,
)
def get_metadata_with_session(
hass: HomeAssistant,
session: scoped_session,
@ -426,24 +484,12 @@ def get_metadata_with_session(
) -> dict[str, tuple[int, StatisticMetaData]]:
"""Fetch meta data.
Returns a dict of (metadata_id, StatisticMetaData) indexed by statistic_id.
Returns a dict of (metadata_id, StatisticMetaData) tuples indexed by statistic_id.
If statistic_ids is given, fetch metadata only for the listed statistics_ids.
If statistic_type is given, fetch metadata only for statistic_ids supporting it.
"""
def _meta(metas: list, wanted_metadata_id: str) -> StatisticMetaData | None:
meta: StatisticMetaData | None = None
for metadata_id, statistic_id, unit, has_mean, has_sum in metas:
if metadata_id == wanted_metadata_id:
meta = {
"statistic_id": statistic_id,
"unit_of_measurement": unit,
"has_mean": has_mean,
"has_sum": has_sum,
}
return meta
# Fetch metatadata from the database
baked_query = hass.data[STATISTICS_META_BAKERY](
lambda session: session.query(*QUERY_STATISTIC_META)
@ -468,14 +514,20 @@ def get_metadata_with_session(
if not result:
return {}
metadata_ids = [metadata[0] for metadata in result]
# Prepare the result dict
metadata: dict[str, tuple[int, StatisticMetaData]] = {}
for _id in metadata_ids:
meta = _meta(result, _id)
if meta:
metadata[meta["statistic_id"]] = (_id, meta)
return metadata
return {
meta["statistic_id"]: (
meta["id"],
{
"source": meta["source"],
"statistic_id": meta["statistic_id"],
"unit_of_measurement": meta["unit_of_measurement"],
"has_mean": meta["has_mean"],
"has_sum": meta["has_sum"],
"name": meta["name"],
},
)
for meta in result
}
def get_metadata(
@ -553,7 +605,11 @@ def list_statistic_ids(
meta["unit_of_measurement"] = unit
statistic_ids = {
meta["statistic_id"]: meta["unit_of_measurement"]
meta["statistic_id"]: {
"name": meta["name"],
"source": meta["source"],
"unit_of_measurement": meta["unit_of_measurement"],
}
for _, meta in metadata.values()
}
@ -563,19 +619,25 @@ def list_statistic_ids(
continue
platform_statistic_ids = platform.list_statistic_ids(hass, statistic_type)
for statistic_id, unit in platform_statistic_ids.items():
for statistic_id, info in platform_statistic_ids.items():
unit = info["unit_of_measurement"]
if unit is not None:
# Display unit according to user settings
unit = _configured_unit(unit, units)
platform_statistic_ids[statistic_id] = unit
platform_statistic_ids[statistic_id]["unit_of_measurement"] = unit
for key, value in platform_statistic_ids.items():
statistic_ids.setdefault(key, value)
# Return a map of statistic_id to unit_of_measurement
# Return a list of statistic_id + metadata
return [
{"statistic_id": _id, "unit_of_measurement": unit}
for _id, unit in statistic_ids.items()
{
"statistic_id": _id,
"name": info.get("name"),
"source": info["source"],
"unit_of_measurement": info["unit_of_measurement"],
}
for _id, info in statistic_ids.items()
]
@ -919,3 +981,69 @@ def validate_statistics(hass: HomeAssistant) -> dict[str, list[ValidationIssue]]
continue
platform_validation.update(platform.validate_statistics(hass))
return platform_validation
def _statistics_exists(
session: scoped_session,
table: type[Statistics | StatisticsShortTerm],
metadata_id: int,
start: datetime,
) -> int | None:
"""Return id if a statistics entry already exists."""
result = (
session.query(table.id)
.filter(table.metadata_id == metadata_id and table.start == start)
.first()
)
return result["id"] if result else None
@callback
def async_add_external_statistics(
hass: HomeAssistant,
metadata: StatisticMetaData,
statistics: Iterable[StatisticData],
) -> None:
"""Add hourly statistics from an external source.
This inserts an add_external_statistics job in the recorder's queue.
"""
# The statistic_id has same limitations as an entity_id, but with a ':' as separator
if not valid_statistic_id(metadata["statistic_id"]):
raise HomeAssistantError("Invalid statistic_id")
# The source must not be empty and must be aligned with the statistic_id
domain, _object_id = split_statistic_id(metadata["statistic_id"])
if not metadata["source"] or metadata["source"] != domain:
raise HomeAssistantError("Invalid source")
for statistic in statistics:
start = statistic["start"]
if start.tzinfo is None or start.tzinfo.utcoffset(start) is None:
raise HomeAssistantError("Naive timestamp")
if start.minute != 0 or start.second != 0 or start.microsecond != 0:
raise HomeAssistantError("Invalid timestamp")
statistic["start"] = dt_util.as_utc(start)
# Insert job in recorder's queue
hass.data[DATA_INSTANCE].async_external_statistics(metadata, statistics)
@retryable_database_job("statistics")
def add_external_statistics(
instance: Recorder,
metadata: StatisticMetaData,
statistics: Iterable[StatisticData],
) -> bool:
"""Process an add_statistics job."""
with session_scope(session=instance.get_session()) as session: # type: ignore
metadata_id = _update_or_add_metadata(instance.hass, session, metadata)
for stat in statistics:
if stat_id := _statistics_exists(
session, Statistics, metadata_id, stat["start"]
):
_update_statistics(session, Statistics, stat_id, stat)
else:
_insert_statistics(session, Statistics, metadata_id, stat)
return True

View file

@ -490,10 +490,12 @@ def _compile_statistics( # noqa: C901
# Set meta data
meta: StatisticMetaData = {
"statistic_id": entity_id,
"unit_of_measurement": unit,
"has_mean": "mean" in wanted_statistics[entity_id],
"has_sum": "sum" in wanted_statistics[entity_id],
"name": None,
"source": RECORDER_DOMAIN,
"statistic_id": entity_id,
"unit_of_measurement": unit,
}
# Make calculations
@ -606,7 +608,7 @@ def _compile_statistics( # noqa: C901
stat["sum"] = _sum
stat["state"] = new_state
result.append({"meta": meta, "stat": (stat,)})
result.append({"meta": meta, "stat": stat})
return result
@ -638,14 +640,20 @@ def list_statistic_ids(hass: HomeAssistant, statistic_type: str | None = None) -
continue
if device_class not in UNIT_CONVERSIONS:
statistic_ids[state.entity_id] = native_unit
statistic_ids[state.entity_id] = {
"source": RECORDER_DOMAIN,
"unit_of_measurement": native_unit,
}
continue
if native_unit not in UNIT_CONVERSIONS[device_class]:
continue
statistics_unit = DEVICE_CLASS_UNITS[device_class]
statistic_ids[state.entity_id] = statistics_unit
statistic_ids[state.entity_id] = {
"source": RECORDER_DOMAIN,
"unit_of_measurement": statistics_unit,
}
return statistic_ids

View file

@ -232,7 +232,7 @@ async def test_cost_sensor_price_entity_total_increasing(
await async_wait_recording_done_without_instance(hass)
all_statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
statistics = get_statistics_for_entity(all_statistics, cost_sensor_entity_id)
assert statistics["stat"][0]["sum"] == 19.0
assert statistics["stat"]["sum"] == 19.0
# Energy sensor has a small dip, no reset should be detected
hass.states.async_set(
@ -272,7 +272,7 @@ async def test_cost_sensor_price_entity_total_increasing(
await async_wait_recording_done_without_instance(hass)
all_statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
statistics = get_statistics_for_entity(all_statistics, cost_sensor_entity_id)
assert statistics["stat"][0]["sum"] == 38.0
assert statistics["stat"]["sum"] == 38.0
@pytest.mark.parametrize("initial_energy,initial_cost", [(0, "0.0"), (None, "unknown")])
@ -437,7 +437,7 @@ async def test_cost_sensor_price_entity_total(
await async_wait_recording_done_without_instance(hass)
all_statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
statistics = get_statistics_for_entity(all_statistics, cost_sensor_entity_id)
assert statistics["stat"][0]["sum"] == 19.0
assert statistics["stat"]["sum"] == 19.0
# Energy sensor has a small dip
hass.states.async_set(
@ -478,7 +478,7 @@ async def test_cost_sensor_price_entity_total(
await async_wait_recording_done_without_instance(hass)
all_statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
statistics = get_statistics_for_entity(all_statistics, cost_sensor_entity_id)
assert statistics["stat"][0]["sum"] == 38.0
assert statistics["stat"]["sum"] == 38.0
@pytest.mark.parametrize("initial_energy,initial_cost", [(0, "0.0"), (None, "unknown")])
@ -642,7 +642,7 @@ async def test_cost_sensor_price_entity_total_no_reset(
await async_wait_recording_done_without_instance(hass)
all_statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
statistics = get_statistics_for_entity(all_statistics, cost_sensor_entity_id)
assert statistics["stat"][0]["sum"] == 19.0
assert statistics["stat"]["sum"] == 19.0
# Energy sensor has a small dip
hass.states.async_set(
@ -659,7 +659,7 @@ async def test_cost_sensor_price_entity_total_no_reset(
await async_wait_recording_done_without_instance(hass)
all_statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
statistics = get_statistics_for_entity(all_statistics, cost_sensor_entity_id)
assert statistics["stat"][0]["sum"] == 18.0
assert statistics["stat"]["sum"] == 18.0
async def test_cost_sensor_handle_wh(hass, hass_storage) -> None:

View file

@ -1010,7 +1010,12 @@ async def test_list_statistic_ids(hass, hass_ws_client, units, attributes, unit)
response = await client.receive_json()
assert response["success"]
assert response["result"] == [
{"statistic_id": "sensor.test", "unit_of_measurement": unit}
{
"statistic_id": "sensor.test",
"name": None,
"source": "recorder",
"unit_of_measurement": unit,
}
]
hass.data[recorder.DATA_INSTANCE].do_adhoc_statistics(start=now)
@ -1023,7 +1028,12 @@ async def test_list_statistic_ids(hass, hass_ws_client, units, attributes, unit)
response = await client.receive_json()
assert response["success"]
assert response["result"] == [
{"statistic_id": "sensor.test", "unit_of_measurement": unit}
{
"statistic_id": "sensor.test",
"name": None,
"source": "recorder",
"unit_of_measurement": unit,
}
]
await client.send_json(
@ -1038,7 +1048,12 @@ async def test_list_statistic_ids(hass, hass_ws_client, units, attributes, unit)
response = await client.receive_json()
assert response["success"]
assert response["result"] == [
{"statistic_id": "sensor.test", "unit_of_measurement": unit}
{
"statistic_id": "sensor.test",
"name": None,
"source": "recorder",
"unit_of_measurement": unit,
}
]
await client.send_json(

View file

@ -13,10 +13,14 @@ from homeassistant.components.recorder.models import (
process_timestamp_to_utc_isoformat,
)
from homeassistant.components.recorder.statistics import (
async_add_external_statistics,
get_last_statistics,
get_metadata,
list_statistic_ids,
statistics_during_period,
)
from homeassistant.const import TEMP_CELSIUS
from homeassistant.exceptions import HomeAssistantError
from homeassistant.setup import setup_component
import homeassistant.util.dt as dt_util
@ -117,7 +121,7 @@ def mock_sensor_statistics():
"has_mean": True,
"has_sum": False,
},
"stat": ({"start": start},),
"stat": {"start": start},
}
def get_fake_stats(_hass, start, _end):
@ -301,6 +305,177 @@ def test_statistics_duplicated(hass_recorder, caplog):
caplog.clear()
def test_external_statistics(hass_recorder, caplog):
"""Test inserting external statistics."""
hass = hass_recorder()
wait_recording_done(hass)
assert "Compiling statistics for" not in caplog.text
assert "Statistics already compiled" not in caplog.text
zero = dt_util.utcnow()
period1 = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
external_statistics = {
"start": period1,
"last_reset": None,
"state": 0,
"sum": 2,
}
external_metadata = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
"source": "test",
"statistic_id": "test:total_energy_import",
"unit_of_measurement": "kWh",
}
async_add_external_statistics(hass, external_metadata, (external_statistics,))
wait_recording_done(hass)
stats = statistics_during_period(hass, zero, period="hour")
assert stats == {
"test:total_energy_import": [
{
"statistic_id": "test:total_energy_import",
"start": period1.isoformat(),
"end": (period1 + timedelta(hours=1)).isoformat(),
"max": None,
"mean": None,
"min": None,
"last_reset": None,
"state": approx(0.0),
"sum": approx(2.0),
}
]
}
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{
"statistic_id": "test:total_energy_import",
"name": "Total imported energy",
"source": "test",
"unit_of_measurement": "kWh",
}
]
metadata = get_metadata(hass, statistic_ids=("test:total_energy_import",))
assert metadata == {
"test:total_energy_import": (
1,
{
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
"source": "test",
"statistic_id": "test:total_energy_import",
"unit_of_measurement": "kWh",
},
)
}
# Update the previously inserted statistics
external_statistics = {
"start": period1,
"max": 1,
"mean": 2,
"min": 3,
"last_reset": None,
"state": 4,
"sum": 5,
}
async_add_external_statistics(hass, external_metadata, (external_statistics,))
wait_recording_done(hass)
stats = statistics_during_period(hass, zero, period="hour")
assert stats == {
"test:total_energy_import": [
{
"statistic_id": "test:total_energy_import",
"start": period1.isoformat(),
"end": (period1 + timedelta(hours=1)).isoformat(),
"max": approx(1.0),
"mean": approx(2.0),
"min": approx(3.0),
"last_reset": None,
"state": approx(4.0),
"sum": approx(5.0),
}
]
}
def test_external_statistics_errors(hass_recorder, caplog):
"""Test validation of external statistics."""
hass = hass_recorder()
wait_recording_done(hass)
assert "Compiling statistics for" not in caplog.text
assert "Statistics already compiled" not in caplog.text
zero = dt_util.utcnow()
period1 = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
_external_statistics = {
"start": period1,
"last_reset": None,
"state": 0,
"sum": 2,
}
_external_metadata = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
"source": "test",
"statistic_id": "test:total_energy_import",
"unit_of_measurement": "kWh",
}
# Attempt to insert statistics for an entity
external_metadata = {
**_external_metadata,
"statistic_id": "sensor.total_energy_import",
}
external_statistics = {**_external_statistics}
with pytest.raises(HomeAssistantError):
async_add_external_statistics(hass, external_metadata, (external_statistics,))
wait_recording_done(hass)
assert statistics_during_period(hass, zero, period="hour") == {}
assert list_statistic_ids(hass) == []
assert get_metadata(hass, statistic_ids=("sensor.total_energy_import",)) == {}
# Attempt to insert statistics for the wrong domain
external_metadata = {**_external_metadata, "source": "other"}
external_statistics = {**_external_statistics}
with pytest.raises(HomeAssistantError):
async_add_external_statistics(hass, external_metadata, (external_statistics,))
wait_recording_done(hass)
assert statistics_during_period(hass, zero, period="hour") == {}
assert list_statistic_ids(hass) == []
assert get_metadata(hass, statistic_ids=("test:total_energy_import",)) == {}
# Attempt to insert statistics for an naive starting time
external_metadata = {**_external_metadata}
external_statistics = {
**_external_statistics,
"start": period1.replace(tzinfo=None),
}
with pytest.raises(HomeAssistantError):
async_add_external_statistics(hass, external_metadata, (external_statistics,))
wait_recording_done(hass)
assert statistics_during_period(hass, zero, period="hour") == {}
assert list_statistic_ids(hass) == []
assert get_metadata(hass, statistic_ids=("test:total_energy_import",)) == {}
# Attempt to insert statistics for an invalid starting time
external_metadata = {**_external_metadata}
external_statistics = {**_external_statistics, "start": period1.replace(minute=1)}
with pytest.raises(HomeAssistantError):
async_add_external_statistics(hass, external_metadata, (external_statistics,))
wait_recording_done(hass)
assert statistics_during_period(hass, zero, period="hour") == {}
assert list_statistic_ids(hass) == []
assert get_metadata(hass, statistic_ids=("test:total_energy_import",)) == {}
def record_states(hass):
"""Record some test states.

View file

@ -206,7 +206,12 @@ async def test_update_statistics_metadata(hass, hass_ws_client, new_unit):
response = await client.receive_json()
assert response["success"]
assert response["result"] == [
{"statistic_id": "sensor.test", "unit_of_measurement": "W"}
{
"statistic_id": "sensor.test",
"name": None,
"source": "recorder",
"unit_of_measurement": "W",
}
]
await client.send_json(
@ -225,5 +230,10 @@ async def test_update_statistics_metadata(hass, hass_ws_client, new_unit):
response = await client.receive_json()
assert response["success"]
assert response["result"] == [
{"statistic_id": "sensor.test", "unit_of_measurement": new_unit}
{
"statistic_id": "sensor.test",
"name": None,
"source": "recorder",
"unit_of_measurement": new_unit,
}
]

View file

@ -112,7 +112,12 @@ def test_compile_hourly_statistics(
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": native_unit}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": native_unit,
}
]
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {
@ -170,7 +175,12 @@ def test_compile_hourly_statistics_purged_state_changes(
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": native_unit}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": native_unit,
}
]
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {
@ -231,9 +241,24 @@ def test_compile_hourly_statistics_unsupported(hass_recorder, caplog, attributes
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": "°C"},
{"statistic_id": "sensor.test6", "unit_of_measurement": "°C"},
{"statistic_id": "sensor.test7", "unit_of_measurement": "°C"},
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": "°C",
},
{
"statistic_id": "sensor.test6",
"name": None,
"source": "recorder",
"unit_of_measurement": "°C",
},
{
"statistic_id": "sensor.test7",
"name": None,
"source": "recorder",
"unit_of_measurement": "°C",
},
]
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {
@ -334,7 +359,12 @@ def test_compile_hourly_sum_statistics_amount(
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": display_unit}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": display_unit,
}
]
stats = statistics_during_period(hass, period0, period="5minute")
expected_stats = {
@ -471,7 +501,12 @@ def test_compile_hourly_sum_statistics_amount_reset_every_state_change(
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": native_unit}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": native_unit,
}
]
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {
@ -555,7 +590,12 @@ def test_compile_hourly_sum_statistics_amount_invalid_last_reset(
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": native_unit}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": native_unit,
}
]
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {
@ -623,7 +663,12 @@ def test_compile_hourly_sum_statistics_nan_inf_state(
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": native_unit}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": native_unit,
}
]
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {
@ -729,6 +774,8 @@ def test_compile_hourly_sum_statistics_negative_state(
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert {
"name": None,
"source": "recorder",
"statistic_id": entity_id,
"unit_of_measurement": native_unit,
} in statistic_ids
@ -802,7 +849,12 @@ def test_compile_hourly_sum_statistics_total_no_reset(
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": native_unit}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": native_unit,
}
]
stats = statistics_during_period(hass, period0, period="5minute")
assert stats == {
@ -888,7 +940,12 @@ def test_compile_hourly_sum_statistics_total_increasing(
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": native_unit}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": native_unit,
}
]
stats = statistics_during_period(hass, period0, period="5minute")
assert stats == {
@ -984,7 +1041,12 @@ def test_compile_hourly_sum_statistics_total_increasing_small_dip(
) in caplog.text
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": native_unit}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": native_unit,
}
]
stats = statistics_during_period(hass, period0, period="5minute")
assert stats == {
@ -1077,7 +1139,12 @@ def test_compile_hourly_energy_statistics_unsupported(hass_recorder, caplog):
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": "kWh"}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": "kWh",
}
]
stats = statistics_during_period(hass, period0, period="5minute")
assert stats == {
@ -1164,9 +1231,24 @@ def test_compile_hourly_energy_statistics_multiple(hass_recorder, caplog):
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": "kWh"},
{"statistic_id": "sensor.test2", "unit_of_measurement": "kWh"},
{"statistic_id": "sensor.test3", "unit_of_measurement": "kWh"},
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": "kWh",
},
{
"statistic_id": "sensor.test2",
"name": None,
"source": "recorder",
"unit_of_measurement": "kWh",
},
{
"statistic_id": "sensor.test3",
"name": None,
"source": "recorder",
"unit_of_measurement": "kWh",
},
]
stats = statistics_during_period(hass, period0, period="5minute")
assert stats == {
@ -1476,13 +1558,23 @@ def test_list_statistic_ids(
hass.states.set("sensor.test1", 0, attributes=attributes)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": native_unit}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": native_unit,
},
]
for stat_type in ["mean", "sum", "dogs"]:
statistic_ids = list_statistic_ids(hass, statistic_type=stat_type)
if statistic_type == stat_type:
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": native_unit}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": native_unit,
},
]
else:
assert statistic_ids == []
@ -1554,7 +1646,12 @@ def test_compile_hourly_statistics_changing_units_1(
assert "does not match the unit of already compiled" not in caplog.text
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": native_unit}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": native_unit,
},
]
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {
@ -1581,7 +1678,12 @@ def test_compile_hourly_statistics_changing_units_1(
)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": native_unit}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": native_unit,
},
]
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {
@ -1639,7 +1741,12 @@ def test_compile_hourly_statistics_changing_units_2(
assert "and matches the unit of already compiled statistics" not in caplog.text
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": "cats"}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": "cats",
},
]
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {}
@ -1687,7 +1794,12 @@ def test_compile_hourly_statistics_changing_units_3(
assert "does not match the unit of already compiled" not in caplog.text
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": native_unit}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": native_unit,
},
]
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {
@ -1712,7 +1824,12 @@ def test_compile_hourly_statistics_changing_units_3(
assert f"matches the unit of already compiled statistics ({unit})" in caplog.text
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": native_unit}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": native_unit,
},
]
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {
@ -1760,7 +1877,12 @@ def test_compile_hourly_statistics_changing_device_class_1(
assert "does not match the unit of already compiled" not in caplog.text
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": state_unit}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": state_unit,
},
]
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {
@ -1801,7 +1923,12 @@ def test_compile_hourly_statistics_changing_device_class_1(
)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": state_unit}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": state_unit,
},
]
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {
@ -1850,7 +1977,12 @@ def test_compile_hourly_statistics_changing_device_class_2(
assert "does not match the unit of already compiled" not in caplog.text
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": statistic_unit}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": statistic_unit,
},
]
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {
@ -1891,7 +2023,12 @@ def test_compile_hourly_statistics_changing_device_class_2(
)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": statistic_unit}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": statistic_unit,
},
]
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {
@ -1943,7 +2080,12 @@ def test_compile_hourly_statistics_changing_statistics(
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": None}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": None,
},
]
metadata = get_metadata(hass, statistic_ids=("sensor.test1",))
assert metadata == {
@ -1952,6 +2094,8 @@ def test_compile_hourly_statistics_changing_statistics(
{
"has_mean": True,
"has_sum": False,
"name": None,
"source": "recorder",
"statistic_id": "sensor.test1",
"unit_of_measurement": None,
},
@ -1968,7 +2112,12 @@ def test_compile_hourly_statistics_changing_statistics(
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": None}
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": None,
},
]
metadata = get_metadata(hass, statistic_ids=("sensor.test1",))
assert metadata == {
@ -1977,6 +2126,8 @@ def test_compile_hourly_statistics_changing_statistics(
{
"has_mean": False,
"has_sum": True,
"name": None,
"source": "recorder",
"statistic_id": "sensor.test1",
"unit_of_measurement": None,
},
@ -2155,10 +2306,30 @@ def test_compile_statistics_hourly_daily_monthly_summary(
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": "%"},
{"statistic_id": "sensor.test2", "unit_of_measurement": "%"},
{"statistic_id": "sensor.test3", "unit_of_measurement": "%"},
{"statistic_id": "sensor.test4", "unit_of_measurement": "EUR"},
{
"statistic_id": "sensor.test1",
"name": None,
"source": "recorder",
"unit_of_measurement": "%",
},
{
"statistic_id": "sensor.test2",
"name": None,
"source": "recorder",
"unit_of_measurement": "%",
},
{
"statistic_id": "sensor.test3",
"name": None,
"source": "recorder",
"unit_of_measurement": "%",
},
{
"statistic_id": "sensor.test4",
"name": None,
"source": "recorder",
"unit_of_measurement": "EUR",
},
]
stats = statistics_during_period(hass, zero, period="5minute")