Support converting statistics to another unit (#79117)
This commit is contained in:
parent
772581dd28
commit
c52d0f7495
5 changed files with 448 additions and 6 deletions
|
@ -72,6 +72,7 @@ from .queries import find_shared_attributes_id, find_shared_data_id
|
|||
from .run_history import RunHistory
|
||||
from .tasks import (
|
||||
AdjustStatisticsTask,
|
||||
ChangeStatisticsUnitTask,
|
||||
ClearStatisticsTask,
|
||||
CommitTask,
|
||||
DatabaseLockTask,
|
||||
|
@ -511,6 +512,21 @@ class Recorder(threading.Thread):
|
|||
)
|
||||
)
|
||||
|
||||
@callback
|
||||
def async_change_statistics_unit(
|
||||
self,
|
||||
statistic_id: str,
|
||||
*,
|
||||
new_unit_of_measurement: str,
|
||||
old_unit_of_measurement: str,
|
||||
) -> None:
|
||||
"""Change statistics unit for a statistic_id."""
|
||||
self.queue_task(
|
||||
ChangeStatisticsUnitTask(
|
||||
statistic_id, new_unit_of_measurement, old_unit_of_measurement
|
||||
)
|
||||
)
|
||||
|
||||
@callback
|
||||
def async_import_statistics(
|
||||
self, metadata: StatisticMetaData, stats: Iterable[StatisticData]
|
||||
|
|
|
@ -41,7 +41,13 @@ from homeassistant.util.unit_conversion import (
|
|||
)
|
||||
|
||||
from .const import DOMAIN, MAX_ROWS_TO_PURGE, SupportedDialect
|
||||
from .db_schema import Statistics, StatisticsMeta, StatisticsRuns, StatisticsShortTerm
|
||||
from .db_schema import (
|
||||
Statistics,
|
||||
StatisticsBase,
|
||||
StatisticsMeta,
|
||||
StatisticsRuns,
|
||||
StatisticsShortTerm,
|
||||
)
|
||||
from .models import (
|
||||
StatisticData,
|
||||
StatisticMetaData,
|
||||
|
@ -208,6 +214,35 @@ def _get_display_to_statistic_unit_converter(
|
|||
)
|
||||
|
||||
|
||||
def _get_unit_converter(
|
||||
from_unit: str, to_unit: str
|
||||
) -> Callable[[float | None], float | None]:
|
||||
"""Prepare a converter from a unit to another unit."""
|
||||
|
||||
def convert_units(
|
||||
val: float | None, conv: type[BaseUnitConverter], from_unit: str, to_unit: str
|
||||
) -> float | None:
|
||||
"""Return converted val."""
|
||||
if val is None:
|
||||
return val
|
||||
return conv.convert(val, from_unit=from_unit, to_unit=to_unit)
|
||||
|
||||
for conv in STATISTIC_UNIT_TO_UNIT_CONVERTER.values():
|
||||
if from_unit in conv.VALID_UNITS and to_unit in conv.VALID_UNITS:
|
||||
return partial(
|
||||
convert_units, conv=conv, from_unit=from_unit, to_unit=to_unit
|
||||
)
|
||||
raise HomeAssistantError
|
||||
|
||||
|
||||
def can_convert_units(from_unit: str | None, to_unit: str | None) -> bool:
|
||||
"""Return True if it's possible to convert from from_unit to to_unit."""
|
||||
for converter in STATISTIC_UNIT_TO_UNIT_CONVERTER.values():
|
||||
if from_unit in converter.VALID_UNITS and to_unit in converter.VALID_UNITS:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class PlatformCompiledStatistics:
|
||||
"""Compiled Statistics from a platform."""
|
||||
|
@ -1614,3 +1649,79 @@ def adjust_statistics(
|
|||
)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def _change_statistics_unit_for_table(
|
||||
session: Session,
|
||||
table: type[StatisticsBase],
|
||||
metadata_id: int,
|
||||
convert: Callable[[float | None], float | None],
|
||||
) -> None:
|
||||
"""Insert statistics in the database."""
|
||||
columns = [table.id, table.mean, table.min, table.max, table.state, table.sum]
|
||||
query = session.query(*columns).filter_by(metadata_id=bindparam("metadata_id"))
|
||||
rows = execute(query.params(metadata_id=metadata_id))
|
||||
for row in rows:
|
||||
session.query(table).filter(table.id == row.id).update(
|
||||
{
|
||||
table.mean: convert(row.mean),
|
||||
table.min: convert(row.min),
|
||||
table.max: convert(row.max),
|
||||
table.state: convert(row.state),
|
||||
table.sum: convert(row.sum),
|
||||
},
|
||||
synchronize_session=False,
|
||||
)
|
||||
|
||||
|
||||
def change_statistics_unit(
|
||||
instance: Recorder,
|
||||
statistic_id: str,
|
||||
new_unit: str,
|
||||
old_unit: str,
|
||||
) -> None:
|
||||
"""Change statistics unit for a statistic_id."""
|
||||
with session_scope(session=instance.get_session()) as session:
|
||||
metadata = get_metadata_with_session(
|
||||
instance.hass, session, statistic_ids=(statistic_id,)
|
||||
).get(statistic_id)
|
||||
|
||||
# Guard against the statistics being removed or updated before the
|
||||
# change_statistics_unit job executes
|
||||
if (
|
||||
metadata is None
|
||||
or metadata[1]["source"] != DOMAIN
|
||||
or metadata[1]["unit_of_measurement"] != old_unit
|
||||
):
|
||||
_LOGGER.warning("Could not change statistics unit for %s", statistic_id)
|
||||
return
|
||||
|
||||
metadata_id = metadata[0]
|
||||
|
||||
convert = _get_unit_converter(old_unit, new_unit)
|
||||
for table in (StatisticsShortTerm, Statistics):
|
||||
_change_statistics_unit_for_table(session, table, metadata_id, convert)
|
||||
session.query(StatisticsMeta).filter(
|
||||
StatisticsMeta.statistic_id == statistic_id
|
||||
).update({StatisticsMeta.unit_of_measurement: new_unit})
|
||||
|
||||
|
||||
@callback
|
||||
def async_change_statistics_unit(
|
||||
hass: HomeAssistant,
|
||||
statistic_id: str,
|
||||
*,
|
||||
new_unit_of_measurement: str,
|
||||
old_unit_of_measurement: str,
|
||||
) -> None:
|
||||
"""Change statistics unit for a statistic_id."""
|
||||
if not can_convert_units(old_unit_of_measurement, new_unit_of_measurement):
|
||||
raise HomeAssistantError(
|
||||
f"Can't convert {old_unit_of_measurement} to {new_unit_of_measurement}"
|
||||
)
|
||||
|
||||
get_instance(hass).async_change_statistics_unit(
|
||||
statistic_id,
|
||||
new_unit_of_measurement=new_unit_of_measurement,
|
||||
old_unit_of_measurement=old_unit_of_measurement,
|
||||
)
|
||||
|
|
|
@ -31,6 +31,24 @@ class RecorderTask(abc.ABC):
|
|||
"""Handle the task."""
|
||||
|
||||
|
||||
@dataclass
|
||||
class ChangeStatisticsUnitTask(RecorderTask):
|
||||
"""Object to store statistics_id and unit to convert unit of statistics."""
|
||||
|
||||
statistic_id: str
|
||||
new_unit_of_measurement: str
|
||||
old_unit_of_measurement: str
|
||||
|
||||
def run(self, instance: Recorder) -> None:
|
||||
"""Handle the task."""
|
||||
statistics.change_statistics_unit(
|
||||
instance,
|
||||
self.statistic_id,
|
||||
self.new_unit_of_measurement,
|
||||
self.old_unit_of_measurement,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ClearStatisticsTask(RecorderTask):
|
||||
"""Object to store statistics_ids which for which to remove statistics."""
|
||||
|
|
|
@ -30,6 +30,7 @@ from homeassistant.util.unit_conversion import (
|
|||
from .const import MAX_QUEUE_BACKLOG
|
||||
from .statistics import (
|
||||
async_add_external_statistics,
|
||||
async_change_statistics_unit,
|
||||
async_import_statistics,
|
||||
list_statistic_ids,
|
||||
statistics_during_period,
|
||||
|
@ -46,6 +47,7 @@ def async_setup(hass: HomeAssistant) -> None:
|
|||
websocket_api.async_register_command(hass, ws_adjust_sum_statistics)
|
||||
websocket_api.async_register_command(hass, ws_backup_end)
|
||||
websocket_api.async_register_command(hass, ws_backup_start)
|
||||
websocket_api.async_register_command(hass, ws_change_statistics_unit)
|
||||
websocket_api.async_register_command(hass, ws_clear_statistics)
|
||||
websocket_api.async_register_command(hass, ws_get_statistics_during_period)
|
||||
websocket_api.async_register_command(hass, ws_get_statistics_metadata)
|
||||
|
@ -255,6 +257,32 @@ def ws_update_statistics_metadata(
|
|||
connection.send_result(msg["id"])
|
||||
|
||||
|
||||
@websocket_api.require_admin
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): "recorder/change_statistics_unit",
|
||||
vol.Required("statistic_id"): str,
|
||||
vol.Required("new_unit_of_measurement"): vol.Any(str, None),
|
||||
vol.Required("old_unit_of_measurement"): vol.Any(str, None),
|
||||
}
|
||||
)
|
||||
@callback
|
||||
def ws_change_statistics_unit(
|
||||
hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
|
||||
) -> None:
|
||||
"""Change the unit_of_measurement for a statistic_id.
|
||||
|
||||
All existing statistics will be converted to the new unit.
|
||||
"""
|
||||
async_change_statistics_unit(
|
||||
hass,
|
||||
msg["statistic_id"],
|
||||
new_unit_of_measurement=msg["new_unit_of_measurement"],
|
||||
old_unit_of_measurement=msg["old_unit_of_measurement"],
|
||||
)
|
||||
connection.send_result(msg["id"])
|
||||
|
||||
|
||||
@websocket_api.require_admin
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
|
|
|
@ -812,15 +812,17 @@ async def test_clear_statistics(hass, hass_ws_client, recorder_mock):
|
|||
assert response["result"] == {"sensor.test2": expected_response["sensor.test2"]}
|
||||
|
||||
|
||||
@pytest.mark.parametrize("new_unit", ["dogs", None])
|
||||
@pytest.mark.parametrize(
|
||||
"new_unit, new_unit_class", [("dogs", None), (None, None), ("W", "power")]
|
||||
)
|
||||
async def test_update_statistics_metadata(
|
||||
hass, hass_ws_client, recorder_mock, new_unit
|
||||
hass, hass_ws_client, recorder_mock, new_unit, new_unit_class
|
||||
):
|
||||
"""Test removing statistics."""
|
||||
now = dt_util.utcnow()
|
||||
|
||||
units = METRIC_SYSTEM
|
||||
attributes = POWER_SENSOR_KW_ATTRIBUTES
|
||||
attributes = POWER_SENSOR_KW_ATTRIBUTES | {"device_class": None}
|
||||
state = 10
|
||||
|
||||
hass.config.units = units
|
||||
|
@ -845,8 +847,8 @@ async def test_update_statistics_metadata(
|
|||
"has_sum": False,
|
||||
"name": None,
|
||||
"source": "recorder",
|
||||
"statistics_unit_of_measurement": "W",
|
||||
"unit_class": "power",
|
||||
"statistics_unit_of_measurement": "kW",
|
||||
"unit_class": None,
|
||||
}
|
||||
]
|
||||
|
||||
|
@ -874,10 +876,277 @@ async def test_update_statistics_metadata(
|
|||
"name": None,
|
||||
"source": "recorder",
|
||||
"statistics_unit_of_measurement": new_unit,
|
||||
"unit_class": new_unit_class,
|
||||
}
|
||||
]
|
||||
|
||||
await client.send_json(
|
||||
{
|
||||
"id": 5,
|
||||
"type": "recorder/statistics_during_period",
|
||||
"start_time": now.isoformat(),
|
||||
"statistic_ids": ["sensor.test"],
|
||||
"period": "5minute",
|
||||
"units": {"power": "W"},
|
||||
}
|
||||
)
|
||||
response = await client.receive_json()
|
||||
assert response["success"]
|
||||
assert response["result"] == {
|
||||
"sensor.test": [
|
||||
{
|
||||
"end": (now + timedelta(minutes=5)).isoformat(),
|
||||
"last_reset": None,
|
||||
"max": 10.0,
|
||||
"mean": 10.0,
|
||||
"min": 10.0,
|
||||
"start": now.isoformat(),
|
||||
"state": None,
|
||||
"statistic_id": "sensor.test",
|
||||
"sum": None,
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
async def test_change_statistics_unit(hass, hass_ws_client, recorder_mock):
|
||||
"""Test change unit of recorded statistics."""
|
||||
now = dt_util.utcnow()
|
||||
|
||||
units = METRIC_SYSTEM
|
||||
attributes = POWER_SENSOR_KW_ATTRIBUTES | {"device_class": None}
|
||||
state = 10
|
||||
|
||||
hass.config.units = units
|
||||
await async_setup_component(hass, "sensor", {})
|
||||
await async_recorder_block_till_done(hass)
|
||||
hass.states.async_set("sensor.test", state, attributes=attributes)
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
do_adhoc_statistics(hass, period="hourly", start=now)
|
||||
await async_recorder_block_till_done(hass)
|
||||
|
||||
client = await hass_ws_client()
|
||||
|
||||
await client.send_json({"id": 1, "type": "recorder/list_statistic_ids"})
|
||||
response = await client.receive_json()
|
||||
assert response["success"]
|
||||
assert response["result"] == [
|
||||
{
|
||||
"statistic_id": "sensor.test",
|
||||
"display_unit_of_measurement": "kW",
|
||||
"has_mean": True,
|
||||
"has_sum": False,
|
||||
"name": None,
|
||||
"source": "recorder",
|
||||
"statistics_unit_of_measurement": "kW",
|
||||
"unit_class": None,
|
||||
}
|
||||
]
|
||||
|
||||
await client.send_json(
|
||||
{
|
||||
"id": 2,
|
||||
"type": "recorder/statistics_during_period",
|
||||
"start_time": now.isoformat(),
|
||||
"statistic_ids": ["sensor.test"],
|
||||
"period": "5minute",
|
||||
}
|
||||
)
|
||||
response = await client.receive_json()
|
||||
assert response["success"]
|
||||
assert response["result"] == {
|
||||
"sensor.test": [
|
||||
{
|
||||
"end": (now + timedelta(minutes=5)).isoformat(),
|
||||
"last_reset": None,
|
||||
"max": 10.0,
|
||||
"mean": 10.0,
|
||||
"min": 10.0,
|
||||
"start": now.isoformat(),
|
||||
"state": None,
|
||||
"statistic_id": "sensor.test",
|
||||
"sum": None,
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
await client.send_json(
|
||||
{
|
||||
"id": 3,
|
||||
"type": "recorder/change_statistics_unit",
|
||||
"statistic_id": "sensor.test",
|
||||
"new_unit_of_measurement": "W",
|
||||
"old_unit_of_measurement": "kW",
|
||||
}
|
||||
)
|
||||
response = await client.receive_json()
|
||||
assert response["success"]
|
||||
await async_recorder_block_till_done(hass)
|
||||
|
||||
await client.send_json({"id": 4, "type": "recorder/list_statistic_ids"})
|
||||
response = await client.receive_json()
|
||||
assert response["success"]
|
||||
assert response["result"] == [
|
||||
{
|
||||
"statistic_id": "sensor.test",
|
||||
"display_unit_of_measurement": "kW",
|
||||
"has_mean": True,
|
||||
"has_sum": False,
|
||||
"name": None,
|
||||
"source": "recorder",
|
||||
"statistics_unit_of_measurement": "W",
|
||||
"unit_class": "power",
|
||||
}
|
||||
]
|
||||
|
||||
await client.send_json(
|
||||
{
|
||||
"id": 5,
|
||||
"type": "recorder/statistics_during_period",
|
||||
"start_time": now.isoformat(),
|
||||
"statistic_ids": ["sensor.test"],
|
||||
"period": "5minute",
|
||||
"units": {"power": "W"},
|
||||
}
|
||||
)
|
||||
response = await client.receive_json()
|
||||
assert response["success"]
|
||||
assert response["result"] == {
|
||||
"sensor.test": [
|
||||
{
|
||||
"end": (now + timedelta(minutes=5)).isoformat(),
|
||||
"last_reset": None,
|
||||
"max": 10000.0,
|
||||
"mean": 10000.0,
|
||||
"min": 10000.0,
|
||||
"start": now.isoformat(),
|
||||
"state": None,
|
||||
"statistic_id": "sensor.test",
|
||||
"sum": None,
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
async def test_change_statistics_unit_errors(
|
||||
hass, hass_ws_client, recorder_mock, caplog
|
||||
):
|
||||
"""Test change unit of recorded statistics."""
|
||||
now = dt_util.utcnow()
|
||||
ws_id = 0
|
||||
|
||||
units = METRIC_SYSTEM
|
||||
attributes = POWER_SENSOR_KW_ATTRIBUTES | {"device_class": None}
|
||||
state = 10
|
||||
|
||||
expected_statistic_ids = [
|
||||
{
|
||||
"statistic_id": "sensor.test",
|
||||
"display_unit_of_measurement": "kW",
|
||||
"has_mean": True,
|
||||
"has_sum": False,
|
||||
"name": None,
|
||||
"source": "recorder",
|
||||
"statistics_unit_of_measurement": "kW",
|
||||
"unit_class": None,
|
||||
}
|
||||
]
|
||||
|
||||
expected_statistics = {
|
||||
"sensor.test": [
|
||||
{
|
||||
"end": (now + timedelta(minutes=5)).isoformat(),
|
||||
"last_reset": None,
|
||||
"max": 10.0,
|
||||
"mean": 10.0,
|
||||
"min": 10.0,
|
||||
"start": now.isoformat(),
|
||||
"state": None,
|
||||
"statistic_id": "sensor.test",
|
||||
"sum": None,
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
async def assert_statistic_ids(expected):
|
||||
nonlocal ws_id
|
||||
ws_id += 1
|
||||
await client.send_json({"id": ws_id, "type": "recorder/list_statistic_ids"})
|
||||
response = await client.receive_json()
|
||||
assert response["success"]
|
||||
assert response["result"] == expected
|
||||
|
||||
async def assert_statistics(expected):
|
||||
nonlocal ws_id
|
||||
ws_id += 1
|
||||
await client.send_json(
|
||||
{
|
||||
"id": ws_id,
|
||||
"type": "recorder/statistics_during_period",
|
||||
"start_time": now.isoformat(),
|
||||
"statistic_ids": ["sensor.test"],
|
||||
"period": "5minute",
|
||||
}
|
||||
)
|
||||
response = await client.receive_json()
|
||||
assert response["success"]
|
||||
assert response["result"] == expected
|
||||
|
||||
hass.config.units = units
|
||||
await async_setup_component(hass, "sensor", {})
|
||||
await async_recorder_block_till_done(hass)
|
||||
hass.states.async_set("sensor.test", state, attributes=attributes)
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
do_adhoc_statistics(hass, period="hourly", start=now)
|
||||
await async_recorder_block_till_done(hass)
|
||||
|
||||
client = await hass_ws_client()
|
||||
|
||||
await assert_statistic_ids(expected_statistic_ids)
|
||||
await assert_statistics(expected_statistics)
|
||||
|
||||
# Try changing to an invalid unit
|
||||
ws_id += 1
|
||||
await client.send_json(
|
||||
{
|
||||
"id": ws_id,
|
||||
"type": "recorder/change_statistics_unit",
|
||||
"statistic_id": "sensor.test",
|
||||
"old_unit_of_measurement": "kW",
|
||||
"new_unit_of_measurement": "dogs",
|
||||
}
|
||||
)
|
||||
response = await client.receive_json()
|
||||
assert not response["success"]
|
||||
assert response["error"]["message"] == "Can't convert kW to dogs"
|
||||
|
||||
await async_recorder_block_till_done(hass)
|
||||
|
||||
await assert_statistic_ids(expected_statistic_ids)
|
||||
await assert_statistics(expected_statistics)
|
||||
|
||||
# Try changing from the wrong unit
|
||||
ws_id += 1
|
||||
await client.send_json(
|
||||
{
|
||||
"id": ws_id,
|
||||
"type": "recorder/change_statistics_unit",
|
||||
"statistic_id": "sensor.test",
|
||||
"old_unit_of_measurement": "W",
|
||||
"new_unit_of_measurement": "kW",
|
||||
}
|
||||
)
|
||||
response = await client.receive_json()
|
||||
assert response["success"]
|
||||
|
||||
await async_recorder_block_till_done(hass)
|
||||
|
||||
assert "Could not change statistics unit for sensor.test" in caplog.text
|
||||
await assert_statistic_ids(expected_statistic_ids)
|
||||
await assert_statistics(expected_statistics)
|
||||
|
||||
|
||||
async def test_recorder_info(hass, hass_ws_client, recorder_mock):
|
||||
"""Test getting recorder status."""
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue