Remove recorder history queries for database schemas < 31 (#125652)

Erik Montnemery 2024-09-10 19:26:19 +02:00 committed by GitHub
parent 650c92a3cf
commit bde92b34dd
8 changed files with 88 additions and 2224 deletions
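Context for the hunks below: every history helper used to consult the live database schema version (via the small `_schema_version` helper deleted in the first file) and carry a pre-schema-31 code path next to the current one. Most deleted lines in this diff are those `schema_version < 31` branches. A minimal sketch of the pattern being removed, using a stand-in table rather than the recorder's real `States` model and `_QUERY_*` column tuples:

```python
from sqlalchemy import (
    Column, DateTime, Float, Integer, MetaData, Table, lambda_stmt, select,
)

metadata = MetaData()
# Stand-in for the recorder's States table; illustrative columns only.
states = Table(
    "states",
    metadata,
    Column("state_id", Integer, primary_key=True),
    Column("last_updated", DateTime),   # pre-schema-31 datetime column
    Column("last_updated_ts", Float),   # schema 31+ epoch float column
)

# Before: each helper branched on the schema version it was handed.
def build_stmt_before(schema_version: int):
    if schema_version >= 31:
        return lambda_stmt(lambda: select(states.c.state_id, states.c.last_updated_ts))
    return lambda_stmt(lambda: select(states.c.state_id, states.c.last_updated))

# After: schema >= 31 is assumed, so both the branch and the
# schema_version parameter disappear.
def build_stmt_after():
    return lambda_stmt(lambda: select(states.c.state_id, states.c.last_updated_ts))
```

Dropping that parameter is what shrinks nearly every helper signature in the diff.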


@@ -1,11 +0,0 @@
"""Common functions for history."""
from __future__ import annotations
from homeassistant.core import HomeAssistant
from ... import recorder
def _schema_version(hass: HomeAssistant) -> int:
return recorder.get_instance(hass).schema_version


@@ -24,19 +24,9 @@ import homeassistant.util.dt as dt_util
from ... import recorder
from ..db_schema import RecorderRuns, StateAttributes, States
from ..filters import Filters
- from ..models import (
- process_datetime_to_timestamp,
- process_timestamp,
- process_timestamp_to_utc_isoformat,
- )
- from ..models.legacy import (
- LegacyLazyState,
- LegacyLazyStatePreSchema31,
- legacy_row_to_compressed_state,
- legacy_row_to_compressed_state_pre_schema_31,
- )
+ from ..models import process_timestamp, process_timestamp_to_utc_isoformat
+ from ..models.legacy import LegacyLazyState, legacy_row_to_compressed_state
from ..util import execute_stmt_lambda_element, session_scope
- from .common import _schema_version
from .const import (
LAST_CHANGED_KEY,
NEED_ATTRIBUTE_DOMAINS,
@@ -137,7 +127,7 @@ _FIELD_MAP_PRE_SCHEMA_31 = {
def _lambda_stmt_and_join_attributes(
- schema_version: int, no_attributes: bool, include_last_changed: bool = True
+ no_attributes: bool, include_last_changed: bool = True
) -> tuple[StatementLambdaElement, bool]:
"""Return the lambda_stmt and if StateAttributes should be joined.
@@ -148,7 +138,6 @@ def _lambda_stmt_and_join_attributes(
# without the attributes fields and do not join the
# state_attributes table
if no_attributes:
if schema_version >= 31:
if include_last_changed:
return (
lambda_stmt(lambda: select(*_QUERY_STATE_NO_ATTR)),
@@ -158,31 +147,10 @@
lambda_stmt(lambda: select(*_QUERY_STATE_NO_ATTR_NO_LAST_CHANGED)),
False,
)
if include_last_changed:
return (
lambda_stmt(lambda: select(*_QUERY_STATE_NO_ATTR_PRE_SCHEMA_31)),
False,
)
return (
lambda_stmt(
lambda: select(*_QUERY_STATE_NO_ATTR_NO_LAST_CHANGED_PRE_SCHEMA_31)
),
False,
)
if schema_version >= 31:
if include_last_changed:
return lambda_stmt(lambda: select(*_QUERY_STATES)), True
return lambda_stmt(lambda: select(*_QUERY_STATES_NO_LAST_CHANGED)), True
# Finally if no migration is in progress and no_attributes
# was not requested, we query both attributes columns and
# join state_attributes
if include_last_changed:
return lambda_stmt(lambda: select(*_QUERY_STATES_PRE_SCHEMA_31)), True
return (
lambda_stmt(lambda: select(*_QUERY_STATES_NO_LAST_CHANGED_PRE_SCHEMA_31)),
True,
)
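Aside: the `stmt += lambda q: ...` idiom used throughout this file is SQLAlchemy's `lambda_stmt` composition, where each appended lambda contributes a fragment whose compiled SQL is cached on the lambda's code object. A reduced, self-contained sketch of the mechanism (stand-in table, not the recorder's models):

```python
from sqlalchemy import Column, Float, Integer, MetaData, String, Table, lambda_stmt, select

states = Table(
    "states",
    MetaData(),
    Column("state_id", Integer, primary_key=True),
    Column("entity_id", String),
    Column("last_updated_ts", Float),
)

stmt = lambda_stmt(lambda: select(states))
# Each += appends another cached fragment; values the lambdas close over
# become bound parameters rather than part of the cache key.
cutoff = 1725988379.0
stmt += lambda q: q.where(states.c.last_updated_ts > cutoff)
stmt += lambda q: q.order_by(states.c.entity_id, states.c.last_updated_ts)
```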
def get_significant_states(
@@ -215,7 +183,6 @@ def get_significant_states(
def _significant_states_stmt(
- schema_version: int,
start_time: datetime,
end_time: datetime | None,
entity_ids: list[str],
@@ -224,25 +191,18 @@ def _significant_states_stmt(
) -> StatementLambdaElement:
"""Query the database for significant state changes."""
stmt, join_attributes = _lambda_stmt_and_join_attributes(
- schema_version, no_attributes, include_last_changed=not significant_changes_only
+ no_attributes, include_last_changed=not significant_changes_only
)
if (
len(entity_ids) == 1
and significant_changes_only
and split_entity_id(entity_ids[0])[0] not in SIGNIFICANT_DOMAINS
):
if schema_version >= 31:
stmt += lambda q: q.filter(
(States.last_changed_ts == States.last_updated_ts)
| States.last_changed_ts.is_(None)
)
else:
stmt += lambda q: q.filter(
(States.last_changed == States.last_updated)
| States.last_changed.is_(None)
)
elif significant_changes_only:
if schema_version >= 31:
stmt += lambda q: q.filter(
or_(
*[
@@ -255,40 +215,19 @@ def _significant_states_stmt(
),
)
)
else:
stmt += lambda q: q.filter(
or_(
*[
States.entity_id.like(entity_domain)
for entity_domain in SIGNIFICANT_DOMAINS_ENTITY_ID_LIKE
],
(
(States.last_changed == States.last_updated)
| States.last_changed.is_(None)
),
)
)
stmt += lambda q: q.filter(States.entity_id.in_(entity_ids))
if schema_version >= 31:
start_time_ts = start_time.timestamp()
stmt += lambda q: q.filter(States.last_updated_ts > start_time_ts)
if end_time:
end_time_ts = end_time.timestamp()
stmt += lambda q: q.filter(States.last_updated_ts < end_time_ts)
else:
stmt += lambda q: q.filter(States.last_updated > start_time)
if end_time:
stmt += lambda q: q.filter(States.last_updated < end_time)
if join_attributes:
stmt += lambda q: q.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
if schema_version >= 31:
stmt += lambda q: q.order_by(States.entity_id, States.last_updated_ts)
else:
stmt += lambda q: q.order_by(States.entity_id, States.last_updated)
return stmt
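Note how the function converts datetimes with `.timestamp()` before the lambdas reference them: a `lambda_stmt` fragment treats closed-over values as bound parameters, so the conversion has to happen once, outside the lambda, leaving a plain float for the cached query to bind. A reduced sketch of that pattern (same stand-in `states` table idea as above):

```python
from datetime import datetime, timezone

from sqlalchemy import Column, Float, Integer, MetaData, Table, lambda_stmt, select

states = Table(
    "states",
    MetaData(),
    Column("state_id", Integer, primary_key=True),
    Column("last_updated_ts", Float),
)

start_time = datetime(2024, 9, 1, tzinfo=timezone.utc)
start_time_ts = start_time.timestamp()  # convert once, outside the lambda

stmt = lambda_stmt(lambda: select(states))
# The lambda closes over the float; different start times reuse the same
# cached SQL with a different bound parameter.
stmt += lambda q: q.where(states.c.last_updated_ts > start_time_ts)
```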
@@ -321,7 +260,6 @@ def get_significant_states_with_session(
if not entity_ids:
raise ValueError("entity_ids must be provided")
stmt = _significant_states_stmt(
- _schema_version(hass),
start_time,
end_time,
entity_ids,
@@ -376,7 +314,6 @@ def get_full_significant_states_with_session(
def _state_changed_during_period_stmt(
- schema_version: int,
start_time: datetime,
end_time: datetime | None,
entity_id: str,
@@ -385,9 +322,8 @@ def _state_changed_during_period_stmt(
limit: int | None,
) -> StatementLambdaElement:
stmt, join_attributes = _lambda_stmt_and_join_attributes(
- schema_version, no_attributes, include_last_changed=False
+ no_attributes, include_last_changed=False
)
if schema_version >= 31:
start_time_ts = start_time.timestamp()
stmt += lambda q: q.filter(
(
@@ -396,36 +332,18 @@ def _state_changed_during_period_stmt(
)
& (States.last_updated_ts > start_time_ts)
)
else:
stmt += lambda q: q.filter(
(
(States.last_changed == States.last_updated)
| States.last_changed.is_(None)
)
& (States.last_updated > start_time)
)
if end_time:
if schema_version >= 31:
end_time_ts = end_time.timestamp()
stmt += lambda q: q.filter(States.last_updated_ts < end_time_ts)
else:
stmt += lambda q: q.filter(States.last_updated < end_time)
stmt += lambda q: q.filter(States.entity_id == entity_id)
if join_attributes:
stmt += lambda q: q.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
if descending:
if schema_version >= 31:
- stmt += lambda q: q.order_by(
- States.entity_id, States.last_updated_ts.desc()
- )
+ stmt += lambda q: q.order_by(States.entity_id, States.last_updated_ts.desc())
else:
stmt += lambda q: q.order_by(States.entity_id, States.last_updated.desc())
elif schema_version >= 31:
stmt += lambda q: q.order_by(States.entity_id, States.last_updated_ts)
else:
stmt += lambda q: q.order_by(States.entity_id, States.last_updated)
if limit:
stmt += lambda q: q.limit(limit)
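For orientation, the public wrapper in the next hunk is typically called along these lines (illustrative entity and window; `hass` is assumed to be a running Home Assistant instance with the recorder set up):

```python
from datetime import timedelta

from homeassistant.components.recorder import history
import homeassistant.util.dt as dt_util

end = dt_util.utcnow()
start = end - timedelta(hours=1)
# Returns {entity_id: [State, ...]}, ascending by last_updated_ts unless
# descending=True; limit caps the number of rows returned.
hist = history.state_changes_during_period(
    hass, start, end, "sensor.example", no_attributes=True, limit=100
)
```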
@@ -448,7 +366,6 @@ def state_changes_during_period(
entity_ids = [entity_id.lower()]
with session_scope(hass=hass, read_only=True) as session:
stmt = _state_changed_during_period_stmt(
- _schema_version(hass),
start_time,
end_time,
entity_id,
@@ -471,12 +388,11 @@ def state_changes_during_period(
def _get_last_state_changes_stmt(
- schema_version: int, number_of_states: int, entity_id: str
+ number_of_states: int, entity_id: str
) -> StatementLambdaElement:
stmt, join_attributes = _lambda_stmt_and_join_attributes(
- schema_version, False, include_last_changed=False
+ False, include_last_changed=False
)
if schema_version >= 31:
stmt += lambda q: q.where(
States.state_id
== (
@@ -487,17 +403,6 @@ def _get_last_state_changes_stmt(
.subquery()
).c.state_id
)
else:
stmt += lambda q: q.where(
States.state_id
== (
select(States.state_id)
.filter(States.entity_id == entity_id)
.order_by(States.last_updated.desc())
.limit(number_of_states)
.subquery()
).c.state_id
)
if join_attributes:
stmt += lambda q: q.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
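The statement above is the classic "latest N rows for one key" shape: an inner SELECT picks the newest `state_id`s for the entity, and the outer query joins back to fetch the full rows. Roughly the SQL it renders to (a hand-written approximation with the schema-31 column names, not captured output):

```python
LAST_N_STATES_SQL = """
SELECT states.*
FROM states,
     (SELECT state_id
      FROM states
      WHERE entity_id = :entity_id
      ORDER BY last_updated_ts DESC
      LIMIT :number_of_states) AS recent
WHERE states.state_id = recent.state_id
"""
```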
@@ -515,9 +420,7 @@ def get_last_state_changes(
entity_ids = [entity_id_lower]
with session_scope(hass=hass, read_only=True) as session:
- stmt = _get_last_state_changes_stmt(
- _schema_version(hass), number_of_states, entity_id_lower
- )
+ stmt = _get_last_state_changes_stmt(number_of_states, entity_id_lower)
states = list(execute_stmt_lambda_element(session, stmt))
return cast(
dict[str, list[State]],
@@ -533,7 +436,6 @@ def get_last_state_changes(
def _get_states_for_entities_stmt(
- schema_version: int,
run_start: datetime,
utc_point_in_time: datetime,
entity_ids: list[str],
@@ -541,11 +443,10 @@ def _get_states_for_entities_stmt(
) -> StatementLambdaElement:
"""Baked query to get states for specific entities."""
stmt, join_attributes = _lambda_stmt_and_join_attributes(
- schema_version, no_attributes, include_last_changed=True
+ no_attributes, include_last_changed=True
)
# We got an include-list of entities, so we accelerate the query by
# filtering in the inner query already.
if schema_version >= 31:
run_start_ts = process_timestamp(run_start).timestamp()
utc_point_in_time_ts = dt_util.utc_to_timestamp(utc_point_in_time)
stmt += lambda q: q.join(
@@ -565,34 +466,11 @@ def _get_states_for_entities_stmt(
)
),
and_(
- States.entity_id
- == most_recent_states_for_entities_by_date.c.max_entity_id,
+ States.entity_id == most_recent_states_for_entities_by_date.c.max_entity_id,
States.last_updated_ts
== most_recent_states_for_entities_by_date.c.max_last_updated,
),
)
else:
stmt += lambda q: q.join(
(
most_recent_states_for_entities_by_date := select(
States.entity_id.label("max_entity_id"),
func.max(States.last_updated).label("max_last_updated"),
)
.filter(
(States.last_updated >= run_start)
& (States.last_updated < utc_point_in_time)
)
.filter(States.entity_id.in_(entity_ids))
.group_by(States.entity_id)
.subquery()
),
and_(
States.entity_id
== most_recent_states_for_entities_by_date.c.max_entity_id,
States.last_updated
== most_recent_states_for_entities_by_date.c.max_last_updated,
),
)
if join_attributes:
stmt += lambda q: q.outerjoin(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
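This join is the classic greatest-n-per-group pattern: the inner GROUP BY finds each entity's newest `last_updated_ts` inside the run window, and the outer join pulls the matching full rows. Approximately the SQL produced (hand-written, illustrative):

```python
MOST_RECENT_STATES_SQL = """
SELECT states.*
FROM states
JOIN (SELECT entity_id AS max_entity_id,
             MAX(last_updated_ts) AS max_last_updated
      FROM states
      WHERE last_updated_ts >= :run_start_ts
        AND last_updated_ts < :utc_point_in_time_ts
        AND entity_id IN :entity_ids
      GROUP BY entity_id) AS most_recent
  ON states.entity_id = most_recent.max_entity_id
 AND states.last_updated_ts = most_recent.max_last_updated
"""
```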
@@ -609,12 +487,11 @@ def _get_rows_with_session(
no_attributes: bool = False,
) -> Iterable[Row]:
"""Return the states at a specific point in time."""
- schema_version = _schema_version(hass)
if len(entity_ids) == 1:
return execute_stmt_lambda_element(
session,
_get_single_entity_states_stmt(
- schema_version, utc_point_in_time, entity_ids[0], no_attributes
+ utc_point_in_time, entity_ids[0], no_attributes
),
)
@@ -628,13 +505,12 @@ def _get_rows_with_session(
# We have more than one entity to look at so we need to do a query on states
# since the last recorder run started.
stmt = _get_states_for_entities_stmt(
- schema_version, run.start, utc_point_in_time, entity_ids, no_attributes
+ run.start, utc_point_in_time, entity_ids, no_attributes
)
return execute_stmt_lambda_element(session, stmt)
def _get_single_entity_states_stmt(
- schema_version: int,
utc_point_in_time: datetime,
entity_id: str,
no_attributes: bool = False,
@@ -642,9 +518,8 @@ def _get_single_entity_states_stmt(
# Use an entirely different (and extremely fast) query if we only
# have a single entity id
stmt, join_attributes = _lambda_stmt_and_join_attributes(
- schema_version, no_attributes, include_last_changed=True
+ no_attributes, include_last_changed=True
)
if schema_version >= 31:
utc_point_in_time_ts = dt_util.utc_to_timestamp(utc_point_in_time)
stmt += (
lambda q: q.filter(
@@ -654,15 +529,6 @@ def _get_single_entity_states_stmt(
.order_by(States.last_updated_ts.desc())
.limit(1)
)
else:
stmt += (
lambda q: q.filter(
States.last_updated < utc_point_in_time,
States.entity_id == entity_id,
)
.order_by(States.last_updated.desc())
.limit(1)
)
if join_attributes:
stmt += lambda q: q.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
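The single-entity fast path skips the grouping entirely and compiles to roughly (again a hand-written approximation):

```python
SINGLE_ENTITY_STATE_SQL = """
SELECT states.*
FROM states
WHERE states.last_updated_ts < :utc_point_in_time_ts
  AND states.entity_id = :entity_id
ORDER BY states.last_updated_ts DESC
LIMIT 1
"""
```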
@@ -692,26 +558,15 @@ def _sorted_states_to_dict(
each list of states, otherwise our graphs won't start on the Y
axis correctly.
"""
- schema_version = _schema_version(hass)
_process_timestamp: Callable[[datetime], float | str]
- field_map = _FIELD_MAP if schema_version >= 31 else _FIELD_MAP_PRE_SCHEMA_31
state_class: Callable[
[Row, dict[str, dict[str, Any]], datetime | None], State | dict[str, Any]
]
if compressed_state_format:
if schema_version >= 31:
state_class = legacy_row_to_compressed_state
else:
state_class = legacy_row_to_compressed_state_pre_schema_31
_process_timestamp = process_datetime_to_timestamp
attr_time = COMPRESSED_STATE_LAST_UPDATED
attr_state = COMPRESSED_STATE_STATE
else:
if schema_version >= 31:
state_class = LegacyLazyState
else:
state_class = LegacyLazyStatePreSchema31
_process_timestamp = process_timestamp_to_utc_isoformat
attr_time = LAST_CHANGED_KEY
attr_state = STATE_KEY
@@ -768,7 +623,7 @@ def _sorted_states_to_dict(
prev_state = first_state.state
ent_results.append(state_class(first_state, attr_cache, None))
- state_idx = field_map["state"]
+ state_idx = _FIELD_MAP["state"]
#
# minimal_response only makes sense with last_updated == last_changed
@@ -777,20 +632,7 @@ def _sorted_states_to_dict(
#
# With minimal response we do not care about attribute
# changes so we can filter out duplicate states
if schema_version < 31:
last_updated_idx = field_map["last_updated"]
for row in group:
if (state := row[state_idx]) != prev_state:
ent_results.append(
{
attr_state: state,
attr_time: _process_timestamp(row[last_updated_idx]),
}
)
prev_state = state
continue
- last_updated_ts_idx = field_map["last_updated_ts"]
+ last_updated_ts_idx = _FIELD_MAP["last_updated_ts"]
if compressed_state_format:
for row in group:
if (state := row[state_idx]) != prev_state:
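To make the reduction concrete: under `minimal_response`, every row after the first whose state differs from its predecessor is emitted as a two-key dict rather than a full state object. Illustrative shapes (values made up; the short keys come from Home Assistant's `COMPRESSED_STATE_*` constants):

```python
# Default minimal response: ISO-formatted last_changed plus the state.
reduced = {"last_changed": "2024-09-10T17:26:19+00:00", "state": "on"}

# compressed_state_format swaps in short keys and epoch floats.
reduced_compressed = {"s": "on", "lu": 1725989179.0}
```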


@@ -23,7 +23,6 @@ from .statistics import (
)
from .time import (
datetime_to_timestamp_or_none,
- process_datetime_to_timestamp,
process_timestamp,
process_timestamp_to_utc_isoformat,
timestamp_to_datetime_or_none,
@@ -47,7 +46,6 @@ __all__ = [
"datetime_to_timestamp_or_none",
"extract_event_type_ids",
"extract_metadata_ids",
"process_datetime_to_timestamp",
"process_timestamp",
"process_timestamp_to_utc_isoformat",
"row_to_compressed_state",


@@ -17,166 +17,7 @@ from homeassistant.core import Context, State
import homeassistant.util.dt as dt_util
from .state_attributes import decode_attributes_from_source
from .time import (
process_datetime_to_timestamp,
process_timestamp,
process_timestamp_to_utc_isoformat,
)
class LegacyLazyStatePreSchema31(State):
"""A lazy version of core State before schema 31."""
__slots__ = [
"_row",
"_attributes",
"_last_changed",
"_last_updated",
"_context",
"attr_cache",
]
def __init__( # pylint: disable=super-init-not-called
self,
row: Row,
attr_cache: dict[str, dict[str, Any]],
start_time: datetime | None,
) -> None:
"""Init the lazy state."""
self._row = row
self.entity_id: str = self._row.entity_id
self.state = self._row.state or ""
self._attributes: dict[str, Any] | None = None
self._last_changed: datetime | None = start_time
self._last_reported: datetime | None = start_time
self._last_updated: datetime | None = start_time
self._context: Context | None = None
self.attr_cache = attr_cache
@property # type: ignore[override]
def attributes(self) -> dict[str, Any]:
"""State attributes."""
if self._attributes is None:
self._attributes = decode_attributes_from_row_legacy(
self._row, self.attr_cache
)
return self._attributes
@attributes.setter
def attributes(self, value: dict[str, Any]) -> None:
"""Set attributes."""
self._attributes = value
@property
def context(self) -> Context:
"""State context."""
if self._context is None:
self._context = Context(id=None)
return self._context
@context.setter
def context(self, value: Context) -> None:
"""Set context."""
self._context = value
@property
def last_changed(self) -> datetime:
"""Last changed datetime."""
if self._last_changed is None:
if (last_changed := self._row.last_changed) is not None:
self._last_changed = process_timestamp(last_changed)
else:
self._last_changed = self.last_updated
return self._last_changed
@last_changed.setter
def last_changed(self, value: datetime) -> None:
"""Set last changed datetime."""
self._last_changed = value
@property
def last_reported(self) -> datetime:
"""Last reported datetime."""
if self._last_reported is None:
self._last_reported = self.last_updated
return self._last_reported
@last_reported.setter
def last_reported(self, value: datetime) -> None:
"""Set last reported datetime."""
self._last_reported = value
@property
def last_updated(self) -> datetime:
"""Last updated datetime."""
if self._last_updated is None:
self._last_updated = process_timestamp(self._row.last_updated)
return self._last_updated
@last_updated.setter
def last_updated(self, value: datetime) -> None:
"""Set last updated datetime."""
self._last_updated = value
def as_dict(self) -> dict[str, Any]: # type: ignore[override]
"""Return a dict representation of the LazyState.
Async friendly.
To be used for JSON serialization.
"""
if self._last_changed is None and self._last_updated is None:
last_updated_isoformat = process_timestamp_to_utc_isoformat(
self._row.last_updated
)
if (
self._row.last_changed is None
or self._row.last_changed == self._row.last_updated
):
last_changed_isoformat = last_updated_isoformat
else:
last_changed_isoformat = process_timestamp_to_utc_isoformat(
self._row.last_changed
)
else:
last_updated_isoformat = self.last_updated.isoformat()
if self.last_changed == self.last_updated:
last_changed_isoformat = last_updated_isoformat
else:
last_changed_isoformat = self.last_changed.isoformat()
return {
"entity_id": self.entity_id,
"state": self.state,
"attributes": self._attributes or self.attributes,
"last_changed": last_changed_isoformat,
"last_updated": last_updated_isoformat,
}
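The point of the class above (and of its surviving schema-31 sibling) is laziness: the raw database payload is kept until something actually reads `attributes`, and a cache shared across rows means a repeated payload is decoded once. A self-contained sketch of that pattern, not the recorder's actual implementation:

```python
import json
from typing import Any


class LazyAttributesRow:
    """Decode a row's attributes JSON only on first access."""

    def __init__(self, raw: str | None, cache: dict[str, dict[str, Any]]) -> None:
        self._raw = raw
        self._cache = cache  # shared across rows; repeated payloads decode once
        self._decoded: dict[str, Any] | None = None

    @property
    def attributes(self) -> dict[str, Any]:
        if self._decoded is None:
            if not self._raw:
                self._decoded = {}
            elif self._raw in self._cache:
                self._decoded = self._cache[self._raw]
            else:
                self._decoded = self._cache[self._raw] = json.loads(self._raw)
        return self._decoded


cache: dict[str, dict[str, Any]] = {}
row = LazyAttributesRow('{"friendly_name": "Door"}', cache)
assert row.attributes == {"friendly_name": "Door"}
```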
def legacy_row_to_compressed_state_pre_schema_31(
row: Row,
attr_cache: dict[str, dict[str, Any]],
start_time: datetime | None,
) -> dict[str, Any]:
"""Convert a database row to a compressed state before schema 31."""
comp_state = {
COMPRESSED_STATE_STATE: row.state,
COMPRESSED_STATE_ATTRIBUTES: decode_attributes_from_row_legacy(row, attr_cache),
}
if start_time:
comp_state[COMPRESSED_STATE_LAST_UPDATED] = start_time.timestamp()
else:
row_last_updated: datetime = row.last_updated
comp_state[COMPRESSED_STATE_LAST_UPDATED] = process_datetime_to_timestamp(
row_last_updated
)
if (
row_changed_changed := row.last_changed
) and row_last_updated != row_changed_changed:
comp_state[COMPRESSED_STATE_LAST_CHANGED] = process_datetime_to_timestamp(
row_changed_changed
)
return comp_state
from .time import process_timestamp
class LegacyLazyState(State):


@@ -52,17 +52,6 @@ def process_timestamp_to_utc_isoformat(ts: datetime | None) -> str | None:
return ts.astimezone(dt_util.UTC).isoformat()
- def process_datetime_to_timestamp(ts: datetime) -> float:
- """Process a database datetime to epoch.
- Mirrors the behavior of process_timestamp_to_utc_isoformat
- except it returns the epoch time.
- """
- if ts.tzinfo is None or ts.tzinfo == dt_util.UTC:
- return dt_util.utc_to_timestamp(ts)
- return ts.timestamp()
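The two branches above matter because the recorder historically stored naive datetimes that are implicitly UTC; a quick illustration of the behavior this deletes:

```python
from datetime import datetime, timezone

import homeassistant.util.dt as dt_util

naive_utc = datetime(2016, 7, 9, 11, 0, 0)  # tzinfo is None
aware = datetime(2016, 7, 9, 11, 0, 0, tzinfo=timezone.utc)

# Naive values are treated as UTC rather than local time...
assert dt_util.utc_to_timestamp(naive_utc) == aware.timestamp()
# ...while tz-aware values already carry their own offset.
assert aware.timestamp() == 1468062000.0
```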
def datetime_to_timestamp_or_none(dt: datetime | None) -> float | None:
"""Convert a datetime to a timestamp."""
return None if dt is None else dt.timestamp()

File diff suppressed because it is too large


@@ -1,713 +0,0 @@
"""The tests the History component."""
from __future__ import annotations
from copy import copy
from datetime import datetime, timedelta
import json
from unittest.mock import patch, sentinel
from freezegun import freeze_time
import pytest
from homeassistant.components import recorder
from homeassistant.components.recorder import Recorder, history
from homeassistant.components.recorder.filters import Filters
from homeassistant.components.recorder.models import process_timestamp
from homeassistant.components.recorder.util import session_scope
from homeassistant.core import HomeAssistant, State
from homeassistant.helpers.json import JSONEncoder
import homeassistant.util.dt as dt_util
from .common import (
assert_dict_of_states_equal_without_context_and_last_changed,
assert_multiple_states_equal_without_context,
assert_multiple_states_equal_without_context_and_last_changed,
assert_states_equal_without_context,
async_wait_recording_done,
old_db_schema,
)
from tests.typing import RecorderInstanceGenerator
@pytest.fixture
async def mock_recorder_before_hass(
async_test_recorder: RecorderInstanceGenerator,
) -> None:
"""Set up recorder."""
@pytest.fixture(autouse=True)
def db_schema_30():
"""Fixture to initialize the db with the old schema 30."""
with old_db_schema("30"):
yield
@pytest.fixture(autouse=True)
def setup_recorder(db_schema_30, recorder_mock: Recorder) -> recorder.Recorder:
"""Set up recorder."""
async def test_get_full_significant_states_with_session_entity_no_matches(
hass: HomeAssistant,
) -> None:
"""Test getting states at a specific point in time for entities that never have been recorded."""
now = dt_util.utcnow()
time_before_recorder_ran = now - timedelta(days=1000)
instance = recorder.get_instance(hass)
with (
session_scope(hass=hass) as session,
patch.object(instance.states_meta_manager, "active", False),
):
assert (
history.get_full_significant_states_with_session(
hass, session, time_before_recorder_ran, now, entity_ids=["demo.id"]
)
== {}
)
assert (
history.get_full_significant_states_with_session(
hass,
session,
time_before_recorder_ran,
now,
entity_ids=["demo.id", "demo.id2"],
)
== {}
)
async def test_significant_states_with_session_entity_minimal_response_no_matches(
hass: HomeAssistant,
) -> None:
"""Test getting states at a specific point in time for entities that never have been recorded."""
now = dt_util.utcnow()
time_before_recorder_ran = now - timedelta(days=1000)
instance = recorder.get_instance(hass)
with (
session_scope(hass=hass) as session,
patch.object(instance.states_meta_manager, "active", False),
):
assert (
history.get_significant_states_with_session(
hass,
session,
time_before_recorder_ran,
now,
entity_ids=["demo.id"],
minimal_response=True,
)
== {}
)
assert (
history.get_significant_states_with_session(
hass,
session,
time_before_recorder_ran,
now,
entity_ids=["demo.id", "demo.id2"],
minimal_response=True,
)
== {}
)
@pytest.mark.parametrize(
("attributes", "no_attributes", "limit"),
[
({"attr": True}, False, 5000),
({}, True, 5000),
({"attr": True}, False, 3),
({}, True, 3),
],
)
async def test_state_changes_during_period(
hass: HomeAssistant, attributes, no_attributes, limit
) -> None:
"""Test state change during period."""
entity_id = "media_player.test"
instance = recorder.get_instance(hass)
with patch.object(instance.states_meta_manager, "active", False):
def set_state(state):
"""Set the state."""
hass.states.async_set(entity_id, state, attributes)
return hass.states.get(entity_id)
start = dt_util.utcnow()
point = start + timedelta(seconds=1)
end = point + timedelta(seconds=1)
with freeze_time(start) as freezer:
set_state("idle")
set_state("YouTube")
freezer.move_to(point)
states = [
set_state("idle"),
set_state("Netflix"),
set_state("Plex"),
set_state("YouTube"),
]
freezer.move_to(end)
set_state("Netflix")
set_state("Plex")
await async_wait_recording_done(hass)
hist = history.state_changes_during_period(
hass, start, end, entity_id, no_attributes, limit=limit
)
assert_multiple_states_equal_without_context(states[:limit], hist[entity_id])
async def test_state_changes_during_period_descending(
hass: HomeAssistant,
) -> None:
"""Test state change during period descending."""
entity_id = "media_player.test"
instance = recorder.get_instance(hass)
with patch.object(instance.states_meta_manager, "active", False):
def set_state(state):
"""Set the state."""
hass.states.async_set(entity_id, state, {"any": 1})
return hass.states.get(entity_id)
start = dt_util.utcnow()
point = start + timedelta(seconds=1)
point2 = start + timedelta(seconds=1, microseconds=2)
point3 = start + timedelta(seconds=1, microseconds=3)
point4 = start + timedelta(seconds=1, microseconds=4)
end = point + timedelta(seconds=1)
with freeze_time(start) as freezer:
set_state("idle")
set_state("YouTube")
freezer.move_to(point)
states = [set_state("idle")]
freezer.move_to(point2)
states.append(set_state("Netflix"))
freezer.move_to(point3)
states.append(set_state("Plex"))
freezer.move_to(point4)
states.append(set_state("YouTube"))
freezer.move_to(end)
set_state("Netflix")
set_state("Plex")
await async_wait_recording_done(hass)
hist = history.state_changes_during_period(
hass, start, end, entity_id, no_attributes=False, descending=False
)
assert_multiple_states_equal_without_context(states, hist[entity_id])
hist = history.state_changes_during_period(
hass, start, end, entity_id, no_attributes=False, descending=True
)
assert_multiple_states_equal_without_context(
states, list(reversed(list(hist[entity_id])))
)
async def test_get_last_state_changes(hass: HomeAssistant) -> None:
"""Test number of state changes."""
entity_id = "sensor.test"
instance = recorder.get_instance(hass)
with patch.object(instance.states_meta_manager, "active", False):
def set_state(state):
"""Set the state."""
hass.states.async_set(entity_id, state)
return hass.states.get(entity_id)
start = dt_util.utcnow() - timedelta(minutes=2)
point = start + timedelta(minutes=1)
point2 = point + timedelta(minutes=1, seconds=1)
states = []
with freeze_time(start) as freezer:
set_state("1")
freezer.move_to(point)
states.append(set_state("2"))
freezer.move_to(point2)
states.append(set_state("3"))
await async_wait_recording_done(hass)
hist = history.get_last_state_changes(hass, 2, entity_id)
assert_multiple_states_equal_without_context(states, hist[entity_id])
async def test_ensure_state_can_be_copied(
hass: HomeAssistant,
) -> None:
"""Ensure a state can pass though copy().
The filter integration uses copy() on states
from history.
"""
entity_id = "sensor.test"
instance = recorder.get_instance(hass)
with patch.object(instance.states_meta_manager, "active", False):
def set_state(state):
"""Set the state."""
hass.states.async_set(entity_id, state)
return hass.states.get(entity_id)
start = dt_util.utcnow() - timedelta(minutes=2)
point = start + timedelta(minutes=1)
with freeze_time(start) as freezer:
set_state("1")
freezer.move_to(point)
set_state("2")
await async_wait_recording_done(hass)
hist = history.get_last_state_changes(hass, 2, entity_id)
assert_states_equal_without_context(
copy(hist[entity_id][0]), hist[entity_id][0]
)
assert_states_equal_without_context(
copy(hist[entity_id][1]), hist[entity_id][1]
)
async def test_get_significant_states(hass: HomeAssistant) -> None:
"""Test that only significant states are returned.
We should get back every thermostat change that
includes an attribute change, but only the state updates for
media player (attribute changes are not significant and not returned).
"""
instance = recorder.get_instance(hass)
with patch.object(instance.states_meta_manager, "active", False):
zero, four, states = record_states(hass)
await async_wait_recording_done(hass)
hist = history.get_significant_states(hass, zero, four, entity_ids=list(states))
assert_dict_of_states_equal_without_context_and_last_changed(states, hist)
async def test_get_significant_states_minimal_response(hass: HomeAssistant) -> None:
"""Test that only significant states are returned.
When minimal_response is set, only the first and
last states return a complete state.
We should get back every thermostat change that
includes an attribute change, but only the state updates for
media player (attribute changes are not significant and not returned).
"""
instance = recorder.get_instance(hass)
with patch.object(instance.states_meta_manager, "active", False):
zero, four, states = record_states(hass)
await async_wait_recording_done(hass)
hist = history.get_significant_states(
hass, zero, four, minimal_response=True, entity_ids=list(states)
)
entites_with_reducable_states = [
"media_player.test",
"media_player.test3",
]
# All states for media_player.test are reduced down to last_changed and
# state when minimal_response is set, except for the first state.
# We use JSONEncoder to make sure the pre-encoded last_changed is always
# the same as what encoding a native state would produce.
for entity_id in entites_with_reducable_states:
entity_states = states[entity_id]
for state_idx in range(1, len(entity_states)):
input_state = entity_states[state_idx]
orig_last_changed = json.dumps(
process_timestamp(input_state.last_changed),
cls=JSONEncoder,
).replace('"', "")
orig_state = input_state.state
entity_states[state_idx] = {
"last_changed": orig_last_changed,
"state": orig_state,
}
assert len(hist) == len(states)
assert_states_equal_without_context(
states["media_player.test"][0], hist["media_player.test"][0]
)
assert states["media_player.test"][1] == hist["media_player.test"][1]
assert states["media_player.test"][2] == hist["media_player.test"][2]
assert_multiple_states_equal_without_context(
states["media_player.test2"], hist["media_player.test2"]
)
assert_states_equal_without_context(
states["media_player.test3"][0], hist["media_player.test3"][0]
)
assert states["media_player.test3"][1] == hist["media_player.test3"][1]
assert_multiple_states_equal_without_context(
states["script.can_cancel_this_one"], hist["script.can_cancel_this_one"]
)
assert_multiple_states_equal_without_context_and_last_changed(
states["thermostat.test"], hist["thermostat.test"]
)
assert_multiple_states_equal_without_context_and_last_changed(
states["thermostat.test2"], hist["thermostat.test2"]
)
async def test_get_significant_states_with_initial(hass: HomeAssistant) -> None:
"""Test that only significant states are returned.
We should get back every thermostat change that
includes an attribute change, but only the state updates for
media player (attribute changes are not significant and not returned).
"""
instance = recorder.get_instance(hass)
with patch.object(instance.states_meta_manager, "active", False):
zero, four, states = record_states(hass)
await async_wait_recording_done(hass)
one = zero + timedelta(seconds=1)
one_with_microsecond = zero + timedelta(seconds=1, microseconds=1)
one_and_half = zero + timedelta(seconds=1.5)
for entity_id in states:
if entity_id == "media_player.test":
states[entity_id] = states[entity_id][1:]
for state in states[entity_id]:
if state.last_changed in (one, one_with_microsecond):
state.last_changed = one_and_half
state.last_updated = one_and_half
hist = history.get_significant_states(
hass,
one_and_half,
four,
include_start_time_state=True,
entity_ids=list(states),
)
assert_dict_of_states_equal_without_context_and_last_changed(states, hist)
async def test_get_significant_states_without_initial(hass: HomeAssistant) -> None:
"""Test that only significant states are returned.
We should get back every thermostat change that
includes an attribute change, but only the state updates for
media player (attribute changes are not significant and not returned).
"""
instance = recorder.get_instance(hass)
with patch.object(instance.states_meta_manager, "active", False):
zero, four, states = record_states(hass)
await async_wait_recording_done(hass)
one = zero + timedelta(seconds=1)
one_with_microsecond = zero + timedelta(seconds=1, microseconds=1)
one_and_half = zero + timedelta(seconds=1.5)
for entity_id in states:
states[entity_id] = [
s
for s in states[entity_id]
if s.last_changed not in (one, one_with_microsecond)
]
del states["media_player.test2"]
hist = history.get_significant_states(
hass,
one_and_half,
four,
include_start_time_state=False,
entity_ids=list(states),
)
assert_dict_of_states_equal_without_context_and_last_changed(states, hist)
async def test_get_significant_states_entity_id(hass: HomeAssistant) -> None:
"""Test that only significant states are returned for one entity."""
instance = recorder.get_instance(hass)
with patch.object(instance.states_meta_manager, "active", False):
zero, four, states = record_states(hass)
await async_wait_recording_done(hass)
del states["media_player.test2"]
del states["media_player.test3"]
del states["thermostat.test"]
del states["thermostat.test2"]
del states["script.can_cancel_this_one"]
hist = history.get_significant_states(hass, zero, four, ["media_player.test"])
assert_dict_of_states_equal_without_context_and_last_changed(states, hist)
async def test_get_significant_states_multiple_entity_ids(hass: HomeAssistant) -> None:
"""Test that only significant states are returned for one entity."""
instance = recorder.get_instance(hass)
with patch.object(instance.states_meta_manager, "active", False):
zero, four, states = record_states(hass)
await async_wait_recording_done(hass)
del states["media_player.test2"]
del states["media_player.test3"]
del states["thermostat.test2"]
del states["script.can_cancel_this_one"]
hist = history.get_significant_states(
hass,
zero,
four,
["media_player.test", "thermostat.test"],
)
assert_multiple_states_equal_without_context_and_last_changed(
states["media_player.test"], hist["media_player.test"]
)
assert_multiple_states_equal_without_context_and_last_changed(
states["thermostat.test"], hist["thermostat.test"]
)
async def test_get_significant_states_are_ordered(hass: HomeAssistant) -> None:
"""Test order of results from get_significant_states.
When entity ids are given, the results should be returned with the data
in the same order.
"""
instance = recorder.get_instance(hass)
with patch.object(instance.states_meta_manager, "active", False):
zero, four, _states = record_states(hass)
await async_wait_recording_done(hass)
entity_ids = ["media_player.test", "media_player.test2"]
hist = history.get_significant_states(hass, zero, four, entity_ids)
assert list(hist.keys()) == entity_ids
entity_ids = ["media_player.test2", "media_player.test"]
hist = history.get_significant_states(hass, zero, four, entity_ids)
assert list(hist.keys()) == entity_ids
async def test_get_significant_states_only(hass: HomeAssistant) -> None:
"""Test significant states when significant_states_only is set."""
entity_id = "sensor.test"
instance = recorder.get_instance(hass)
with patch.object(instance.states_meta_manager, "active", False):
def set_state(state, **kwargs):
"""Set the state."""
hass.states.async_set(entity_id, state, **kwargs)
return hass.states.get(entity_id)
start = dt_util.utcnow() - timedelta(minutes=4)
points = [start + timedelta(minutes=i) for i in range(1, 4)]
states = []
with freeze_time(start) as freezer:
set_state("123", attributes={"attribute": 10.64})
freezer.move_to(points[0])
# Attributes are different, state not
states.append(set_state("123", attributes={"attribute": 21.42}))
freezer.move_to(points[1])
# state is different, attributes not
states.append(set_state("32", attributes={"attribute": 21.42}))
freezer.move_to(points[2])
# everything is different
states.append(set_state("412", attributes={"attribute": 54.23}))
await async_wait_recording_done(hass)
hist = history.get_significant_states(
hass,
start,
significant_changes_only=True,
entity_ids=list({state.entity_id for state in states}),
)
assert len(hist[entity_id]) == 2
assert not any(
state.last_updated == states[0].last_updated for state in hist[entity_id]
)
assert any(
state.last_updated == states[1].last_updated for state in hist[entity_id]
)
assert any(
state.last_updated == states[2].last_updated for state in hist[entity_id]
)
hist = history.get_significant_states(
hass,
start,
significant_changes_only=False,
entity_ids=list({state.entity_id for state in states}),
)
assert len(hist[entity_id]) == 3
assert_multiple_states_equal_without_context_and_last_changed(
states, hist[entity_id]
)
def record_states(
hass: HomeAssistant,
) -> tuple[datetime, datetime, dict[str, list[State]]]:
"""Record some test states.
We inject a bunch of state updates from media player, zone and
thermostat.
"""
mp = "media_player.test"
mp2 = "media_player.test2"
mp3 = "media_player.test3"
therm = "thermostat.test"
therm2 = "thermostat.test2"
zone = "zone.home"
script_c = "script.can_cancel_this_one"
def set_state(entity_id, state, **kwargs):
"""Set the state."""
hass.states.async_set(entity_id, state, **kwargs)
return hass.states.get(entity_id)
zero = dt_util.utcnow()
one = zero + timedelta(seconds=1)
two = one + timedelta(seconds=1)
three = two + timedelta(seconds=1)
four = three + timedelta(seconds=1)
states = {therm: [], therm2: [], mp: [], mp2: [], mp3: [], script_c: []}
with freeze_time(one) as freezer:
states[mp].append(
set_state(mp, "idle", attributes={"media_title": str(sentinel.mt1)})
)
states[mp2].append(
set_state(mp2, "YouTube", attributes={"media_title": str(sentinel.mt2)})
)
states[mp3].append(
set_state(mp3, "idle", attributes={"media_title": str(sentinel.mt1)})
)
states[therm].append(
set_state(therm, 20, attributes={"current_temperature": 19.5})
)
freezer.move_to(one + timedelta(microseconds=1))
states[mp].append(
set_state(mp, "YouTube", attributes={"media_title": str(sentinel.mt2)})
)
freezer.move_to(two)
# This state will be skipped since it differs only in time
set_state(mp, "YouTube", attributes={"media_title": str(sentinel.mt3)})
# This state will be skipped because domain is excluded
set_state(zone, "zoning")
states[script_c].append(
set_state(script_c, "off", attributes={"can_cancel": True})
)
states[therm].append(
set_state(therm, 21, attributes={"current_temperature": 19.8})
)
states[therm2].append(
set_state(therm2, 20, attributes={"current_temperature": 19})
)
freezer.move_to(three)
states[mp].append(
set_state(mp, "Netflix", attributes={"media_title": str(sentinel.mt4)})
)
states[mp3].append(
set_state(mp3, "Netflix", attributes={"media_title": str(sentinel.mt3)})
)
# Attributes changed even though state is the same
states[therm].append(
set_state(therm, 21, attributes={"current_temperature": 20})
)
return zero, four, states
async def test_state_changes_during_period_multiple_entities_single_test(
hass: HomeAssistant,
) -> None:
"""Test state change during period with multiple entities in the same test.
This test ensures the sqlalchemy query cache does not
generate incorrect results.
"""
instance = recorder.get_instance(hass)
with patch.object(instance.states_meta_manager, "active", False):
start = dt_util.utcnow()
test_entites = {f"sensor.{i}": str(i) for i in range(30)}
for entity_id, value in test_entites.items():
hass.states.async_set(entity_id, value)
await async_wait_recording_done(hass)
end = dt_util.utcnow()
for entity_id, value in test_entites.items():
hist = history.state_changes_during_period(hass, start, end, entity_id)
assert len(hist) == 1
assert hist[entity_id][0].state == value
def test_get_significant_states_without_entity_ids_raises(hass: HomeAssistant) -> None:
"""Test at least one entity id is required for get_significant_states."""
now = dt_util.utcnow()
with pytest.raises(ValueError, match="entity_ids must be provided"):
history.get_significant_states(hass, now, None)
def test_state_changes_during_period_without_entity_ids_raises(
hass: HomeAssistant,
) -> None:
"""Test at least one entity id is required for state_changes_during_period."""
now = dt_util.utcnow()
with pytest.raises(ValueError, match="entity_id must be provided"):
history.state_changes_during_period(hass, now, None)
def test_get_significant_states_with_filters_raises(hass: HomeAssistant) -> None:
"""Test passing filters is no longer supported."""
now = dt_util.utcnow()
with pytest.raises(NotImplementedError, match="Filters are no longer supported"):
history.get_significant_states(
hass, now, None, ["media_player.test"], Filters()
)
def test_get_significant_states_with_non_existent_entity_ids_returns_empty(
hass: HomeAssistant,
) -> None:
"""Test get_significant_states returns an empty dict when entities not in the db."""
now = dt_util.utcnow()
assert history.get_significant_states(hass, now, None, ["nonexistent.entity"]) == {}
def test_state_changes_during_period_with_non_existent_entity_ids_returns_empty(
hass: HomeAssistant,
) -> None:
"""Test state_changes_during_period returns an empty dict when entities not in the db."""
now = dt_util.utcnow()
assert (
history.state_changes_during_period(hass, now, None, "nonexistent.entity") == {}
)
def test_get_last_state_changes_with_non_existent_entity_ids_returns_empty(
hass: HomeAssistant,
) -> None:
"""Test get_last_state_changes returns an empty dict when entities not in the db."""
assert history.get_last_state_changes(hass, 1, "nonexistent.entity") == {}


@@ -3,7 +3,6 @@
from datetime import datetime, timedelta
from unittest.mock import PropertyMock
- from freezegun import freeze_time
import pytest
from homeassistant.components.recorder.const import SupportedDialect
@@ -15,13 +14,11 @@ from homeassistant.components.recorder.db_schema import (
)
from homeassistant.components.recorder.models import (
LazyState,
- process_datetime_to_timestamp,
process_timestamp,
process_timestamp_to_utc_isoformat,
)
from homeassistant.const import EVENT_STATE_CHANGED
import homeassistant.core as ha
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import InvalidEntityFormatError
from homeassistant.util import dt as dt_util
@@ -354,75 +351,3 @@ async def test_lazy_state_handles_same_last_updated_and_last_changed(
"last_updated": "2021-06-12T03:04:01.000323+00:00",
"state": "off",
}
@pytest.mark.parametrize(
"time_zone", ["Europe/Berlin", "America/Chicago", "US/Hawaii", "UTC"]
)
async def test_process_datetime_to_timestamp(time_zone, hass: HomeAssistant) -> None:
"""Test we can handle processing database datatimes to timestamps."""
await hass.config.async_set_time_zone(time_zone)
utc_now = dt_util.utcnow()
assert process_datetime_to_timestamp(utc_now) == utc_now.timestamp()
now = dt_util.now()
assert process_datetime_to_timestamp(now) == now.timestamp()
@pytest.mark.parametrize(
"time_zone", ["Europe/Berlin", "America/Chicago", "US/Hawaii", "UTC"]
)
async def test_process_datetime_to_timestamp_freeze_time(
time_zone, hass: HomeAssistant
) -> None:
"""Test we can handle processing database datatimes to timestamps.
This test freezes time to make sure everything matches.
"""
await hass.config.async_set_time_zone(time_zone)
utc_now = dt_util.utcnow()
with freeze_time(utc_now):
epoch = utc_now.timestamp()
assert process_datetime_to_timestamp(dt_util.utcnow()) == epoch
now = dt_util.now()
assert process_datetime_to_timestamp(now) == epoch
@pytest.mark.parametrize(
"time_zone", ["Europe/Berlin", "America/Chicago", "US/Hawaii", "UTC"]
)
async def test_process_datetime_to_timestamp_mirrors_utc_isoformat_behavior(
time_zone, hass: HomeAssistant
) -> None:
"""Test process_datetime_to_timestamp mirrors process_timestamp_to_utc_isoformat."""
await hass.config.async_set_time_zone(time_zone)
datetime_with_tzinfo = datetime(2016, 7, 9, 11, 0, 0, tzinfo=dt_util.UTC)
datetime_without_tzinfo = datetime(2016, 7, 9, 11, 0, 0)
est = dt_util.get_time_zone("US/Eastern")
datetime_est_timezone = datetime(2016, 7, 9, 11, 0, 0, tzinfo=est)
nst = dt_util.get_time_zone("Canada/Newfoundland")
datetime_nst_timezone = datetime(2016, 7, 9, 11, 0, 0, tzinfo=nst)
hst = dt_util.get_time_zone("US/Hawaii")
datetime_hst_timezone = datetime(2016, 7, 9, 11, 0, 0, tzinfo=hst)
assert (
process_datetime_to_timestamp(datetime_with_tzinfo)
== dt_util.parse_datetime("2016-07-09T11:00:00+00:00").timestamp()
)
assert (
process_datetime_to_timestamp(datetime_without_tzinfo)
== dt_util.parse_datetime("2016-07-09T11:00:00+00:00").timestamp()
)
assert (
process_datetime_to_timestamp(datetime_est_timezone)
== dt_util.parse_datetime("2016-07-09T15:00:00+00:00").timestamp()
)
assert (
process_datetime_to_timestamp(datetime_nst_timezone)
== dt_util.parse_datetime("2016-07-09T13:30:00+00:00").timestamp()
)
assert (
process_datetime_to_timestamp(datetime_hst_timezone)
== dt_util.parse_datetime("2016-07-09T21:00:00+00:00").timestamp()
)