Improve performance of fetching and storing history and events with the database (#84870)

This commit is contained in:
J. Nick Koston 2023-01-02 13:26:08 -10:00 committed by GitHub
parent 0ad16e25ef
commit b8a1537b58
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 3696 additions and 281 deletions

View file

@ -2,7 +2,6 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime as dt
import json
from typing import Any, cast
@ -10,6 +9,7 @@ from sqlalchemy.engine.row import Row
from homeassistant.const import ATTR_ICON, EVENT_STATE_CHANGED
from homeassistant.core import Context, Event, State, callback
import homeassistant.util.dt as dt_util
class LazyEventPartialState:
@ -66,7 +66,7 @@ class EventAsRow:
data: dict[str, Any]
context: Context
context_id: str
time_fired: dt
time_fired_ts: float
state_id: int
event_data: str | None = None
old_format_icon: None = None
@ -92,7 +92,7 @@ def async_event_to_row(event: Event) -> EventAsRow | None:
context_id=event.context.id,
context_user_id=event.context.user_id,
context_parent_id=event.context.parent_id,
time_fired=event.time_fired,
time_fired_ts=dt_util.utc_to_timestamp(event.time_fired),
state_id=hash(event),
)
# States are prefiltered so we never get states
@ -107,7 +107,7 @@ def async_event_to_row(event: Event) -> EventAsRow | None:
context_id=new_state.context.id,
context_user_id=new_state.context.user_id,
context_parent_id=new_state.context.parent_id,
time_fired=new_state.last_updated,
time_fired_ts=dt_util.utc_to_timestamp(new_state.last_updated),
state_id=hash(event),
icon=new_state.attributes.get(ATTR_ICON),
)

View file

@ -388,12 +388,14 @@ def _rows_match(row: Row | EventAsRow, other_row: Row | EventAsRow) -> bool:
def _row_time_fired_isoformat(row: Row | EventAsRow) -> str:
"""Convert the row timed_fired to isoformat."""
return process_timestamp_to_utc_isoformat(row.time_fired or dt_util.utcnow())
return process_timestamp_to_utc_isoformat(
dt_util.utc_from_timestamp(row.time_fired_ts) or dt_util.utcnow()
)
def _row_time_fired_timestamp(row: Row | EventAsRow) -> float:
"""Convert the row timed_fired to timestamp."""
return process_datetime_to_timestamp(row.time_fired or dt_util.utcnow())
return row.time_fired_ts or process_datetime_to_timestamp(dt_util.utcnow())
class EntityNameCache:

View file

@ -7,6 +7,7 @@ from sqlalchemy.sql.lambdas import StatementLambdaElement
from homeassistant.components.recorder.filters import Filters
from homeassistant.helpers.json import json_dumps
from homeassistant.util import dt as dt_util
from .all import all_stmt
from .devices import devices_stmt
@ -15,8 +16,8 @@ from .entities_and_devices import entities_devices_stmt
def statement_for_request(
start_day: dt,
end_day: dt,
start_day_dt: dt,
end_day_dt: dt,
event_types: tuple[str, ...],
entity_ids: list[str] | None = None,
device_ids: list[str] | None = None,
@ -24,7 +25,8 @@ def statement_for_request(
context_id: str | None = None,
) -> StatementLambdaElement:
"""Generate the logbook statement for a logbook request."""
start_day = dt_util.utc_to_timestamp(start_day_dt)
end_day = dt_util.utc_to_timestamp(end_day_dt)
# No entities: logbook sends everything for the timeframe
# limited by the context_id and the yaml configured filter
if not entity_ids and not device_ids:

View file

@ -1,15 +1,13 @@
"""All queries for logbook."""
from __future__ import annotations
from datetime import datetime as dt
from sqlalchemy import lambda_stmt
from sqlalchemy.orm import Query
from sqlalchemy.sql.elements import ClauseList
from sqlalchemy.sql.lambdas import StatementLambdaElement
from homeassistant.components.recorder.db_schema import (
LAST_UPDATED_INDEX,
LAST_UPDATED_INDEX_TS,
Events,
States,
)
@ -23,8 +21,8 @@ from .common import (
def all_stmt(
start_day: dt,
end_day: dt,
start_day: float,
end_day: float,
event_types: tuple[str, ...],
states_entity_filter: ClauseList | None = None,
events_entity_filter: ClauseList | None = None,
@ -53,22 +51,24 @@ def all_stmt(
else:
stmt += lambda s: s.union_all(_states_query_for_all(start_day, end_day))
stmt += lambda s: s.order_by(Events.time_fired)
stmt += lambda s: s.order_by(Events.time_fired_ts)
return stmt
def _states_query_for_all(start_day: dt, end_day: dt) -> Query:
def _states_query_for_all(start_day: float, end_day: float) -> Query:
return apply_states_filters(_apply_all_hints(select_states()), start_day, end_day)
def _apply_all_hints(query: Query) -> Query:
"""Force mysql to use the right index on large selects."""
return query.with_hint(
States, f"FORCE INDEX ({LAST_UPDATED_INDEX})", dialect_name="mysql"
States, f"FORCE INDEX ({LAST_UPDATED_INDEX_TS})", dialect_name="mysql"
)
def _states_query_for_context_id(start_day: dt, end_day: dt, context_id: str) -> Query:
def _states_query_for_context_id(
start_day: float, end_day: float, context_id: str
) -> Query:
return apply_states_filters(select_states(), start_day, end_day).where(
States.context_id == context_id
)

View file

@ -1,8 +1,6 @@
"""Queries for logbook."""
from __future__ import annotations
from datetime import datetime as dt
import sqlalchemy
from sqlalchemy import select
from sqlalchemy.orm import Query
@ -47,7 +45,7 @@ EVENT_COLUMNS = (
Events.event_id.label("event_id"),
Events.event_type.label("event_type"),
Events.event_data.label("event_data"),
Events.time_fired.label("time_fired"),
Events.time_fired_ts.label("time_fired_ts"),
Events.context_id.label("context_id"),
Events.context_user_id.label("context_user_id"),
Events.context_parent_id.label("context_parent_id"),
@ -79,7 +77,7 @@ EVENT_COLUMNS_FOR_STATE_SELECT = [
"event_type"
),
literal(value=None, type_=sqlalchemy.Text).label("event_data"),
States.last_updated.label("time_fired"),
States.last_updated_ts.label("time_fired_ts"),
States.context_id.label("context_id"),
States.context_user_id.label("context_user_id"),
States.context_parent_id.label("context_parent_id"),
@ -108,14 +106,14 @@ NOT_CONTEXT_ONLY = literal(None).label("context_only")
def select_events_context_id_subquery(
start_day: dt,
end_day: dt,
start_day: float,
end_day: float,
event_types: tuple[str, ...],
) -> Select:
"""Generate the select for a context_id subquery."""
return (
select(Events.context_id)
.where((Events.time_fired > start_day) & (Events.time_fired < end_day))
.where((Events.time_fired_ts > start_day) & (Events.time_fired_ts < end_day))
.where(Events.event_type.in_(event_types))
.outerjoin(EventData, (Events.data_id == EventData.data_id))
)
@ -142,12 +140,12 @@ def select_states_context_only() -> Select:
def select_events_without_states(
start_day: dt, end_day: dt, event_types: tuple[str, ...]
start_day: float, end_day: float, event_types: tuple[str, ...]
) -> Select:
"""Generate an events select that does not join states."""
return (
select(*EVENT_ROWS_NO_STATES, NOT_CONTEXT_ONLY)
.where((Events.time_fired > start_day) & (Events.time_fired < end_day))
.where((Events.time_fired_ts > start_day) & (Events.time_fired_ts < end_day))
.where(Events.event_type.in_(event_types))
.outerjoin(EventData, (Events.data_id == EventData.data_id))
)
@ -163,7 +161,7 @@ def select_states() -> Select:
def legacy_select_events_context_id(
start_day: dt, end_day: dt, context_id: str
start_day: float, end_day: float, context_id: str
) -> Select:
"""Generate a legacy events context id select that also joins states."""
# This can be removed once we no longer have event_ids in the states table
@ -176,33 +174,35 @@ def legacy_select_events_context_id(
)
.outerjoin(States, (Events.event_id == States.event_id))
.where(
(States.last_updated == States.last_changed) | States.last_changed.is_(None)
(States.last_updated_ts == States.last_changed_ts)
| States.last_changed_ts.is_(None)
)
.where(_not_continuous_entity_matcher())
.outerjoin(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
)
.where((Events.time_fired > start_day) & (Events.time_fired < end_day))
.where((Events.time_fired_ts > start_day) & (Events.time_fired_ts < end_day))
.where(Events.context_id == context_id)
)
def apply_states_filters(query: Query, start_day: dt, end_day: dt) -> Query:
def apply_states_filters(query: Query, start_day: float, end_day: float) -> Query:
"""Filter states by time range.
Filters states that do not have an old state or new state (added / removed)
Filters states that are in a continuous domain with a UOM.
Filters states that do not have matching last_updated and last_changed.
Filters states that do not have matching last_updated_ts and last_changed_ts.
"""
return (
query.filter(
(States.last_updated > start_day) & (States.last_updated < end_day)
(States.last_updated_ts > start_day) & (States.last_updated_ts < end_day)
)
.outerjoin(OLD_STATE, (States.old_state_id == OLD_STATE.state_id))
.where(_missing_state_matcher())
.where(_not_continuous_entity_matcher())
.where(
(States.last_updated == States.last_changed) | States.last_changed.is_(None)
(States.last_updated_ts == States.last_changed_ts)
| States.last_changed_ts.is_(None)
)
.outerjoin(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)

View file

@ -2,7 +2,6 @@
from __future__ import annotations
from collections.abc import Iterable
from datetime import datetime as dt
import sqlalchemy
from sqlalchemy import lambda_stmt, select
@ -29,8 +28,8 @@ from .common import (
def _select_device_id_context_ids_sub_query(
start_day: dt,
end_day: dt,
start_day: float,
end_day: float,
event_types: tuple[str, ...],
json_quotable_device_ids: list[str],
) -> CompoundSelect:
@ -43,8 +42,8 @@ def _select_device_id_context_ids_sub_query(
def _apply_devices_context_union(
query: Query,
start_day: dt,
end_day: dt,
start_day: float,
end_day: float,
event_types: tuple[str, ...],
json_quotable_device_ids: list[str],
) -> CompoundSelect:
@ -70,8 +69,8 @@ def _apply_devices_context_union(
def devices_stmt(
start_day: dt,
end_day: dt,
start_day: float,
end_day: float,
event_types: tuple[str, ...],
json_quotable_device_ids: list[str],
) -> StatementLambdaElement:
@ -85,7 +84,7 @@ def devices_stmt(
end_day,
event_types,
json_quotable_device_ids,
).order_by(Events.time_fired)
).order_by(Events.time_fired_ts)
)
return stmt

View file

@ -2,7 +2,6 @@
from __future__ import annotations
from collections.abc import Iterable
from datetime import datetime as dt
import sqlalchemy
from sqlalchemy import lambda_stmt, select, union_all
@ -12,7 +11,7 @@ from sqlalchemy.sql.selectable import CTE, CompoundSelect
from homeassistant.components.recorder.db_schema import (
ENTITY_ID_IN_EVENT,
ENTITY_ID_LAST_UPDATED_INDEX,
ENTITY_ID_LAST_UPDATED_INDEX_TS,
OLD_ENTITY_ID_IN_EVENT,
EventData,
Events,
@ -32,8 +31,8 @@ from .common import (
def _select_entities_context_ids_sub_query(
start_day: dt,
end_day: dt,
start_day: float,
end_day: float,
event_types: tuple[str, ...],
entity_ids: list[str],
json_quoted_entity_ids: list[str],
@ -44,7 +43,9 @@ def _select_entities_context_ids_sub_query(
apply_event_entity_id_matchers(json_quoted_entity_ids)
),
apply_entities_hints(select(States.context_id))
.filter((States.last_updated > start_day) & (States.last_updated < end_day))
.filter(
(States.last_updated_ts > start_day) & (States.last_updated_ts < end_day)
)
.where(States.entity_id.in_(entity_ids)),
)
return select(union.c.context_id).group_by(union.c.context_id)
@ -52,8 +53,8 @@ def _select_entities_context_ids_sub_query(
def _apply_entities_context_union(
query: Query,
start_day: dt,
end_day: dt,
start_day: float,
end_day: float,
event_types: tuple[str, ...],
entity_ids: list[str],
json_quoted_entity_ids: list[str],
@ -87,8 +88,8 @@ def _apply_entities_context_union(
def entities_stmt(
start_day: dt,
end_day: dt,
start_day: float,
end_day: float,
event_types: tuple[str, ...],
entity_ids: list[str],
json_quoted_entity_ids: list[str],
@ -104,12 +105,12 @@ def entities_stmt(
event_types,
entity_ids,
json_quoted_entity_ids,
).order_by(Events.time_fired)
).order_by(Events.time_fired_ts)
)
def states_query_for_entity_ids(
start_day: dt, end_day: dt, entity_ids: list[str]
start_day: float, end_day: float, entity_ids: list[str]
) -> Query:
"""Generate a select for states from the States table for specific entities."""
return apply_states_filters(
@ -136,5 +137,5 @@ def apply_event_entity_id_matchers(
def apply_entities_hints(query: Query) -> Query:
"""Force mysql to use the right index on large selects."""
return query.with_hint(
States, f"FORCE INDEX ({ENTITY_ID_LAST_UPDATED_INDEX})", dialect_name="mysql"
States, f"FORCE INDEX ({ENTITY_ID_LAST_UPDATED_INDEX_TS})", dialect_name="mysql"
)

View file

@ -2,7 +2,6 @@
from __future__ import annotations
from collections.abc import Iterable
from datetime import datetime as dt
import sqlalchemy
from sqlalchemy import lambda_stmt, select, union_all
@ -29,8 +28,8 @@ from .entities import (
def _select_entities_device_id_context_ids_sub_query(
start_day: dt,
end_day: dt,
start_day: float,
end_day: float,
event_types: tuple[str, ...],
entity_ids: list[str],
json_quoted_entity_ids: list[str],
@ -44,7 +43,9 @@ def _select_entities_device_id_context_ids_sub_query(
)
),
apply_entities_hints(select(States.context_id))
.filter((States.last_updated > start_day) & (States.last_updated < end_day))
.filter(
(States.last_updated_ts > start_day) & (States.last_updated_ts < end_day)
)
.where(States.entity_id.in_(entity_ids)),
)
return select(union.c.context_id).group_by(union.c.context_id)
@ -52,8 +53,8 @@ def _select_entities_device_id_context_ids_sub_query(
def _apply_entities_devices_context_union(
query: Query,
start_day: dt,
end_day: dt,
start_day: float,
end_day: float,
event_types: tuple[str, ...],
entity_ids: list[str],
json_quoted_entity_ids: list[str],
@ -88,8 +89,8 @@ def _apply_entities_devices_context_union(
def entities_devices_stmt(
start_day: dt,
end_day: dt,
start_day: float,
end_day: float,
event_types: tuple[str, ...],
entity_ids: list[str],
json_quoted_entity_ids: list[str],
@ -109,7 +110,7 @@ def entities_devices_stmt(
entity_ids,
json_quoted_entity_ids,
json_quoted_device_ids,
).order_by(Events.time_fired)
).order_by(Events.time_fired_ts)
)
return stmt

View file

@ -1022,6 +1022,10 @@ class Recorder(threading.Thread):
self.event_session = self.get_session()
self.event_session.expire_on_commit = False
def _post_schema_migration(self, old_version: int, new_version: int) -> None:
"""Run post schema migration tasks."""
migration.post_schema_migration(self.event_session, old_version, new_version)
def _send_keep_alive(self) -> None:
"""Send a keep alive to keep the db connection open."""
assert self.event_session is not None

View file

@ -4,6 +4,7 @@ from __future__ import annotations
from collections.abc import Callable
from datetime import datetime, timedelta
import logging
import time
from typing import Any, TypeVar, cast
import ciso8601
@ -53,7 +54,7 @@ from .models import StatisticData, StatisticMetaData, process_timestamp
# pylint: disable=invalid-name
Base = declarative_base()
SCHEMA_VERSION = 30
SCHEMA_VERSION = 32
_StatisticsBaseSelfT = TypeVar("_StatisticsBaseSelfT", bound="StatisticsBase")
@ -90,8 +91,8 @@ TABLES_TO_CHECK = [
TABLE_SCHEMA_CHANGES,
]
LAST_UPDATED_INDEX = "ix_states_last_updated"
ENTITY_ID_LAST_UPDATED_INDEX = "ix_states_entity_id_last_updated"
LAST_UPDATED_INDEX_TS = "ix_states_last_updated_ts"
ENTITY_ID_LAST_UPDATED_INDEX_TS = "ix_states_entity_id_last_updated_ts"
EVENTS_CONTEXT_ID_INDEX = "ix_events_context_id"
STATES_CONTEXT_ID_INDEX = "ix_states_context_id"
@ -122,6 +123,8 @@ DOUBLE_TYPE = (
.with_variant(postgresql.DOUBLE_PRECISION(), "postgresql")
)
TIMESTAMP_TYPE = DOUBLE_TYPE
class JSONLiteral(JSON): # type: ignore[misc]
"""Teach SA how to literalize json."""
@ -146,7 +149,7 @@ class Events(Base): # type: ignore[misc,valid-type]
__table_args__ = (
# Used for fetching events at a specific time
# see logbook
Index("ix_events_event_type_time_fired", "event_type", "time_fired"),
Index("ix_events_event_type_time_fired_ts", "event_type", "time_fired_ts"),
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_EVENTS
@ -155,7 +158,8 @@ class Events(Base): # type: ignore[misc,valid-type]
event_data = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
origin = Column(String(MAX_LENGTH_EVENT_ORIGIN)) # no longer used for new rows
origin_idx = Column(SmallInteger)
time_fired = Column(DATETIME_TYPE, index=True)
time_fired = Column(DATETIME_TYPE) # no longer used for new rows
time_fired_ts = Column(TIMESTAMP_TYPE, index=True)
context_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
@ -167,10 +171,18 @@ class Events(Base): # type: ignore[misc,valid-type]
return (
"<recorder.Events("
f"id={self.event_id}, type='{self.event_type}', "
f"origin_idx='{self.origin_idx}', time_fired='{self.time_fired}'"
f"origin_idx='{self.origin_idx}', time_fired='{self.time_fired_isotime}'"
f", data_id={self.data_id})>"
)
@property
def time_fired_isotime(self) -> str:
"""Return time_fired as an isotime string."""
date_time = dt_util.utc_from_timestamp(self.time_fired_ts) or process_timestamp(
self.time_fired
)
return date_time.isoformat(sep=" ", timespec="seconds")
@staticmethod
def from_event(event: Event) -> Events:
"""Create an event database object from a native event."""
@ -178,7 +190,8 @@ class Events(Base): # type: ignore[misc,valid-type]
event_type=event.event_type,
event_data=None,
origin_idx=EVENT_ORIGIN_TO_IDX.get(event.origin),
time_fired=event.time_fired,
time_fired=None,
time_fired_ts=dt_util.utc_to_timestamp(event.time_fired),
context_id=event.context.id,
context_user_id=event.context.user_id,
context_parent_id=event.context.parent_id,
@ -198,7 +211,7 @@ class Events(Base): # type: ignore[misc,valid-type]
EventOrigin(self.origin)
if self.origin
else EVENT_ORIGIN_ORDER[self.origin_idx],
process_timestamp(self.time_fired),
dt_util.utc_from_timestamp(self.time_fired_ts),
context=context,
)
except JSON_DECODE_EXCEPTIONS:
@ -261,7 +274,7 @@ class States(Base): # type: ignore[misc,valid-type]
__table_args__ = (
# Used for fetching the state of entities at a specific time
# (get_states in history.py)
Index(ENTITY_ID_LAST_UPDATED_INDEX, "entity_id", "last_updated"),
Index(ENTITY_ID_LAST_UPDATED_INDEX_TS, "entity_id", "last_updated_ts"),
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_STATES
@ -274,8 +287,10 @@ class States(Base): # type: ignore[misc,valid-type]
event_id = Column( # no longer used for new rows
Integer, ForeignKey("events.event_id", ondelete="CASCADE"), index=True
)
last_changed = Column(DATETIME_TYPE)
last_updated = Column(DATETIME_TYPE, default=dt_util.utcnow, index=True)
last_changed = Column(DATETIME_TYPE) # no longer used for new rows
last_changed_ts = Column(TIMESTAMP_TYPE)
last_updated = Column(DATETIME_TYPE) # no longer used for new rows
last_updated_ts = Column(TIMESTAMP_TYPE, default=time.time, index=True)
old_state_id = Column(Integer, ForeignKey("states.state_id"), index=True)
attributes_id = Column(
Integer, ForeignKey("state_attributes.attributes_id"), index=True
@ -292,10 +307,18 @@ class States(Base): # type: ignore[misc,valid-type]
return (
f"<recorder.States(id={self.state_id}, entity_id='{self.entity_id}',"
f" state='{self.state}', event_id='{self.event_id}',"
f" last_updated='{self.last_updated.isoformat(sep=' ', timespec='seconds')}',"
f" last_updated='{self.last_updated_isotime}',"
f" old_state_id={self.old_state_id}, attributes_id={self.attributes_id})>"
)
@property
def last_updated_isotime(self) -> str:
"""Return last_updated as an isotime string."""
date_time = dt_util.utc_from_timestamp(
self.last_updated_ts
) or process_timestamp(self.last_updated)
return date_time.isoformat(sep=" ", timespec="seconds")
@staticmethod
def from_event(event: Event) -> States:
"""Create object from a state_changed event."""
@ -308,21 +331,22 @@ class States(Base): # type: ignore[misc,valid-type]
context_user_id=event.context.user_id,
context_parent_id=event.context.parent_id,
origin_idx=EVENT_ORIGIN_TO_IDX.get(event.origin),
last_updated=None,
last_changed=None,
)
# None state means the state was removed from the state machine
if state is None:
dbstate.state = ""
dbstate.last_updated = event.time_fired
dbstate.last_changed = None
dbstate.last_updated_ts = dt_util.utc_to_timestamp(event.time_fired)
dbstate.last_changed_ts = None
return dbstate
dbstate.state = state.state
dbstate.last_updated = state.last_updated
dbstate.last_updated_ts = dt_util.utc_to_timestamp(state.last_updated)
if state.last_updated == state.last_changed:
dbstate.last_changed = None
dbstate.last_changed_ts = None
else:
dbstate.last_changed = state.last_changed
dbstate.last_changed_ts = dt_util.utc_to_timestamp(state.last_changed)
return dbstate
@ -339,11 +363,13 @@ class States(Base): # type: ignore[misc,valid-type]
# When json_loads fails
_LOGGER.exception("Error converting row to state: %s", self)
return None
if self.last_changed is None or self.last_changed == self.last_updated:
last_changed = last_updated = process_timestamp(self.last_updated)
if self.last_changed_ts is None or self.last_changed_ts == self.last_updated_ts:
last_changed = last_updated = dt_util.utc_from_timestamp(
self.last_updated_ts or 0
)
else:
last_updated = process_timestamp(self.last_updated)
last_changed = process_timestamp(self.last_changed)
last_updated = dt_util.utc_from_timestamp(self.last_updated_ts or 0)
last_changed = dt_util.utc_from_timestamp(self.last_changed_ts or 0)
return State(
self.entity_id,
self.state,

View file

@ -29,10 +29,12 @@ from .db_schema import RecorderRuns, StateAttributes, States
from .filters import Filters
from .models import (
LazyState,
LazyStatePreSchema31,
process_datetime_to_timestamp,
process_timestamp,
process_timestamp_to_utc_isoformat,
row_to_compressed_state,
row_to_compressed_state_pre_schema_31,
)
from .util import execute_stmt_lambda_element, session_scope
@ -59,49 +61,84 @@ NEED_ATTRIBUTE_DOMAINS = {
"water_heater",
}
BASE_STATES = [
_BASE_STATES = [
States.entity_id,
States.state,
States.last_changed_ts,
States.last_updated_ts,
]
_BASE_STATES_NO_LAST_CHANGED = [
States.entity_id,
States.state,
literal(value=None).label("last_changed_ts"),
States.last_updated_ts,
]
_QUERY_STATE_NO_ATTR = [
*_BASE_STATES,
literal(value=None, type_=Text).label("attributes"),
literal(value=None, type_=Text).label("shared_attrs"),
]
_QUERY_STATE_NO_ATTR_NO_LAST_CHANGED = [
*_BASE_STATES_NO_LAST_CHANGED,
literal(value=None, type_=Text).label("attributes"),
literal(value=None, type_=Text).label("shared_attrs"),
]
_BASE_STATES_PRE_SCHEMA_31 = [
States.entity_id,
States.state,
States.last_changed,
States.last_updated,
]
BASE_STATES_NO_LAST_CHANGED = [
_BASE_STATES_NO_LAST_CHANGED_PRE_SCHEMA_31 = [
States.entity_id,
States.state,
literal(value=None, type_=Text).label("last_changed"),
States.last_updated,
]
QUERY_STATE_NO_ATTR = [
*BASE_STATES,
_QUERY_STATE_NO_ATTR_PRE_SCHEMA_31 = [
*_BASE_STATES_PRE_SCHEMA_31,
literal(value=None, type_=Text).label("attributes"),
literal(value=None, type_=Text).label("shared_attrs"),
]
QUERY_STATE_NO_ATTR_NO_LAST_CHANGED = [
*BASE_STATES_NO_LAST_CHANGED,
_QUERY_STATE_NO_ATTR_NO_LAST_CHANGED_PRE_SCHEMA_31 = [
*_BASE_STATES_NO_LAST_CHANGED_PRE_SCHEMA_31,
literal(value=None, type_=Text).label("attributes"),
literal(value=None, type_=Text).label("shared_attrs"),
]
# Remove QUERY_STATES_PRE_SCHEMA_25
# and the migration_in_progress check
# once schema 26 is created
QUERY_STATES_PRE_SCHEMA_25 = [
*BASE_STATES,
_QUERY_STATES_PRE_SCHEMA_25 = [
*_BASE_STATES_PRE_SCHEMA_31,
States.attributes,
literal(value=None, type_=Text).label("shared_attrs"),
]
QUERY_STATES_PRE_SCHEMA_25_NO_LAST_CHANGED = [
*BASE_STATES_NO_LAST_CHANGED,
_QUERY_STATES_PRE_SCHEMA_25_NO_LAST_CHANGED = [
*_BASE_STATES_NO_LAST_CHANGED_PRE_SCHEMA_31,
States.attributes,
literal(value=None, type_=Text).label("shared_attrs"),
]
QUERY_STATES = [
*BASE_STATES,
_QUERY_STATES_PRE_SCHEMA_31 = [
*_BASE_STATES_PRE_SCHEMA_31,
# Remove States.attributes once all attributes are in StateAttributes.shared_attrs
States.attributes,
StateAttributes.shared_attrs,
]
QUERY_STATES_NO_LAST_CHANGED = [
*BASE_STATES_NO_LAST_CHANGED,
_QUERY_STATES_NO_LAST_CHANGED_PRE_SCHEMA_31 = [
*_BASE_STATES_NO_LAST_CHANGED_PRE_SCHEMA_31,
# Remove States.attributes once all attributes are in StateAttributes.shared_attrs
States.attributes,
StateAttributes.shared_attrs,
]
_QUERY_STATES = [
*_BASE_STATES,
# Remove States.attributes once all attributes are in StateAttributes.shared_attrs
States.attributes,
StateAttributes.shared_attrs,
]
_QUERY_STATES_NO_LAST_CHANGED = [
*_BASE_STATES_NO_LAST_CHANGED,
# Remove States.attributes once all attributes are in StateAttributes.shared_attrs
States.attributes,
StateAttributes.shared_attrs,
@ -124,10 +161,25 @@ def lambda_stmt_and_join_attributes(
# without the attributes fields and do not join the
# state_attributes table
if no_attributes:
if schema_version >= 31:
if include_last_changed:
return (
lambda_stmt(lambda: select(*_QUERY_STATE_NO_ATTR)),
False,
)
return (
lambda_stmt(lambda: select(*_QUERY_STATE_NO_ATTR_NO_LAST_CHANGED)),
False,
)
if include_last_changed:
return lambda_stmt(lambda: select(*QUERY_STATE_NO_ATTR)), False
return (
lambda_stmt(lambda: select(*_QUERY_STATE_NO_ATTR_PRE_SCHEMA_31)),
False,
)
return (
lambda_stmt(lambda: select(*QUERY_STATE_NO_ATTR_NO_LAST_CHANGED)),
lambda_stmt(
lambda: select(*_QUERY_STATE_NO_ATTR_NO_LAST_CHANGED_PRE_SCHEMA_31)
),
False,
)
# If we in the process of migrating schema we do
@ -136,19 +188,27 @@ def lambda_stmt_and_join_attributes(
if schema_version < 25:
if include_last_changed:
return (
lambda_stmt(lambda: select(*QUERY_STATES_PRE_SCHEMA_25)),
lambda_stmt(lambda: select(*_QUERY_STATES_PRE_SCHEMA_25)),
False,
)
return (
lambda_stmt(lambda: select(*QUERY_STATES_PRE_SCHEMA_25_NO_LAST_CHANGED)),
lambda_stmt(lambda: select(*_QUERY_STATES_PRE_SCHEMA_25_NO_LAST_CHANGED)),
False,
)
if schema_version >= 31:
if include_last_changed:
return lambda_stmt(lambda: select(*_QUERY_STATES)), True
return lambda_stmt(lambda: select(*_QUERY_STATES_NO_LAST_CHANGED)), True
# Finally if no migration is in progress and no_attributes
# was not requested, we query both attributes columns and
# join state_attributes
if include_last_changed:
return lambda_stmt(lambda: select(*QUERY_STATES)), True
return lambda_stmt(lambda: select(*QUERY_STATES_NO_LAST_CHANGED)), True
return lambda_stmt(lambda: select(*_QUERY_STATES_PRE_SCHEMA_31)), True
return (
lambda_stmt(lambda: select(*_QUERY_STATES_NO_LAST_CHANGED_PRE_SCHEMA_31)),
True,
)
def get_significant_states(
@ -211,22 +271,41 @@ def _significant_states_stmt(
and significant_changes_only
and split_entity_id(entity_ids[0])[0] not in SIGNIFICANT_DOMAINS
):
if schema_version >= 31:
stmt += lambda q: q.filter(
(States.last_changed_ts == States.last_updated_ts)
| States.last_changed_ts.is_(None)
)
stmt += lambda q: q.filter(
(States.last_changed == States.last_updated) | States.last_changed.is_(None)
)
elif significant_changes_only:
stmt += lambda q: q.filter(
or_(
*[
States.entity_id.like(entity_domain)
for entity_domain in SIGNIFICANT_DOMAINS_ENTITY_ID_LIKE
],
(
(States.last_changed == States.last_updated)
| States.last_changed.is_(None)
),
if schema_version >= 31:
stmt += lambda q: q.filter(
or_(
*[
States.entity_id.like(entity_domain)
for entity_domain in SIGNIFICANT_DOMAINS_ENTITY_ID_LIKE
],
(
(States.last_changed_ts == States.last_updated_ts)
| States.last_changed_ts.is_(None)
),
)
)
else:
stmt += lambda q: q.filter(
or_(
*[
States.entity_id.like(entity_domain)
for entity_domain in SIGNIFICANT_DOMAINS_ENTITY_ID_LIKE
],
(
(States.last_changed == States.last_updated)
| States.last_changed.is_(None)
),
)
)
)
if entity_ids:
stmt += lambda q: q.filter(States.entity_id.in_(entity_ids))
@ -238,15 +317,25 @@ def _significant_states_stmt(
lambda q: q.filter(entity_filter), track_on=[filters]
)
stmt += lambda q: q.filter(States.last_updated > start_time)
if end_time:
stmt += lambda q: q.filter(States.last_updated < end_time)
if schema_version >= 31:
start_time_ts = start_time.timestamp()
stmt += lambda q: q.filter(States.last_updated_ts > start_time_ts)
if end_time:
end_time_ts = end_time.timestamp()
stmt += lambda q: q.filter(States.last_updated_ts < end_time_ts)
else:
stmt += lambda q: q.filter(States.last_updated > start_time)
if end_time:
stmt += lambda q: q.filter(States.last_updated < end_time)
if join_attributes:
stmt += lambda q: q.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
stmt += lambda q: q.order_by(States.entity_id, States.last_updated)
if schema_version >= 31:
stmt += lambda q: q.order_by(States.entity_id, States.last_updated_ts)
else:
stmt += lambda q: q.order_by(States.entity_id, States.last_updated)
return stmt
@ -342,12 +431,29 @@ def _state_changed_during_period_stmt(
stmt, join_attributes = lambda_stmt_and_join_attributes(
schema_version, no_attributes, include_last_changed=False
)
stmt += lambda q: q.filter(
((States.last_changed == States.last_updated) | States.last_changed.is_(None))
& (States.last_updated > start_time)
)
if schema_version >= 31:
start_time_ts = start_time.timestamp()
stmt += lambda q: q.filter(
(
(States.last_changed_ts == States.last_updated_ts)
| States.last_changed_ts.is_(None)
)
& (States.last_updated_ts > start_time_ts)
)
else:
stmt += lambda q: q.filter(
(
(States.last_changed == States.last_updated)
| States.last_changed.is_(None)
)
& (States.last_updated > start_time)
)
if end_time:
stmt += lambda q: q.filter(States.last_updated < end_time)
if schema_version >= 31:
end_time_ts = end_time.timestamp()
stmt += lambda q: q.filter(States.last_updated_ts < end_time_ts)
else:
stmt += lambda q: q.filter(States.last_updated < end_time)
if entity_id:
stmt += lambda q: q.filter(States.entity_id == entity_id)
if join_attributes:
@ -355,9 +461,17 @@ def _state_changed_during_period_stmt(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
if descending:
stmt += lambda q: q.order_by(States.entity_id, States.last_updated.desc())
if schema_version >= 31:
stmt += lambda q: q.order_by(
States.entity_id, States.last_updated_ts.desc()
)
else:
stmt += lambda q: q.order_by(States.entity_id, States.last_updated.desc())
else:
stmt += lambda q: q.order_by(States.entity_id, States.last_updated)
if schema_version >= 31:
stmt += lambda q: q.order_by(States.entity_id, States.last_updated_ts)
else:
stmt += lambda q: q.order_by(States.entity_id, States.last_updated)
if limit:
stmt += lambda q: q.limit(limit)
return stmt
@ -409,18 +523,29 @@ def _get_last_state_changes_stmt(
stmt, join_attributes = lambda_stmt_and_join_attributes(
schema_version, False, include_last_changed=False
)
stmt += lambda q: q.filter(
(States.last_changed == States.last_updated) | States.last_changed.is_(None)
)
if schema_version >= 31:
stmt += lambda q: q.filter(
(States.last_changed_ts == States.last_updated_ts)
| States.last_changed_ts.is_(None)
)
else:
stmt += lambda q: q.filter(
(States.last_changed == States.last_updated) | States.last_changed.is_(None)
)
if entity_id:
stmt += lambda q: q.filter(States.entity_id == entity_id)
if join_attributes:
stmt += lambda q: q.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
stmt += lambda q: q.order_by(States.entity_id, States.last_updated.desc()).limit(
number_of_states
)
if schema_version >= 31:
stmt += lambda q: q.order_by(
States.entity_id, States.last_updated_ts.desc()
).limit(number_of_states)
else:
stmt += lambda q: q.order_by(
States.entity_id, States.last_updated.desc()
).limit(number_of_states)
return stmt
@ -463,19 +588,36 @@ def _get_states_for_entites_stmt(
)
# We got an include-list of entities, accelerate the query by filtering already
# in the inner query.
stmt += lambda q: q.where(
States.state_id
== (
select(func.max(States.state_id).label("max_state_id"))
.filter(
(States.last_updated >= run_start)
& (States.last_updated < utc_point_in_time)
)
.filter(States.entity_id.in_(entity_ids))
.group_by(States.entity_id)
.subquery()
).c.max_state_id
)
if schema_version >= 31:
run_start_ts = run_start.timestamp()
utc_point_in_time_ts = dt_util.utc_to_timestamp(utc_point_in_time)
stmt += lambda q: q.where(
States.state_id
== (
select(func.max(States.state_id).label("max_state_id"))
.filter(
(States.last_updated_ts >= run_start_ts)
& (States.last_updated_ts < utc_point_in_time_ts)
)
.filter(States.entity_id.in_(entity_ids))
.group_by(States.entity_id)
.subquery()
).c.max_state_id
)
else:
stmt += lambda q: q.where(
States.state_id
== (
select(func.max(States.state_id).label("max_state_id"))
.filter(
(States.last_updated >= run_start)
& (States.last_updated < utc_point_in_time)
)
.filter(States.entity_id.in_(entity_ids))
.group_by(States.entity_id)
.subquery()
).c.max_state_id
)
if join_attributes:
stmt += lambda q: q.outerjoin(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
@ -484,10 +626,26 @@ def _get_states_for_entites_stmt(
def _generate_most_recent_states_by_date(
schema_version: int,
run_start: datetime,
utc_point_in_time: datetime,
) -> Subquery:
"""Generate the sub query for the most recent states by data."""
if schema_version >= 31:
run_start_ts = run_start.timestamp()
utc_point_in_time_ts = dt_util.utc_to_timestamp(utc_point_in_time)
return (
select(
States.entity_id.label("max_entity_id"),
func.max(States.last_updated_ts).label("max_last_updated"),
)
.filter(
(States.last_updated_ts >= run_start_ts)
& (States.last_updated_ts < utc_point_in_time_ts)
)
.group_by(States.entity_id)
.subquery()
)
return (
select(
States.entity_id.label("max_entity_id"),
@ -518,24 +676,42 @@ def _get_states_for_all_stmt(
# This filtering can't be done in the inner query because the domain column is
# not indexed and we can't control what's in the custom filter.
most_recent_states_by_date = _generate_most_recent_states_by_date(
run_start, utc_point_in_time
)
stmt += lambda q: q.where(
States.state_id
== (
select(func.max(States.state_id).label("max_state_id"))
.join(
most_recent_states_by_date,
and_(
States.entity_id == most_recent_states_by_date.c.max_entity_id,
States.last_updated
== most_recent_states_by_date.c.max_last_updated,
),
)
.group_by(States.entity_id)
.subquery()
).c.max_state_id,
schema_version, run_start, utc_point_in_time
)
if schema_version >= 31:
stmt += lambda q: q.where(
States.state_id
== (
select(func.max(States.state_id).label("max_state_id"))
.join(
most_recent_states_by_date,
and_(
States.entity_id == most_recent_states_by_date.c.max_entity_id,
States.last_updated_ts
== most_recent_states_by_date.c.max_last_updated,
),
)
.group_by(States.entity_id)
.subquery()
).c.max_state_id,
)
else:
stmt += lambda q: q.where(
States.state_id
== (
select(func.max(States.state_id).label("max_state_id"))
.join(
most_recent_states_by_date,
and_(
States.entity_id == most_recent_states_by_date.c.max_entity_id,
States.last_updated
== most_recent_states_by_date.c.max_last_updated,
),
)
.group_by(States.entity_id)
.subquery()
).c.max_state_id,
)
stmt += _ignore_domains_filter
if filters and filters.has_config:
entity_filter = filters.states_entity_filter()
@ -598,14 +774,25 @@ def _get_single_entity_states_stmt(
stmt, join_attributes = lambda_stmt_and_join_attributes(
schema_version, no_attributes, include_last_changed=True
)
stmt += (
lambda q: q.filter(
States.last_updated < utc_point_in_time,
States.entity_id == entity_id,
if schema_version >= 31:
utc_point_in_time_ts = dt_util.utc_to_timestamp(utc_point_in_time)
stmt += (
lambda q: q.filter(
States.last_updated_ts < utc_point_in_time_ts,
States.entity_id == entity_id,
)
.order_by(States.last_updated_ts.desc())
.limit(1)
)
else:
stmt += (
lambda q: q.filter(
States.last_updated < utc_point_in_time,
States.entity_id == entity_id,
)
.order_by(States.last_updated.desc())
.limit(1)
)
.order_by(States.last_updated.desc())
.limit(1)
)
if join_attributes:
stmt += lambda q: q.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
@ -636,15 +823,24 @@ def _sorted_states_to_dict(
each list of states, otherwise our graphs won't start on the Y
axis correctly.
"""
schema_version = _schema_version(hass)
_process_timestamp: Callable[[datetime], float | str]
state_class: Callable[
[Row, dict[str, dict[str, Any]], datetime | None], State | dict[str, Any]
]
if compressed_state_format:
state_class = row_to_compressed_state
_process_timestamp: Callable[
[datetime], float | str
] = process_datetime_to_timestamp
if schema_version >= 31:
state_class = row_to_compressed_state
else:
state_class = row_to_compressed_state_pre_schema_31
_process_timestamp = process_datetime_to_timestamp
attr_time = COMPRESSED_STATE_LAST_UPDATED
attr_state = COMPRESSED_STATE_STATE
else:
state_class = LazyState # type: ignore[assignment]
if schema_version >= 31:
state_class = LazyState
else:
state_class = LazyStatePreSchema31
_process_timestamp = process_timestamp_to_utc_isoformat
attr_time = LAST_CHANGED_KEY
attr_state = STATE_KEY
@ -692,7 +888,9 @@ def _sorted_states_to_dict(
ent_results.append(state_class(row, attr_cache, start_time))
if not minimal_response or split_entity_id(ent_id)[0] in NEED_ATTRIBUTE_DOMAINS:
ent_results.extend(state_class(db_state, attr_cache) for db_state in group)
ent_results.extend(
state_class(db_state, attr_cache, None) for db_state in group
)
continue
# With minimal response we only provide a native
@ -703,26 +901,49 @@ def _sorted_states_to_dict(
if (first_state := next(group, None)) is None:
continue
prev_state = first_state.state
ent_results.append(state_class(first_state, attr_cache))
ent_results.append(state_class(first_state, attr_cache, None))
#
# minimal_response only makes sense with last_updated == last_updated
#
# We use last_updated for for last_changed since its the same
#
# With minimal response we do not care about attribute
# changes so we can filter out duplicate states
if schema_version < 31:
for row in group:
if (state := row.state) != prev_state:
ent_results.append(
{
attr_state: state,
attr_time: _process_timestamp(row.last_updated),
}
)
prev_state = state
continue
if compressed_state_format:
for row in group:
if (state := row.state) != prev_state:
ent_results.append(
{
attr_state: state,
attr_time: row.last_updated_ts,
}
)
prev_state = state
for row in group:
# With minimal response we do not care about attribute
# changes so we can filter out duplicate states
if (state := row.state) == prev_state:
continue
ent_results.append(
{
attr_state: state,
#
# minimal_response only makes sense with last_updated == last_updated
#
# We use last_updated for for last_changed since its the same
#
attr_time: _process_timestamp(row.last_updated),
}
)
prev_state = state
if (state := row.state) != prev_state:
ent_results.append(
{
attr_state: state,
attr_time: process_timestamp_to_utc_isoformat(
dt_util.utc_from_timestamp(row.last_updated_ts)
),
}
)
prev_state = state
# If there are no states beyond the initial state,
# the state a was never popped from initial_states

View file

@ -43,6 +43,7 @@ from .statistics import (
get_start_time,
validate_db_schema as statistics_validate_db_schema,
)
from .tasks import PostSchemaMigrationTask
from .util import session_scope
if TYPE_CHECKING:
@ -163,6 +164,9 @@ def migrate_schema(
)
statistics_correct_db_schema(instance, engine, session_maker, schema_errors)
if current_version != SCHEMA_VERSION:
instance.queue_task(PostSchemaMigrationTask(current_version, SCHEMA_VERSION))
def _create_index(
session_maker: Callable[[], Session], table_name: str, index_name: str
@ -492,6 +496,10 @@ def _apply_update( # noqa: C901
"""Perform operations to bring schema up to date."""
dialect = engine.dialect.name
big_int = "INTEGER(20)" if dialect == SupportedDialect.MYSQL else "INTEGER"
if dialect in (SupportedDialect.MYSQL, SupportedDialect.POSTGRESQL):
timestamp_type = "DOUBLE PRECISION"
else:
timestamp_type = "FLOAT"
if new_version == 1:
_create_index(session_maker, "events", "ix_events_time_fired")
@ -821,10 +829,111 @@ def _apply_update( # noqa: C901
# Once we require SQLite >= 3.35.5, we should drop the column:
# ALTER TABLE statistics_meta DROP COLUMN state_unit_of_measurement
pass
elif new_version == 31:
# Once we require SQLite >= 3.35.5, we should drop the column:
# ALTER TABLE events DROP COLUMN time_fired
# ALTER TABLE states DROP COLUMN last_updated
# ALTER TABLE states DROP COLUMN last_changed
_add_columns(session_maker, "events", [f"time_fired_ts {timestamp_type}"])
_add_columns(
session_maker,
"states",
[f"last_updated_ts {timestamp_type}", f"last_changed_ts {timestamp_type}"],
)
_create_index(session_maker, "events", "ix_events_time_fired_ts")
_create_index(session_maker, "events", "ix_events_event_type_time_fired_ts")
_create_index(session_maker, "states", "ix_states_entity_id_last_updated_ts")
_create_index(session_maker, "states", "ix_states_last_updated_ts")
with session_scope(session=session_maker()) as session:
_migrate_columns_to_timestamp(hass, session, engine)
elif new_version == 32:
# Migration is done in two steps to ensure we can start using
# the new columns before we wipe the old ones.
_drop_index(session_maker, "states", "ix_states_entity_id_last_updated")
_drop_index(session_maker, "events", "ix_events_event_type_time_fired")
_drop_index(session_maker, "states", "ix_states_last_updated")
_drop_index(session_maker, "events", "ix_events_time_fired")
else:
raise ValueError(f"No schema migration defined for version {new_version}")
def post_schema_migration(
session: Session,
old_version: int,
new_version: int,
) -> None:
"""Post schema migration.
Run any housekeeping tasks after the schema migration has completed.
Post schema migration is run after the schema migration has completed
and the queue has been processed to ensure that we reduce the memory
pressure since events are held in memory until the queue is processed
which is blocked from being processed until the schema migration is
complete.
"""
if old_version < 32 <= new_version:
# In version 31 we migrated all the time_fired, last_updated, and last_changed
# columns to be timestamps. In version 32 we need to wipe the old columns
# since they are no longer used and take up a significant amount of space.
_wipe_old_string_time_columns(session)
def _wipe_old_string_time_columns(session: Session) -> None:
"""Wipe old string time columns to save space."""
# Wipe Events.time_fired since its been replaced by Events.time_fired_ts
# Wipe States.last_updated since its been replaced by States.last_updated_ts
# Wipe States.last_changed since its been replaced by States.last_changed_ts
session.execute(text("UPDATE events set time_fired=NULL;"))
session.execute(text("UPDATE states set last_updated=NULL, last_changed=NULL;"))
session.commit()
def _migrate_columns_to_timestamp(
hass: HomeAssistant, session: Session, engine: Engine
) -> None:
"""Migrate columns to use timestamp."""
# Migrate all data in Events.time_fired to Events.time_fired_ts
# Migrate all data in States.last_updated to States.last_updated_ts
# Migrate all data in States.last_changed to States.last_changed_ts
connection = session.connection()
if engine.dialect.name == SupportedDialect.SQLITE:
connection.execute(
text(
'UPDATE events set time_fired_ts=strftime("%s",time_fired) + '
"cast(substr(time_fired,-7) AS FLOAT);"
)
)
connection.execute(
text(
'UPDATE states set last_updated_ts=strftime("%s",last_updated) + '
"cast(substr(last_updated,-7) AS FLOAT), "
'last_changed_ts=strftime("%s",last_changed) + '
"cast(substr(last_changed,-7) AS FLOAT);"
)
)
elif engine.dialect.name == SupportedDialect.MYSQL:
connection.execute(
text("UPDATE events set time_fired_ts=UNIX_TIMESTAMP(time_fired);")
)
connection.execute(
text(
"UPDATE states set last_updated_ts=UNIX_TIMESTAMP(last_updated), "
"last_changed_ts=UNIX_TIMESTAMP(last_changed);"
)
)
elif engine.dialect.name == SupportedDialect.POSTGRESQL:
connection.execute(
text("UPDATE events set time_fired_ts=EXTRACT(EPOCH FROM time_fired);")
)
connection.execute(
text(
"UPDATE states set last_updated_ts=EXTRACT(EPOCH FROM last_updated), "
"last_changed_ts=EXTRACT(EPOCH FROM last_changed);"
)
)
def _initialize_database(session: Session) -> bool:
"""Initialize a new database, or a database created before introducing schema changes.
@ -840,7 +949,7 @@ def _initialize_database(session: Session) -> bool:
indexes = inspector.get_indexes("events")
for index in indexes:
if index["column_names"] == ["time_fired"]:
if index["column_names"] in (["time_fired"], ["time_fired_ts"]):
# Schema addition from version 1 detected. New DB.
session.add(StatisticsRuns(start=get_start_time()))
session.add(SchemaChanges(schema_version=SCHEMA_VERSION))

View file

@ -120,8 +120,8 @@ def process_datetime_to_timestamp(ts: datetime) -> float:
return ts.timestamp()
class LazyState(State):
"""A lazy version of core State."""
class LazyStatePreSchema31(State):
"""A lazy version of core State before schema 31."""
__slots__ = [
"_row",
@ -136,7 +136,7 @@ class LazyState(State):
self,
row: Row,
attr_cache: dict[str, dict[str, Any]],
start_time: datetime | None = None,
start_time: datetime | None,
) -> None:
"""Init the lazy state."""
self._row = row
@ -243,6 +243,114 @@ class LazyState(State):
)
class LazyState(State):
"""A lazy version of core State after schema 31."""
__slots__ = [
"_row",
"_attributes",
"_last_changed_ts",
"_last_updated_ts",
"_context",
"attr_cache",
]
def __init__( # pylint: disable=super-init-not-called
self,
row: Row,
attr_cache: dict[str, dict[str, Any]],
start_time: datetime | None,
) -> None:
"""Init the lazy state."""
self._row = row
self.entity_id: str = self._row.entity_id
self.state = self._row.state or ""
self._attributes: dict[str, Any] | None = None
self._last_updated_ts: float | None = self._row.last_updated_ts or (
dt_util.utc_to_timestamp(start_time) if start_time else None
)
self._last_changed_ts: float | None = (
self._row.last_changed_ts or self._last_updated_ts
)
self._context: Context | None = None
self.attr_cache = attr_cache
@property # type: ignore[override]
def attributes(self) -> dict[str, Any]:
"""State attributes."""
if self._attributes is None:
self._attributes = decode_attributes_from_row(self._row, self.attr_cache)
return self._attributes
@attributes.setter
def attributes(self, value: dict[str, Any]) -> None:
"""Set attributes."""
self._attributes = value
@property
def context(self) -> Context:
"""State context."""
if self._context is None:
self._context = Context(id=None)
return self._context
@context.setter
def context(self, value: Context) -> None:
"""Set context."""
self._context = value
@property
def last_changed(self) -> datetime:
"""Last changed datetime."""
assert self._last_changed_ts is not None
return dt_util.utc_from_timestamp(self._last_changed_ts)
@last_changed.setter
def last_changed(self, value: datetime) -> None:
"""Set last changed datetime."""
self._last_changed_ts = process_timestamp(value).timestamp()
@property
def last_updated(self) -> datetime:
"""Last updated datetime."""
assert self._last_updated_ts is not None
return dt_util.utc_from_timestamp(self._last_updated_ts)
@last_updated.setter
def last_updated(self, value: datetime) -> None:
"""Set last updated datetime."""
self._last_updated_ts = process_timestamp(value).timestamp()
def as_dict(self) -> dict[str, Any]: # type: ignore[override]
"""Return a dict representation of the LazyState.
Async friendly.
To be used for JSON serialization.
"""
last_updated_isoformat = self.last_updated.isoformat()
if self._last_changed_ts == self._last_updated_ts:
last_changed_isoformat = last_updated_isoformat
else:
last_changed_isoformat = self.last_changed.isoformat()
return {
"entity_id": self.entity_id,
"state": self.state,
"attributes": self._attributes or self.attributes,
"last_changed": last_changed_isoformat,
"last_updated": last_updated_isoformat,
}
def __eq__(self, other: Any) -> bool:
"""Return the comparison."""
return (
other.__class__ in [self.__class__, State]
and self.entity_id == other.entity_id
and self.state == other.state
and self.attributes == other.attributes
)
def decode_attributes_from_row(
row: Row, attr_cache: dict[str, dict[str, Any]]
) -> dict[str, Any]:
@ -263,9 +371,31 @@ def decode_attributes_from_row(
def row_to_compressed_state(
row: Row,
attr_cache: dict[str, dict[str, Any]],
start_time: datetime | None = None,
start_time: datetime | None,
) -> dict[str, Any]:
"""Convert a database row to a compressed state."""
"""Convert a database row to a compressed state schema 31 and later."""
comp_state = {
COMPRESSED_STATE_STATE: row.state,
COMPRESSED_STATE_ATTRIBUTES: decode_attributes_from_row(row, attr_cache),
}
if start_time:
comp_state[COMPRESSED_STATE_LAST_UPDATED] = dt_util.utc_to_timestamp(start_time)
else:
row_last_updated_ts: float = row.last_updated_ts
comp_state[COMPRESSED_STATE_LAST_UPDATED] = row_last_updated_ts
if (
row_changed_changed_ts := row.last_changed_ts
) and row_last_updated_ts != row_changed_changed_ts:
comp_state[COMPRESSED_STATE_LAST_CHANGED] = row_changed_changed_ts
return comp_state
def row_to_compressed_state_pre_schema_31(
row: Row,
attr_cache: dict[str, dict[str, Any]],
start_time: datetime | None,
) -> dict[str, Any]:
"""Convert a database row to a compressed state before schema 31."""
comp_state = {
COMPRESSED_STATE_STATE: row.state,
COMPRESSED_STATE_ATTRIBUTES: decode_attributes_from_row(row, attr_cache),

View file

@ -12,6 +12,7 @@ from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import distinct
from homeassistant.const import EVENT_STATE_CHANGED
import homeassistant.util.dt as dt_util
from .const import MAX_ROWS_TO_PURGE, SupportedDialect
from .db_schema import Events, StateAttributes, States
@ -233,7 +234,9 @@ def _select_state_attributes_ids_to_purge(
"""Return sets of state and attribute ids to purge."""
state_ids = set()
attributes_ids = set()
for state in session.execute(find_states_to_purge(purge_before)).all():
for state in session.execute(
find_states_to_purge(dt_util.utc_to_timestamp(purge_before))
).all():
state_ids.add(state.state_id)
if state.attributes_id:
attributes_ids.add(state.attributes_id)
@ -251,7 +254,9 @@ def _select_event_data_ids_to_purge(
"""Return sets of event and data ids to purge."""
event_ids = set()
data_ids = set()
for event in session.execute(find_events_to_purge(purge_before)).all():
for event in session.execute(
find_events_to_purge(dt_util.utc_to_timestamp(purge_before))
).all():
event_ids.add(event.event_id)
if event.data_id:
data_ids.add(event.data_id)
@ -420,7 +425,9 @@ def _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
still need to be able to purge them.
"""
events = session.execute(
find_legacy_event_state_and_attributes_and_data_ids_to_purge(purge_before)
find_legacy_event_state_and_attributes_and_data_ids_to_purge(
dt_util.utc_to_timestamp(purge_before)
)
).all()
_LOGGER.debug("Selected %s event ids to remove", len(events))
event_ids = set()

View file

@ -578,20 +578,20 @@ def delete_recorder_runs_rows(
)
def find_events_to_purge(purge_before: datetime) -> StatementLambdaElement:
def find_events_to_purge(purge_before: float) -> StatementLambdaElement:
"""Find events to purge."""
return lambda_stmt(
lambda: select(Events.event_id, Events.data_id)
.filter(Events.time_fired < purge_before)
.filter(Events.time_fired_ts < purge_before)
.limit(MAX_ROWS_TO_PURGE)
)
def find_states_to_purge(purge_before: datetime) -> StatementLambdaElement:
def find_states_to_purge(purge_before: float) -> StatementLambdaElement:
"""Find states to purge."""
return lambda_stmt(
lambda: select(States.state_id, States.attributes_id)
.filter(States.last_updated < purge_before)
.filter(States.last_updated_ts < purge_before)
.limit(MAX_ROWS_TO_PURGE)
)
@ -624,7 +624,7 @@ def find_latest_statistics_runs_run_id() -> StatementLambdaElement:
def find_legacy_event_state_and_attributes_and_data_ids_to_purge(
purge_before: datetime,
purge_before: float,
) -> StatementLambdaElement:
"""Find the latest row in the legacy format to purge."""
return lambda_stmt(
@ -632,7 +632,7 @@ def find_legacy_event_state_and_attributes_and_data_ids_to_purge(
Events.event_id, Events.data_id, States.state_id, States.attributes_id
)
.outerjoin(States, Events.event_id == States.event_id)
.filter(Events.time_fired < purge_before)
.filter(Events.time_fired_ts < purge_before)
.limit(MAX_ROWS_TO_PURGE)
)

View file

@ -297,3 +297,17 @@ class SynchronizeTask(RecorderTask):
# Does not use a tracked task to avoid
# blocking shutdown if the recorder is broken
instance.hass.loop.call_soon_threadsafe(self.event.set)
@dataclass
class PostSchemaMigrationTask(RecorderTask):
"""Post migration task to update schema."""
old_version: int
new_version: int
def run(self, instance: Recorder) -> None:
"""Handle the task."""
instance._post_schema_migration( # pylint: disable=[protected-access]
self.old_version, self.new_version
)

File diff suppressed because it is too large Load diff

View file

@ -27,6 +27,7 @@ class MockRow:
self.shared_data = json.dumps(data, cls=JSONEncoder)
self.data = data
self.time_fired = dt_util.utcnow()
self.time_fired_ts = dt_util.utc_to_timestamp(self.time_fired)
self.context_parent_id = context.parent_id if context else None
self.context_user_id = context.user_id if context else None
self.context_id = context.id if context else None

View file

@ -313,16 +313,17 @@ def create_state_changed_event_from_old_new(
row = collections.namedtuple(
"Row",
[
"event_type"
"event_data"
"time_fired"
"context_id"
"context_user_id"
"context_parent_id"
"state"
"entity_id"
"domain"
"attributes"
"event_type",
"event_data",
"time_fired",
"time_fired_ts",
"context_id",
"context_user_id",
"context_parent_id",
"state",
"entity_id",
"domain",
"attributes",
"state_id",
"old_state_id",
"shared_attrs",
@ -337,6 +338,7 @@ def create_state_changed_event_from_old_new(
row.attributes = attributes_json
row.shared_attrs = attributes_json
row.time_fired = event_time_fired
row.time_fired_ts = dt_util.utc_to_timestamp(event_time_fired)
row.state = new_state and new_state.get("state")
row.entity_id = entity_id
row.domain = entity_id and ha.split_entity_id(entity_id)[0]

View file

@ -14,6 +14,7 @@ from __future__ import annotations
from datetime import datetime, timedelta
import json
import logging
import time
from typing import TypedDict, overload
from sqlalchemy import (
@ -89,6 +90,8 @@ DOUBLE_TYPE = (
.with_variant(postgresql.DOUBLE_PRECISION(), "postgresql")
)
TIMESTAMP_TYPE = DOUBLE_TYPE
class Events(Base): # type: ignore
"""Event history data."""
@ -108,6 +111,9 @@ class Events(Base): # type: ignore
SmallInteger
) # *** Not originally in v23, only added for recorder to startup ok
time_fired = Column(DATETIME_TYPE, index=True)
time_fired_ts = Column(
TIMESTAMP_TYPE, index=True
) # *** Not originally in v23, only added for recorder to startup ok
created = Column(DATETIME_TYPE, default=dt_util.utcnow)
context_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
@ -197,7 +203,13 @@ class States(Base): # type: ignore
Integer, ForeignKey("events.event_id", ondelete="CASCADE"), index=True
)
last_changed = Column(DATETIME_TYPE, default=dt_util.utcnow)
last_updated_ts = Column(
TIMESTAMP_TYPE, default=time.time
) # *** Not originally in v23, only added for recorder to startup ok
last_updated = Column(DATETIME_TYPE, default=dt_util.utcnow, index=True)
last_updated_ts = Column(
TIMESTAMP_TYPE, default=time.time, index=True
) # *** Not originally in v23, only added for recorder to startup ok
created = Column(DATETIME_TYPE, default=dt_util.utcnow)
old_state_id = Column(Integer, ForeignKey("states.state_id"), index=True)
event = relationship("Events", uselist=False)

View file

@ -0,0 +1,674 @@
"""Models for SQLAlchemy.
This file contains the model definitions for schema version 30.
It is used to test the schema migration logic.
"""
from __future__ import annotations
from collections.abc import Callable
from datetime import datetime, timedelta
import logging
from typing import Any, TypedDict, TypeVar, cast, overload
import ciso8601
from fnvhash import fnv1a_32
from sqlalchemy import (
JSON,
BigInteger,
Boolean,
Column,
DateTime,
Float,
ForeignKey,
Identity,
Index,
Integer,
SmallInteger,
String,
Text,
distinct,
type_coerce,
)
from sqlalchemy.dialects import mysql, oracle, postgresql, sqlite
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import aliased, declarative_base, relationship
from sqlalchemy.orm.session import Session
from homeassistant.const import (
ATTR_ATTRIBUTION,
ATTR_RESTORED,
ATTR_SUPPORTED_FEATURES,
MAX_LENGTH_EVENT_CONTEXT_ID,
MAX_LENGTH_EVENT_EVENT_TYPE,
MAX_LENGTH_EVENT_ORIGIN,
MAX_LENGTH_STATE_ENTITY_ID,
MAX_LENGTH_STATE_STATE,
)
from homeassistant.core import Context, Event, EventOrigin, State, split_entity_id
from homeassistant.helpers.json import (
JSON_DECODE_EXCEPTIONS,
JSON_DUMP,
json_bytes,
json_loads,
)
import homeassistant.util.dt as dt_util
ALL_DOMAIN_EXCLUDE_ATTRS = {ATTR_ATTRIBUTION, ATTR_RESTORED, ATTR_SUPPORTED_FEATURES}
# SQLAlchemy Schema
# pylint: disable=invalid-name
Base = declarative_base()
SCHEMA_VERSION = 30
_StatisticsBaseSelfT = TypeVar("_StatisticsBaseSelfT", bound="StatisticsBase")
_LOGGER = logging.getLogger(__name__)
TABLE_EVENTS = "events"
TABLE_EVENT_DATA = "event_data"
TABLE_STATES = "states"
TABLE_STATE_ATTRIBUTES = "state_attributes"
TABLE_RECORDER_RUNS = "recorder_runs"
TABLE_SCHEMA_CHANGES = "schema_changes"
TABLE_STATISTICS = "statistics"
TABLE_STATISTICS_META = "statistics_meta"
TABLE_STATISTICS_RUNS = "statistics_runs"
TABLE_STATISTICS_SHORT_TERM = "statistics_short_term"
ALL_TABLES = [
TABLE_STATES,
TABLE_STATE_ATTRIBUTES,
TABLE_EVENTS,
TABLE_EVENT_DATA,
TABLE_RECORDER_RUNS,
TABLE_SCHEMA_CHANGES,
TABLE_STATISTICS,
TABLE_STATISTICS_META,
TABLE_STATISTICS_RUNS,
TABLE_STATISTICS_SHORT_TERM,
]
TABLES_TO_CHECK = [
TABLE_STATES,
TABLE_EVENTS,
TABLE_RECORDER_RUNS,
TABLE_SCHEMA_CHANGES,
]
LAST_UPDATED_INDEX = "ix_states_last_updated"
ENTITY_ID_LAST_UPDATED_INDEX = "ix_states_entity_id_last_updated"
EVENTS_CONTEXT_ID_INDEX = "ix_events_context_id"
STATES_CONTEXT_ID_INDEX = "ix_states_context_id"
class FAST_PYSQLITE_DATETIME(sqlite.DATETIME): # type: ignore[misc]
"""Use ciso8601 to parse datetimes instead of sqlalchemy built-in regex."""
def result_processor(self, dialect, coltype): # type: ignore[no-untyped-def]
"""Offload the datetime parsing to ciso8601."""
return lambda value: None if value is None else ciso8601.parse_datetime(value)
JSON_VARIANT_CAST = Text().with_variant(
postgresql.JSON(none_as_null=True), "postgresql"
)
JSONB_VARIANT_CAST = Text().with_variant(
postgresql.JSONB(none_as_null=True), "postgresql"
)
DATETIME_TYPE = (
DateTime(timezone=True)
.with_variant(mysql.DATETIME(timezone=True, fsp=6), "mysql")
.with_variant(FAST_PYSQLITE_DATETIME(), "sqlite")
)
DOUBLE_TYPE = (
Float()
.with_variant(mysql.DOUBLE(asdecimal=False), "mysql")
.with_variant(oracle.DOUBLE_PRECISION(), "oracle")
.with_variant(postgresql.DOUBLE_PRECISION(), "postgresql")
)
TIMESTAMP_TYPE = DOUBLE_TYPE
class UnsupportedDialect(Exception):
"""The dialect or its version is not supported."""
class StatisticResult(TypedDict):
"""Statistic result data class.
Allows multiple datapoints for the same statistic_id.
"""
meta: StatisticMetaData
stat: StatisticData
class StatisticDataBase(TypedDict):
"""Mandatory fields for statistic data class."""
start: datetime
class StatisticData(StatisticDataBase, total=False):
"""Statistic data class."""
mean: float
min: float
max: float
last_reset: datetime | None
state: float
sum: float
class StatisticMetaData(TypedDict):
"""Statistic meta data class."""
has_mean: bool
has_sum: bool
name: str | None
source: str
statistic_id: str
unit_of_measurement: str | None
class JSONLiteral(JSON): # type: ignore[misc]
"""Teach SA how to literalize json."""
def literal_processor(self, dialect: str) -> Callable[[Any], str]:
"""Processor to convert a value to JSON."""
def process(value: Any) -> str:
"""Dump json."""
return JSON_DUMP(value)
return process
EVENT_ORIGIN_ORDER = [EventOrigin.local, EventOrigin.remote]
EVENT_ORIGIN_TO_IDX = {origin: idx for idx, origin in enumerate(EVENT_ORIGIN_ORDER)}
class Events(Base): # type: ignore[misc,valid-type]
"""Event history data."""
__table_args__ = (
# Used for fetching events at a specific time
# see logbook
Index("ix_events_event_type_time_fired", "event_type", "time_fired"),
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_EVENTS
event_id = Column(Integer, Identity(), primary_key=True)
event_type = Column(String(MAX_LENGTH_EVENT_EVENT_TYPE))
event_data = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
origin = Column(String(MAX_LENGTH_EVENT_ORIGIN)) # no longer used for new rows
origin_idx = Column(SmallInteger)
time_fired = Column(DATETIME_TYPE, index=True)
context_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
data_id = Column(Integer, ForeignKey("event_data.data_id"), index=True)
event_data_rel = relationship("EventData")
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
"<recorder.Events("
f"id={self.event_id}, type='{self.event_type}', "
f"origin_idx='{self.origin_idx}', time_fired='{self.time_fired}'"
f", data_id={self.data_id})>"
)
@staticmethod
def from_event(event: Event) -> Events:
"""Create an event database object from a native event."""
return Events(
event_type=event.event_type,
event_data=None,
origin_idx=EVENT_ORIGIN_TO_IDX.get(event.origin),
time_fired=event.time_fired,
context_id=event.context.id,
context_user_id=event.context.user_id,
context_parent_id=event.context.parent_id,
)
def to_native(self, validate_entity_id: bool = True) -> Event | None:
"""Convert to a native HA Event."""
context = Context(
id=self.context_id,
user_id=self.context_user_id,
parent_id=self.context_parent_id,
)
try:
return Event(
self.event_type,
json_loads(self.event_data) if self.event_data else {},
EventOrigin(self.origin)
if self.origin
else EVENT_ORIGIN_ORDER[self.origin_idx],
process_timestamp(self.time_fired),
context=context,
)
except JSON_DECODE_EXCEPTIONS:
# When json_loads fails
_LOGGER.exception("Error converting to event: %s", self)
return None
class EventData(Base): # type: ignore[misc,valid-type]
"""Event data history."""
__table_args__ = (
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_EVENT_DATA
data_id = Column(Integer, Identity(), primary_key=True)
hash = Column(BigInteger, index=True)
# Note that this is not named attributes to avoid confusion with the states table
shared_data = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
"<recorder.EventData("
f"id={self.data_id}, hash='{self.hash}', data='{self.shared_data}'"
")>"
)
@staticmethod
def from_event(event: Event) -> EventData:
"""Create object from an event."""
shared_data = json_bytes(event.data)
return EventData(
shared_data=shared_data.decode("utf-8"),
hash=EventData.hash_shared_data_bytes(shared_data),
)
@staticmethod
def shared_data_bytes_from_event(event: Event) -> bytes:
"""Create shared_data from an event."""
return json_bytes(event.data)
@staticmethod
def hash_shared_data_bytes(shared_data_bytes: bytes) -> int:
"""Return the hash of json encoded shared data."""
return cast(int, fnv1a_32(shared_data_bytes))
def to_native(self) -> dict[str, Any]:
"""Convert to an HA state object."""
try:
return cast(dict[str, Any], json_loads(self.shared_data))
except JSON_DECODE_EXCEPTIONS:
_LOGGER.exception("Error converting row to event data: %s", self)
return {}
class States(Base): # type: ignore[misc,valid-type]
"""State change history."""
__table_args__ = (
# Used for fetching the state of entities at a specific time
# (get_states in history.py)
Index(ENTITY_ID_LAST_UPDATED_INDEX, "entity_id", "last_updated"),
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_STATES
state_id = Column(Integer, Identity(), primary_key=True)
entity_id = Column(String(MAX_LENGTH_STATE_ENTITY_ID))
state = Column(String(MAX_LENGTH_STATE_STATE))
attributes = Column(
Text().with_variant(mysql.LONGTEXT, "mysql")
) # no longer used for new rows
event_id = Column( # no longer used for new rows
Integer, ForeignKey("events.event_id", ondelete="CASCADE"), index=True
)
last_changed = Column(DATETIME_TYPE)
last_updated = Column(DATETIME_TYPE, default=dt_util.utcnow, index=True)
old_state_id = Column(Integer, ForeignKey("states.state_id"), index=True)
attributes_id = Column(
Integer, ForeignKey("state_attributes.attributes_id"), index=True
)
context_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
origin_idx = Column(SmallInteger) # 0 is local, 1 is remote
old_state = relationship("States", remote_side=[state_id])
state_attributes = relationship("StateAttributes")
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.States(id={self.state_id}, entity_id='{self.entity_id}',"
f" state='{self.state}', event_id='{self.event_id}',"
f" last_updated='{self.last_updated.isoformat(sep=' ', timespec='seconds')}',"
f" old_state_id={self.old_state_id}, attributes_id={self.attributes_id})>"
)
@staticmethod
def from_event(event: Event) -> States:
"""Create object from a state_changed event."""
entity_id = event.data["entity_id"]
state: State | None = event.data.get("new_state")
dbstate = States(
entity_id=entity_id,
attributes=None,
context_id=event.context.id,
context_user_id=event.context.user_id,
context_parent_id=event.context.parent_id,
origin_idx=EVENT_ORIGIN_TO_IDX.get(event.origin),
)
# None state means the state was removed from the state machine
if state is None:
dbstate.state = ""
dbstate.last_updated = event.time_fired
dbstate.last_changed = None
return dbstate
dbstate.state = state.state
dbstate.last_updated = state.last_updated
if state.last_updated == state.last_changed:
dbstate.last_changed = None
else:
dbstate.last_changed = state.last_changed
return dbstate
def to_native(self, validate_entity_id: bool = True) -> State | None:
"""Convert to an HA state object."""
context = Context(
id=self.context_id,
user_id=self.context_user_id,
parent_id=self.context_parent_id,
)
try:
attrs = json_loads(self.attributes) if self.attributes else {}
except JSON_DECODE_EXCEPTIONS:
# When json_loads fails
_LOGGER.exception("Error converting row to state: %s", self)
return None
if self.last_changed is None or self.last_changed == self.last_updated:
last_changed = last_updated = process_timestamp(self.last_updated)
else:
last_updated = process_timestamp(self.last_updated)
last_changed = process_timestamp(self.last_changed)
return State(
self.entity_id,
self.state,
# Join the state_attributes table on attributes_id to get the attributes
# for newer states
attrs,
last_changed,
last_updated,
context=context,
validate_entity_id=validate_entity_id,
)
class StateAttributes(Base): # type: ignore[misc,valid-type]
"""State attribute change history."""
__table_args__ = (
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_STATE_ATTRIBUTES
attributes_id = Column(Integer, Identity(), primary_key=True)
hash = Column(BigInteger, index=True)
# Note that this is not named attributes to avoid confusion with the states table
shared_attrs = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.StateAttributes(id={self.attributes_id}, hash='{self.hash}',"
f" attributes='{self.shared_attrs}')>"
)
@staticmethod
def from_event(event: Event) -> StateAttributes:
"""Create object from a state_changed event."""
state: State | None = event.data.get("new_state")
# None state means the state was removed from the state machine
attr_bytes = b"{}" if state is None else json_bytes(state.attributes)
dbstate = StateAttributes(shared_attrs=attr_bytes.decode("utf-8"))
dbstate.hash = StateAttributes.hash_shared_attrs_bytes(attr_bytes)
return dbstate
@staticmethod
def shared_attrs_bytes_from_event(
event: Event, exclude_attrs_by_domain: dict[str, set[str]]
) -> bytes:
"""Create shared_attrs from a state_changed event."""
state: State | None = event.data.get("new_state")
# None state means the state was removed from the state machine
if state is None:
return b"{}"
domain = split_entity_id(state.entity_id)[0]
exclude_attrs = (
exclude_attrs_by_domain.get(domain, set()) | ALL_DOMAIN_EXCLUDE_ATTRS
)
return json_bytes(
{k: v for k, v in state.attributes.items() if k not in exclude_attrs}
)
@staticmethod
def hash_shared_attrs_bytes(shared_attrs_bytes: bytes) -> int:
"""Return the hash of json encoded shared attributes."""
return cast(int, fnv1a_32(shared_attrs_bytes))
def to_native(self) -> dict[str, Any]:
"""Convert to an HA state object."""
try:
return cast(dict[str, Any], json_loads(self.shared_attrs))
except JSON_DECODE_EXCEPTIONS:
# When json_loads fails
_LOGGER.exception("Error converting row to state attributes: %s", self)
return {}
class StatisticsBase:
"""Statistics base class."""
id = Column(Integer, Identity(), primary_key=True)
created = Column(DATETIME_TYPE, default=dt_util.utcnow)
@declared_attr # type: ignore[misc]
def metadata_id(self) -> Column:
"""Define the metadata_id column for sub classes."""
return Column(
Integer,
ForeignKey(f"{TABLE_STATISTICS_META}.id", ondelete="CASCADE"),
index=True,
)
start = Column(DATETIME_TYPE, index=True)
mean = Column(DOUBLE_TYPE)
min = Column(DOUBLE_TYPE)
max = Column(DOUBLE_TYPE)
last_reset = Column(DATETIME_TYPE)
state = Column(DOUBLE_TYPE)
sum = Column(DOUBLE_TYPE)
@classmethod
def from_stats(
cls: type[_StatisticsBaseSelfT], metadata_id: int, stats: StatisticData
) -> _StatisticsBaseSelfT:
"""Create object from a statistics."""
return cls( # type: ignore[call-arg,misc]
metadata_id=metadata_id,
**stats,
)
class Statistics(Base, StatisticsBase): # type: ignore[misc,valid-type]
"""Long term statistics."""
duration = timedelta(hours=1)
__table_args__ = (
# Used for fetching statistics for a certain entity at a specific time
Index("ix_statistics_statistic_id_start", "metadata_id", "start", unique=True),
)
__tablename__ = TABLE_STATISTICS
class StatisticsShortTerm(Base, StatisticsBase): # type: ignore[misc,valid-type]
"""Short term statistics."""
duration = timedelta(minutes=5)
__table_args__ = (
# Used for fetching statistics for a certain entity at a specific time
Index(
"ix_statistics_short_term_statistic_id_start",
"metadata_id",
"start",
unique=True,
),
)
__tablename__ = TABLE_STATISTICS_SHORT_TERM
class StatisticsMeta(Base): # type: ignore[misc,valid-type]
"""Statistics meta data."""
__table_args__ = (
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_STATISTICS_META
id = Column(Integer, Identity(), primary_key=True)
statistic_id = Column(String(255), index=True, unique=True)
source = Column(String(32))
unit_of_measurement = Column(String(255))
has_mean = Column(Boolean)
has_sum = Column(Boolean)
name = Column(String(255))
@staticmethod
def from_meta(meta: StatisticMetaData) -> StatisticsMeta:
"""Create object from meta data."""
return StatisticsMeta(**meta)
class RecorderRuns(Base): # type: ignore[misc,valid-type]
"""Representation of recorder run."""
__table_args__ = (Index("ix_recorder_runs_start_end", "start", "end"),)
__tablename__ = TABLE_RECORDER_RUNS
run_id = Column(Integer, Identity(), primary_key=True)
start = Column(DATETIME_TYPE, default=dt_util.utcnow)
end = Column(DATETIME_TYPE)
closed_incorrect = Column(Boolean, default=False)
created = Column(DATETIME_TYPE, default=dt_util.utcnow)
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
end = (
f"'{self.end.isoformat(sep=' ', timespec='seconds')}'" if self.end else None
)
return (
f"<recorder.RecorderRuns(id={self.run_id},"
f" start='{self.start.isoformat(sep=' ', timespec='seconds')}', end={end},"
f" closed_incorrect={self.closed_incorrect},"
f" created='{self.created.isoformat(sep=' ', timespec='seconds')}')>"
)
def entity_ids(self, point_in_time: datetime | None = None) -> list[str]:
"""Return the entity ids that existed in this run.
Specify point_in_time if you want to know which existed at that point
in time inside the run.
"""
session = Session.object_session(self)
assert session is not None, "RecorderRuns need to be persisted"
query = session.query(distinct(States.entity_id)).filter(
States.last_updated >= self.start
)
if point_in_time is not None:
query = query.filter(States.last_updated < point_in_time)
elif self.end is not None:
query = query.filter(States.last_updated < self.end)
return [row[0] for row in query]
def to_native(self, validate_entity_id: bool = True) -> RecorderRuns:
"""Return self, native format is this model."""
return self
class SchemaChanges(Base): # type: ignore[misc,valid-type]
"""Representation of schema version changes."""
__tablename__ = TABLE_SCHEMA_CHANGES
change_id = Column(Integer, Identity(), primary_key=True)
schema_version = Column(Integer)
changed = Column(DATETIME_TYPE, default=dt_util.utcnow)
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
"<recorder.SchemaChanges("
f"id={self.change_id}, schema_version={self.schema_version}, "
f"changed='{self.changed.isoformat(sep=' ', timespec='seconds')}'"
")>"
)
class StatisticsRuns(Base): # type: ignore[misc,valid-type]
"""Representation of statistics run."""
__tablename__ = TABLE_STATISTICS_RUNS
run_id = Column(Integer, Identity(), primary_key=True)
start = Column(DATETIME_TYPE, index=True)
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.StatisticsRuns(id={self.run_id},"
f" start='{self.start.isoformat(sep=' ', timespec='seconds')}', )>"
)
EVENT_DATA_JSON = type_coerce(
EventData.shared_data.cast(JSONB_VARIANT_CAST), JSONLiteral(none_as_null=True)
)
OLD_FORMAT_EVENT_DATA_JSON = type_coerce(
Events.event_data.cast(JSONB_VARIANT_CAST), JSONLiteral(none_as_null=True)
)
SHARED_ATTRS_JSON = type_coerce(
StateAttributes.shared_attrs.cast(JSON_VARIANT_CAST), JSON(none_as_null=True)
)
OLD_FORMAT_ATTRS_JSON = type_coerce(
States.attributes.cast(JSON_VARIANT_CAST), JSON(none_as_null=True)
)
ENTITY_ID_IN_EVENT: Column = EVENT_DATA_JSON["entity_id"]
OLD_ENTITY_ID_IN_EVENT: Column = OLD_FORMAT_EVENT_DATA_JSON["entity_id"]
DEVICE_ID_IN_EVENT: Column = EVENT_DATA_JSON["device_id"]
OLD_STATE = aliased(States, name="old_state")
@overload
def process_timestamp(ts: None) -> None:
...
@overload
def process_timestamp(ts: datetime) -> datetime:
...
def process_timestamp(ts: datetime | None) -> datetime | None:
"""Process a timestamp into datetime object."""
if ts is None:
return None
if ts.tzinfo is None:
return ts.replace(tzinfo=dt_util.UTC)
return dt_util.as_utc(ts)

View file

@ -7,25 +7,34 @@ from datetime import datetime, timedelta
import json
from unittest.mock import patch, sentinel
from freezegun import freeze_time
import pytest
from sqlalchemy import text
from homeassistant.components import recorder
from homeassistant.components.recorder import history
from homeassistant.components.recorder import get_instance, history
from homeassistant.components.recorder.db_schema import (
Events,
RecorderRuns,
StateAttributes,
States,
)
from homeassistant.components.recorder.models import LazyState, process_timestamp
from homeassistant.components.recorder.models import (
LazyState,
LazyStatePreSchema31,
process_timestamp,
)
from homeassistant.components.recorder.util import session_scope
import homeassistant.core as ha
from homeassistant.core import HomeAssistant, State
from homeassistant.helpers.json import JSONEncoder
import homeassistant.util.dt as dt_util
from .common import async_wait_recording_done, wait_recording_done
from .common import (
async_recorder_block_till_done,
async_wait_recording_done,
wait_recording_done,
)
from tests.common import SetupRecorderInstanceT, mock_state_change_event
@ -40,10 +49,14 @@ async def _async_get_states(
"""Get states from the database."""
def _get_states_with_session():
if get_instance(hass).schema_version < 31:
klass = LazyStatePreSchema31
else:
klass = LazyState
with session_scope(hass=hass) as session:
attr_cache = {}
return [
LazyState(row, attr_cache)
klass(row, attr_cache, None)
for row in history._get_rows_with_session(
hass,
session,
@ -579,6 +592,27 @@ def test_get_significant_states_only(hass_recorder):
assert states == hist[entity_id]
async def test_get_significant_states_only_minimal_response(recorder_mock, hass):
"""Test significant states when significant_states_only is True."""
now = dt_util.utcnow()
await async_recorder_block_till_done(hass)
hass.states.async_set("sensor.test", "on", attributes={"any": "attr"})
await async_recorder_block_till_done(hass)
hass.states.async_set("sensor.test", "off", attributes={"any": "attr"})
await async_recorder_block_till_done(hass)
hass.states.async_set("sensor.test", "off", attributes={"any": "changed"})
await async_recorder_block_till_done(hass)
hass.states.async_set("sensor.test", "off", attributes={"any": "again"})
await async_recorder_block_till_done(hass)
hass.states.async_set("sensor.test", "on", attributes={"any": "attr"})
await async_wait_recording_done(hass)
hist = history.get_significant_states(
hass, now, minimal_response=True, significant_changes_only=False
)
assert len(hist["sensor.test"]) == 3
def record_states(hass) -> tuple[datetime, datetime, dict[str, list[State]]]:
"""Record some test states.
@ -884,7 +918,7 @@ async def test_get_full_significant_states_handles_empty_last_changed(
!= native_sensor_one_states[1].last_updated
)
def _fetch_db_states() -> list[State]:
def _fetch_db_states() -> list[States]:
with session_scope(hass=hass) as session:
states = list(session.query(States))
session.expunge_all()
@ -894,12 +928,20 @@ async def test_get_full_significant_states_handles_empty_last_changed(
_fetch_db_states
)
assert db_sensor_one_states[0].last_changed is None
assert db_sensor_one_states[0].last_changed_ts is None
assert (
process_timestamp(db_sensor_one_states[1].last_changed) == state0.last_changed
process_timestamp(
dt_util.utc_from_timestamp(db_sensor_one_states[1].last_changed_ts)
)
== state0.last_changed
)
assert db_sensor_one_states[0].last_updated_ts is not None
assert db_sensor_one_states[1].last_updated_ts is not None
assert (
db_sensor_one_states[0].last_updated_ts
!= db_sensor_one_states[1].last_updated_ts
)
assert db_sensor_one_states[0].last_updated is not None
assert db_sensor_one_states[1].last_updated is not None
assert db_sensor_one_states[0].last_updated != db_sensor_one_states[1].last_updated
def test_state_changes_during_period_multiple_entities_single_test(hass_recorder):
@ -929,3 +971,38 @@ def test_state_changes_during_period_multiple_entities_single_test(hass_recorder
hist = history.state_changes_during_period(hass, start, end, None)
for entity_id, value in test_entites.items():
hist[entity_id][0].state == value
async def test_get_full_significant_states_past_year_2038(
async_setup_recorder_instance: SetupRecorderInstanceT,
hass: ha.HomeAssistant,
):
"""Test we can store times past year 2038."""
await async_setup_recorder_instance(hass, {})
past_2038_time = dt_util.parse_datetime("2039-01-19 03:14:07.555555-00:00")
with freeze_time(past_2038_time):
hass.states.async_set("sensor.one", "on", {"attr": "original"})
state0 = hass.states.get("sensor.one")
await hass.async_block_till_done()
hass.states.async_set("sensor.one", "on", {"attr": "new"})
state1 = hass.states.get("sensor.one")
await async_wait_recording_done(hass)
def _get_entries():
with session_scope(hass=hass) as session:
return history.get_full_significant_states_with_session(
hass,
session,
past_2038_time - timedelta(days=365),
past_2038_time + timedelta(days=365),
entity_ids=["sensor.one"],
significant_changes_only=False,
)
states = await recorder.get_instance(hass).async_add_executor_job(_get_entries)
sensor_one_states: list[State] = states["sensor.one"]
assert sensor_one_states[0] == state0
assert sensor_one_states[1] == state1
assert sensor_one_states[0].last_changed == past_2038_time
assert sensor_one_states[0].last_updated == past_2038_time

View file

@ -0,0 +1,624 @@
"""The tests the History component."""
from __future__ import annotations
# pylint: disable=protected-access,invalid-name
from copy import copy
from datetime import datetime, timedelta
import importlib
import json
import sys
from unittest.mock import patch, sentinel
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from homeassistant.components import recorder
from homeassistant.components.recorder import core, history, statistics
from homeassistant.components.recorder.models import process_timestamp
from homeassistant.components.recorder.util import session_scope
from homeassistant.core import State
from homeassistant.helpers.json import JSONEncoder
import homeassistant.util.dt as dt_util
from .common import wait_recording_done
CREATE_ENGINE_TARGET = "homeassistant.components.recorder.core.create_engine"
SCHEMA_MODULE = "tests.components.recorder.db_schema_30"
def _create_engine_test(*args, **kwargs):
"""Test version of create_engine that initializes with old schema.
This simulates an existing db with the old schema.
"""
importlib.import_module(SCHEMA_MODULE)
old_db_schema = sys.modules[SCHEMA_MODULE]
engine = create_engine(*args, **kwargs)
old_db_schema.Base.metadata.create_all(engine)
with Session(engine) as session:
session.add(
recorder.db_schema.StatisticsRuns(start=statistics.get_start_time())
)
session.add(
recorder.db_schema.SchemaChanges(
schema_version=old_db_schema.SCHEMA_VERSION
)
)
session.commit()
return engine
@pytest.fixture(autouse=True)
def db_schema_30():
"""Fixture to initialize the db with the old schema."""
importlib.import_module(SCHEMA_MODULE)
old_db_schema = sys.modules[SCHEMA_MODULE]
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch.object(core, "EventData", old_db_schema.EventData), patch.object(
core, "States", old_db_schema.States
), patch.object(
core, "Events", old_db_schema.Events
), patch.object(
core, "StateAttributes", old_db_schema.StateAttributes
), patch(
CREATE_ENGINE_TARGET, new=_create_engine_test
):
yield
def test_get_full_significant_states_with_session_entity_no_matches(hass_recorder):
"""Test getting states at a specific point in time for entities that never have been recorded."""
hass = hass_recorder()
now = dt_util.utcnow()
time_before_recorder_ran = now - timedelta(days=1000)
with session_scope(hass=hass) as session:
assert (
history.get_full_significant_states_with_session(
hass, session, time_before_recorder_ran, now, entity_ids=["demo.id"]
)
== {}
)
assert (
history.get_full_significant_states_with_session(
hass,
session,
time_before_recorder_ran,
now,
entity_ids=["demo.id", "demo.id2"],
)
== {}
)
def test_significant_states_with_session_entity_minimal_response_no_matches(
hass_recorder,
):
"""Test getting states at a specific point in time for entities that never have been recorded."""
hass = hass_recorder()
now = dt_util.utcnow()
time_before_recorder_ran = now - timedelta(days=1000)
with session_scope(hass=hass) as session:
assert (
history.get_significant_states_with_session(
hass,
session,
time_before_recorder_ran,
now,
entity_ids=["demo.id"],
minimal_response=True,
)
== {}
)
assert (
history.get_significant_states_with_session(
hass,
session,
time_before_recorder_ran,
now,
entity_ids=["demo.id", "demo.id2"],
minimal_response=True,
)
== {}
)
@pytest.mark.parametrize(
"attributes, no_attributes, limit",
[
({"attr": True}, False, 5000),
({}, True, 5000),
({"attr": True}, False, 3),
({}, True, 3),
],
)
def test_state_changes_during_period(hass_recorder, attributes, no_attributes, limit):
"""Test state change during period."""
hass = hass_recorder()
entity_id = "media_player.test"
def set_state(state):
"""Set the state."""
hass.states.set(entity_id, state, attributes)
wait_recording_done(hass)
return hass.states.get(entity_id)
start = dt_util.utcnow()
point = start + timedelta(seconds=1)
end = point + timedelta(seconds=1)
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=start
):
set_state("idle")
set_state("YouTube")
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=point
):
states = [
set_state("idle"),
set_state("Netflix"),
set_state("Plex"),
set_state("YouTube"),
]
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=end
):
set_state("Netflix")
set_state("Plex")
hist = history.state_changes_during_period(
hass, start, end, entity_id, no_attributes, limit=limit
)
assert states[:limit] == hist[entity_id]
def test_state_changes_during_period_descending(hass_recorder):
"""Test state change during period descending."""
hass = hass_recorder()
entity_id = "media_player.test"
def set_state(state):
"""Set the state."""
hass.states.set(entity_id, state, {"any": 1})
wait_recording_done(hass)
return hass.states.get(entity_id)
start = dt_util.utcnow()
point = start + timedelta(seconds=1)
point2 = start + timedelta(seconds=1, microseconds=2)
point3 = start + timedelta(seconds=1, microseconds=3)
point4 = start + timedelta(seconds=1, microseconds=4)
end = point + timedelta(seconds=1)
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=start
):
set_state("idle")
set_state("YouTube")
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=point
):
states = [set_state("idle")]
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=point2
):
states.append(set_state("Netflix"))
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=point3
):
states.append(set_state("Plex"))
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=point4
):
states.append(set_state("YouTube"))
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=end
):
set_state("Netflix")
set_state("Plex")
hist = history.state_changes_during_period(
hass, start, end, entity_id, no_attributes=False, descending=False
)
assert states == hist[entity_id]
hist = history.state_changes_during_period(
hass, start, end, entity_id, no_attributes=False, descending=True
)
assert states == list(reversed(list(hist[entity_id])))
def test_get_last_state_changes(hass_recorder):
"""Test number of state changes."""
hass = hass_recorder()
entity_id = "sensor.test"
def set_state(state):
"""Set the state."""
hass.states.set(entity_id, state)
wait_recording_done(hass)
return hass.states.get(entity_id)
start = dt_util.utcnow() - timedelta(minutes=2)
point = start + timedelta(minutes=1)
point2 = point + timedelta(minutes=1)
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=start
):
set_state("1")
states = []
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=point
):
states.append(set_state("2"))
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=point2
):
states.append(set_state("3"))
hist = history.get_last_state_changes(hass, 2, entity_id)
assert states == hist[entity_id]
def test_ensure_state_can_be_copied(hass_recorder):
"""Ensure a state can pass though copy().
The filter integration uses copy() on states
from history.
"""
hass = hass_recorder()
entity_id = "sensor.test"
def set_state(state):
"""Set the state."""
hass.states.set(entity_id, state)
wait_recording_done(hass)
return hass.states.get(entity_id)
start = dt_util.utcnow() - timedelta(minutes=2)
point = start + timedelta(minutes=1)
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=start
):
set_state("1")
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=point
):
set_state("2")
hist = history.get_last_state_changes(hass, 2, entity_id)
assert copy(hist[entity_id][0]) == hist[entity_id][0]
assert copy(hist[entity_id][1]) == hist[entity_id][1]
def test_get_significant_states(hass_recorder):
"""Test that only significant states are returned.
We should get back every thermostat change that
includes an attribute change, but only the state updates for
media player (attribute changes are not significant and not returned).
"""
hass = hass_recorder()
zero, four, states = record_states(hass)
hist = history.get_significant_states(hass, zero, four)
assert states == hist
def test_get_significant_states_minimal_response(hass_recorder):
"""Test that only significant states are returned.
When minimal responses is set only the first and
last states return a complete state.
We should get back every thermostat change that
includes an attribute change, but only the state updates for
media player (attribute changes are not significant and not returned).
"""
hass = hass_recorder()
zero, four, states = record_states(hass)
hist = history.get_significant_states(hass, zero, four, minimal_response=True)
entites_with_reducable_states = [
"media_player.test",
"media_player.test3",
]
# All states for media_player.test state are reduced
# down to last_changed and state when minimal_response
# is set except for the first state.
# is set. We use JSONEncoder to make sure that are
# pre-encoded last_changed is always the same as what
# will happen with encoding a native state
for entity_id in entites_with_reducable_states:
entity_states = states[entity_id]
for state_idx in range(1, len(entity_states)):
input_state = entity_states[state_idx]
orig_last_changed = orig_last_changed = json.dumps(
process_timestamp(input_state.last_changed),
cls=JSONEncoder,
).replace('"', "")
orig_state = input_state.state
entity_states[state_idx] = {
"last_changed": orig_last_changed,
"state": orig_state,
}
assert states == hist
def test_get_significant_states_with_initial(hass_recorder):
"""Test that only significant states are returned.
We should get back every thermostat change that
includes an attribute change, but only the state updates for
media player (attribute changes are not significant and not returned).
"""
hass = hass_recorder()
zero, four, states = record_states(hass)
one = zero + timedelta(seconds=1)
one_and_half = zero + timedelta(seconds=1.5)
for entity_id in states:
if entity_id == "media_player.test":
states[entity_id] = states[entity_id][1:]
for state in states[entity_id]:
if state.last_changed == one:
state.last_changed = one_and_half
hist = history.get_significant_states(
hass,
one_and_half,
four,
include_start_time_state=True,
)
assert states == hist
def test_get_significant_states_without_initial(hass_recorder):
"""Test that only significant states are returned.
We should get back every thermostat change that
includes an attribute change, but only the state updates for
media player (attribute changes are not significant and not returned).
"""
hass = hass_recorder()
zero, four, states = record_states(hass)
one = zero + timedelta(seconds=1)
one_and_half = zero + timedelta(seconds=1.5)
for entity_id in states:
states[entity_id] = list(
filter(lambda s: s.last_changed != one, states[entity_id])
)
del states["media_player.test2"]
hist = history.get_significant_states(
hass,
one_and_half,
four,
include_start_time_state=False,
)
assert states == hist
def test_get_significant_states_entity_id(hass_recorder):
"""Test that only significant states are returned for one entity."""
hass = hass_recorder()
zero, four, states = record_states(hass)
del states["media_player.test2"]
del states["media_player.test3"]
del states["thermostat.test"]
del states["thermostat.test2"]
del states["script.can_cancel_this_one"]
hist = history.get_significant_states(hass, zero, four, ["media_player.test"])
assert states == hist
def test_get_significant_states_multiple_entity_ids(hass_recorder):
"""Test that only significant states are returned for one entity."""
hass = hass_recorder()
zero, four, states = record_states(hass)
del states["media_player.test2"]
del states["media_player.test3"]
del states["thermostat.test2"]
del states["script.can_cancel_this_one"]
hist = history.get_significant_states(
hass,
zero,
four,
["media_player.test", "thermostat.test"],
)
assert states == hist
def test_get_significant_states_are_ordered(hass_recorder):
"""Test order of results from get_significant_states.
When entity ids are given, the results should be returned with the data
in the same order.
"""
hass = hass_recorder()
zero, four, _states = record_states(hass)
entity_ids = ["media_player.test", "media_player.test2"]
hist = history.get_significant_states(hass, zero, four, entity_ids)
assert list(hist.keys()) == entity_ids
entity_ids = ["media_player.test2", "media_player.test"]
hist = history.get_significant_states(hass, zero, four, entity_ids)
assert list(hist.keys()) == entity_ids
def test_get_significant_states_only(hass_recorder):
"""Test significant states when significant_states_only is set."""
hass = hass_recorder()
entity_id = "sensor.test"
def set_state(state, **kwargs):
"""Set the state."""
hass.states.set(entity_id, state, **kwargs)
wait_recording_done(hass)
return hass.states.get(entity_id)
start = dt_util.utcnow() - timedelta(minutes=4)
points = []
for i in range(1, 4):
points.append(start + timedelta(minutes=i))
states = []
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=start
):
set_state("123", attributes={"attribute": 10.64})
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow",
return_value=points[0],
):
# Attributes are different, state not
states.append(set_state("123", attributes={"attribute": 21.42}))
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow",
return_value=points[1],
):
# state is different, attributes not
states.append(set_state("32", attributes={"attribute": 21.42}))
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow",
return_value=points[2],
):
# everything is different
states.append(set_state("412", attributes={"attribute": 54.23}))
hist = history.get_significant_states(hass, start, significant_changes_only=True)
assert len(hist[entity_id]) == 2
assert states[0] not in hist[entity_id]
assert states[1] in hist[entity_id]
assert states[2] in hist[entity_id]
hist = history.get_significant_states(hass, start, significant_changes_only=False)
assert len(hist[entity_id]) == 3
assert states == hist[entity_id]
def record_states(hass) -> tuple[datetime, datetime, dict[str, list[State]]]:
"""Record some test states.
We inject a bunch of state updates from media player, zone and
thermostat.
"""
mp = "media_player.test"
mp2 = "media_player.test2"
mp3 = "media_player.test3"
therm = "thermostat.test"
therm2 = "thermostat.test2"
zone = "zone.home"
script_c = "script.can_cancel_this_one"
def set_state(entity_id, state, **kwargs):
"""Set the state."""
hass.states.set(entity_id, state, **kwargs)
wait_recording_done(hass)
return hass.states.get(entity_id)
zero = dt_util.utcnow()
one = zero + timedelta(seconds=1)
two = one + timedelta(seconds=1)
three = two + timedelta(seconds=1)
four = three + timedelta(seconds=1)
states = {therm: [], therm2: [], mp: [], mp2: [], mp3: [], script_c: []}
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=one
):
states[mp].append(
set_state(mp, "idle", attributes={"media_title": str(sentinel.mt1)})
)
states[mp].append(
set_state(mp, "YouTube", attributes={"media_title": str(sentinel.mt2)})
)
states[mp2].append(
set_state(mp2, "YouTube", attributes={"media_title": str(sentinel.mt2)})
)
states[mp3].append(
set_state(mp3, "idle", attributes={"media_title": str(sentinel.mt1)})
)
states[therm].append(
set_state(therm, 20, attributes={"current_temperature": 19.5})
)
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=two
):
# This state will be skipped only different in time
set_state(mp, "YouTube", attributes={"media_title": str(sentinel.mt3)})
# This state will be skipped because domain is excluded
set_state(zone, "zoning")
states[script_c].append(
set_state(script_c, "off", attributes={"can_cancel": True})
)
states[therm].append(
set_state(therm, 21, attributes={"current_temperature": 19.8})
)
states[therm2].append(
set_state(therm2, 20, attributes={"current_temperature": 19})
)
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=three
):
states[mp].append(
set_state(mp, "Netflix", attributes={"media_title": str(sentinel.mt4)})
)
states[mp3].append(
set_state(mp3, "Netflix", attributes={"media_title": str(sentinel.mt3)})
)
# Attributes changed even though state is the same
states[therm].append(
set_state(therm, 21, attributes={"current_temperature": 20})
)
return zero, four, states
def test_state_changes_during_period_multiple_entities_single_test(hass_recorder):
"""Test state change during period with multiple entities in the same test.
This test ensures the sqlalchemy query cache does not
generate incorrect results.
"""
hass = hass_recorder()
start = dt_util.utcnow()
test_entites = {f"sensor.{i}": str(i) for i in range(30)}
for entity_id, value in test_entites.items():
hass.states.set(entity_id, value)
wait_recording_done(hass)
end = dt_util.utcnow()
hist = history.state_changes_during_period(hass, start, end, None)
for entity_id, value in test_entites.items():
hist[entity_id][0].state == value
for entity_id, value in test_entites.items():
hist = history.state_changes_during_period(hass, start, end, entity_id)
assert len(hist) == 1
hist[entity_id][0].state == value
hist = history.state_changes_during_period(hass, start, end, None)
for entity_id, value in test_entites.items():
hist[entity_id][0].state == value

View file

@ -58,6 +58,27 @@ def test_from_event_to_db_state_attributes():
assert StateAttributes.from_event(event).to_native() == attrs
def test_repr():
"""Test converting event to db state repr."""
attrs = {"this_attr": True}
fixed_time = datetime(2016, 7, 9, 11, 0, 0, tzinfo=dt.UTC, microsecond=432432)
state = ha.State(
"sensor.temperature",
"18",
attrs,
last_changed=fixed_time,
last_updated=fixed_time,
)
event = ha.Event(
EVENT_STATE_CHANGED,
{"entity_id": "sensor.temperature", "old_state": None, "new_state": state},
context=state.context,
time_fired=fixed_time,
)
assert "2016-07-09 11:00:00+00:00" in repr(States.from_event(event))
assert "2016-07-09 11:00:00+00:00" in repr(Events.from_event(event))
def test_handling_broken_json_state_attributes(caplog):
"""Test we handle broken json in state attributes."""
state_attributes = StateAttributes(
@ -81,8 +102,8 @@ def test_from_event_to_delete_state():
assert db_state.entity_id == "sensor.temperature"
assert db_state.state == ""
assert db_state.last_changed is None
assert db_state.last_updated == event.time_fired
assert db_state.last_changed_ts is None
assert db_state.last_updated_ts == event.time_fired.timestamp()
def test_entity_ids():
@ -251,7 +272,7 @@ async def test_lazy_state_handles_include_json(caplog):
entity_id="sensor.invalid",
shared_attrs="{INVALID_JSON}",
)
assert LazyState(row, {}).attributes == {}
assert LazyState(row, {}, None).attributes == {}
assert "Error converting row to state attributes" in caplog.text
@ -262,7 +283,7 @@ async def test_lazy_state_prefers_shared_attrs_over_attrs(caplog):
shared_attrs='{"shared":true}',
attributes='{"shared":false}',
)
assert LazyState(row, {}).attributes == {"shared": True}
assert LazyState(row, {}, None).attributes == {"shared": True}
async def test_lazy_state_handles_different_last_updated_and_last_changed(caplog):
@ -272,10 +293,10 @@ async def test_lazy_state_handles_different_last_updated_and_last_changed(caplog
entity_id="sensor.valid",
state="off",
shared_attrs='{"shared":true}',
last_updated=now,
last_changed=now - timedelta(seconds=60),
last_updated_ts=now.timestamp(),
last_changed_ts=(now - timedelta(seconds=60)).timestamp(),
)
lstate = LazyState(row, {})
lstate = LazyState(row, {}, None)
assert lstate.as_dict() == {
"attributes": {"shared": True},
"entity_id": "sensor.valid",
@ -283,8 +304,8 @@ async def test_lazy_state_handles_different_last_updated_and_last_changed(caplog
"last_updated": "2021-06-12T03:04:01.000323+00:00",
"state": "off",
}
assert lstate.last_updated == row.last_updated
assert lstate.last_changed == row.last_changed
assert lstate.last_updated.timestamp() == row.last_updated_ts
assert lstate.last_changed.timestamp() == row.last_changed_ts
assert lstate.as_dict() == {
"attributes": {"shared": True},
"entity_id": "sensor.valid",
@ -301,10 +322,10 @@ async def test_lazy_state_handles_same_last_updated_and_last_changed(caplog):
entity_id="sensor.valid",
state="off",
shared_attrs='{"shared":true}',
last_updated=now,
last_changed=now,
last_updated_ts=now.timestamp(),
last_changed_ts=now.timestamp(),
)
lstate = LazyState(row, {})
lstate = LazyState(row, {}, None)
assert lstate.as_dict() == {
"attributes": {"shared": True},
"entity_id": "sensor.valid",
@ -312,8 +333,8 @@ async def test_lazy_state_handles_same_last_updated_and_last_changed(caplog):
"last_updated": "2021-06-12T03:04:01.000323+00:00",
"state": "off",
}
assert lstate.last_updated == row.last_updated
assert lstate.last_changed == row.last_changed
assert lstate.last_updated.timestamp() == row.last_updated_ts
assert lstate.last_changed.timestamp() == row.last_changed_ts
assert lstate.as_dict() == {
"attributes": {"shared": True},
"entity_id": "sensor.valid",

View file

@ -460,7 +460,7 @@ async def test_purge_edge_case(
event_type="EVENT_TEST_PURGE",
event_data="{}",
origin="LOCAL",
time_fired=timestamp,
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
session.add(
@ -468,8 +468,8 @@ async def test_purge_edge_case(
entity_id="test.recorder2",
state="purgeme",
attributes="{}",
last_changed=timestamp,
last_updated=timestamp,
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
event_id=1001,
attributes_id=1002,
)
@ -529,7 +529,7 @@ async def test_purge_cutoff_date(
event_type="KEEP",
event_data="{}",
origin="LOCAL",
time_fired=timestamp_keep,
time_fired_ts=dt_util.utc_to_timestamp(timestamp_keep),
)
)
session.add(
@ -537,8 +537,8 @@ async def test_purge_cutoff_date(
entity_id="test.cutoff",
state="keep",
attributes="{}",
last_changed=timestamp_keep,
last_updated=timestamp_keep,
last_changed_ts=dt_util.utc_to_timestamp(timestamp_keep),
last_updated_ts=dt_util.utc_to_timestamp(timestamp_keep),
event_id=1000,
attributes_id=1000,
)
@ -557,7 +557,7 @@ async def test_purge_cutoff_date(
event_type="PURGE",
event_data="{}",
origin="LOCAL",
time_fired=timestamp_purge,
time_fired_ts=dt_util.utc_to_timestamp(timestamp_purge),
)
)
session.add(
@ -565,8 +565,8 @@ async def test_purge_cutoff_date(
entity_id="test.cutoff",
state="purge",
attributes="{}",
last_changed=timestamp_purge,
last_updated=timestamp_purge,
last_changed_ts=dt_util.utc_to_timestamp(timestamp_purge),
last_updated_ts=dt_util.utc_to_timestamp(timestamp_purge),
event_id=1000 + row,
attributes_id=1000 + row,
)
@ -690,8 +690,8 @@ async def test_purge_filtered_states(
entity_id="sensor.excluded",
state="purgeme",
attributes="{}",
last_changed=timestamp,
last_updated=timestamp,
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
)
)
# Add states and state_changed events that should be keeped
@ -716,8 +716,8 @@ async def test_purge_filtered_states(
entity_id="sensor.linked_old_state_id",
state="keep",
attributes="{}",
last_changed=timestamp,
last_updated=timestamp,
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
old_state_id=1,
state_attributes=state_attrs,
)
@ -726,8 +726,8 @@ async def test_purge_filtered_states(
entity_id="sensor.linked_old_state_id",
state="keep",
attributes="{}",
last_changed=timestamp,
last_updated=timestamp,
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
old_state_id=2,
state_attributes=state_attrs,
)
@ -735,8 +735,8 @@ async def test_purge_filtered_states(
entity_id="sensor.linked_old_state_id",
state="keep",
attributes="{}",
last_changed=timestamp,
last_updated=timestamp,
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
old_state_id=62, # keep
state_attributes=state_attrs,
)
@ -748,7 +748,7 @@ async def test_purge_filtered_states(
event_type="EVENT_KEEP",
event_data="{}",
origin="LOCAL",
time_fired=timestamp,
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
@ -920,8 +920,8 @@ async def test_purge_without_state_attributes_filtered_states_to_empty(
entity_id="sensor.old_format",
state=STATE_ON,
attributes=json.dumps({"old": "not_using_state_attributes"}),
last_changed=timestamp,
last_updated=timestamp,
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
event_id=event_id,
state_attributes=None,
)
@ -932,7 +932,7 @@ async def test_purge_without_state_attributes_filtered_states_to_empty(
event_type=EVENT_STATE_CHANGED,
event_data="{}",
origin="LOCAL",
time_fired=timestamp,
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
session.add(
@ -941,7 +941,7 @@ async def test_purge_without_state_attributes_filtered_states_to_empty(
event_type=EVENT_THEMES_UPDATED,
event_data="{}",
origin="LOCAL",
time_fired=timestamp,
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
@ -993,7 +993,7 @@ async def test_purge_filtered_events(
event_type="EVENT_PURGE",
event_data="{}",
origin="LOCAL",
time_fired=timestamp,
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
@ -1093,7 +1093,7 @@ async def test_purge_filtered_events_state_changed(
event_type="EVENT_KEEP",
event_data="{}",
origin="LOCAL",
time_fired=timestamp,
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
# Add states with linked old_state_ids that need to be handled
@ -1102,8 +1102,8 @@ async def test_purge_filtered_events_state_changed(
entity_id="sensor.linked_old_state_id",
state="keep",
attributes="{}",
last_changed=timestamp,
last_updated=timestamp,
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
old_state_id=1,
)
timestamp = dt_util.utcnow() - timedelta(days=4)
@ -1111,16 +1111,16 @@ async def test_purge_filtered_events_state_changed(
entity_id="sensor.linked_old_state_id",
state="keep",
attributes="{}",
last_changed=timestamp,
last_updated=timestamp,
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
old_state_id=2,
)
state_3 = States(
entity_id="sensor.linked_old_state_id",
state="keep",
attributes="{}",
last_changed=timestamp,
last_updated=timestamp,
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
old_state_id=62, # keep
)
session.add_all((state_1, state_2, state_3))
@ -1355,7 +1355,7 @@ async def _add_test_events(hass: HomeAssistant, iterations: int = 1):
event_type=event_type,
event_data=json.dumps(event_data),
origin="LOCAL",
time_fired=timestamp,
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
@ -1392,7 +1392,7 @@ async def _add_events_with_event_data(hass: HomeAssistant, iterations: int = 1):
Events(
event_type=event_type,
origin="LOCAL",
time_fired=timestamp,
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
event_data_rel=event_data,
)
)
@ -1494,8 +1494,8 @@ def _add_state_without_event_linkage(
entity_id=entity_id,
state=state,
attributes=None,
last_changed=timestamp,
last_updated=timestamp,
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
event_id=None,
state_attributes=state_attrs,
)
@ -1519,8 +1519,8 @@ def _add_state_and_state_changed_event(
entity_id=entity_id,
state=state,
attributes=None,
last_changed=timestamp,
last_updated=timestamp,
last_changed_ts=dt_util.utc_to_timestamp(timestamp),
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
event_id=event_id,
state_attributes=state_attrs,
)
@ -1531,7 +1531,7 @@ def _add_state_and_state_changed_event(
event_type=EVENT_STATE_CHANGED,
event_data="{}",
origin="LOCAL",
time_fired=timestamp,
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
@ -1600,8 +1600,8 @@ async def test_purge_can_mix_legacy_and_new_format(
broken_state_no_time = States(
event_id=None,
entity_id="orphened.state",
last_updated=None,
last_changed=None,
last_updated_ts=None,
last_changed_ts=None,
)
session.add(broken_state_no_time)
start_id = 50000

View file

@ -0,0 +1,135 @@
"""The tests for recorder platform migrating data from v30."""
# pylint: disable=protected-access,invalid-name
from datetime import timedelta
import importlib
import sys
from unittest.mock import patch
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from homeassistant.components import recorder
from homeassistant.components.recorder import SQLITE_URL_PREFIX, core, statistics
from homeassistant.components.recorder.util import session_scope
from homeassistant.core import EVENT_STATE_CHANGED, Event, EventOrigin, State
from homeassistant.helpers import recorder as recorder_helper
from homeassistant.setup import setup_component
import homeassistant.util.dt as dt_util
from .common import wait_recording_done
from tests.common import get_test_home_assistant
ORIG_TZ = dt_util.DEFAULT_TIME_ZONE
CREATE_ENGINE_TARGET = "homeassistant.components.recorder.core.create_engine"
SCHEMA_MODULE = "tests.components.recorder.db_schema_30"
def _create_engine_test(*args, **kwargs):
"""Test version of create_engine that initializes with old schema.
This simulates an existing db with the old schema.
"""
importlib.import_module(SCHEMA_MODULE)
old_db_schema = sys.modules[SCHEMA_MODULE]
engine = create_engine(*args, **kwargs)
old_db_schema.Base.metadata.create_all(engine)
with Session(engine) as session:
session.add(
recorder.db_schema.StatisticsRuns(start=statistics.get_start_time())
)
session.add(
recorder.db_schema.SchemaChanges(
schema_version=old_db_schema.SCHEMA_VERSION
)
)
session.commit()
return engine
def test_migrate_times(caplog, tmpdir):
"""Test we can migrate times."""
test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
importlib.import_module(SCHEMA_MODULE)
old_db_schema = sys.modules[SCHEMA_MODULE]
now = dt_util.utcnow()
one_second_past = now - timedelta(seconds=1)
now_timestamp = now.timestamp()
one_second_past_timestamp = one_second_past.timestamp()
mock_state = State(
"sensor.test",
"old",
{"last_reset": now.isoformat()},
last_changed=one_second_past,
last_updated=now,
)
state_changed_event = Event(
EVENT_STATE_CHANGED,
{
"entity_id": "sensor.test",
"old_state": None,
"new_state": mock_state,
},
EventOrigin.local,
time_fired=now,
)
custom_event = Event(
"custom_event",
{"entity_id": "sensor.custom"},
EventOrigin.local,
time_fired=now,
)
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch.object(core, "EventData", old_db_schema.EventData), patch.object(
core, "States", old_db_schema.States
), patch.object(
core, "Events", old_db_schema.Events
), patch(
CREATE_ENGINE_TARGET, new=_create_engine_test
):
hass = get_test_home_assistant()
recorder_helper.async_initialize_recorder(hass)
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
wait_recording_done(hass)
wait_recording_done(hass)
with session_scope(hass=hass) as session:
session.add(old_db_schema.Events.from_event(custom_event))
session.add(old_db_schema.States.from_event(state_changed_event))
hass.stop()
dt_util.DEFAULT_TIME_ZONE = ORIG_TZ
# Test that the duplicates are removed during migration from schema 23
hass = get_test_home_assistant()
recorder_helper.async_initialize_recorder(hass)
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
hass.start()
wait_recording_done(hass)
wait_recording_done(hass)
with session_scope(hass=hass) as session:
result = list(
session.query(recorder.db_schema.Events).where(
recorder.db_schema.Events.event_type == "custom_event"
)
)
assert len(result) == 1
assert result[0].time_fired_ts == now_timestamp
result = list(
session.query(recorder.db_schema.States).where(
recorder.db_schema.States.entity_id == "sensor.test"
)
)
assert len(result) == 1
assert result[0].last_changed_ts == one_second_past_timestamp
assert result[0].last_updated_ts == now_timestamp
hass.stop()
dt_util.DEFAULT_TIME_ZONE = ORIG_TZ