Separate attrs into another table (reduces database size) (#68224)

J. Nick Koston 2022-03-18 00:23:13 -10:00 committed by GitHub
parent d7145095ef
commit 9215702388
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 788 additions and 98 deletions

View file

@@ -5,6 +5,7 @@ from http import HTTPStatus
 from itertools import groupby
 import json
 import re
+from typing import Any

 import sqlalchemy
 from sqlalchemy.orm import aliased
@@ -18,6 +19,7 @@ from homeassistant.components.http import HomeAssistantView
 from homeassistant.components.recorder import get_instance
 from homeassistant.components.recorder.models import (
     Events,
+    StateAttributes,
     States,
     process_timestamp_to_utc_isoformat,
 )
@@ -494,6 +496,7 @@ def _generate_events_query(session):
         States.entity_id,
         States.domain,
         States.attributes,
+        StateAttributes.shared_attrs,
     )
@@ -504,6 +507,7 @@ def _generate_events_query_without_states(session):
         literal(value=None, type_=sqlalchemy.String).label("entity_id"),
         literal(value=None, type_=sqlalchemy.String).label("domain"),
         literal(value=None, type_=sqlalchemy.Text).label("attributes"),
+        literal(value=None, type_=sqlalchemy.Text).label("shared_attrs"),
     )
@@ -519,6 +523,9 @@ def _generate_states_query(session, start_day, end_day, old_state, entity_ids):
             (States.last_updated == States.last_changed)
             & States.entity_id.in_(entity_ids)
         )
+        .outerjoin(
+            StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
+        )
     )
@@ -534,7 +541,9 @@ def _apply_events_types_and_states_filter(hass, query, old_state):
             (Events.event_type != EVENT_STATE_CHANGED) | _continuous_entity_matcher()
         )
     )
-    return _apply_event_types_filter(hass, events_query, ALL_EVENT_TYPES)
+    return _apply_event_types_filter(hass, events_query, ALL_EVENT_TYPES).outerjoin(
+        StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
+    )


 def _missing_state_matcher(old_state):
@@ -556,6 +565,9 @@ def _continuous_entity_matcher():
     return sqlalchemy.or_(
         sqlalchemy.not_(States.domain.in_(CONTINUOUS_DOMAINS)),
         sqlalchemy.not_(States.attributes.contains(UNIT_OF_MEASUREMENT_JSON)),
+        sqlalchemy.not_(
+            StateAttributes.shared_attrs.contains(UNIT_OF_MEASUREMENT_JSON)
+        ),
     )
@@ -709,8 +721,9 @@ class LazyEventPartialState:
         """Extract the icon from the decoded attributes or json."""
         if self._attributes:
             return self._attributes.get(ATTR_ICON)
-        result = ICON_JSON_EXTRACT.search(self._row.attributes)
+        result = ICON_JSON_EXTRACT.search(
+            self._row.shared_attrs or self._row.attributes
+        )
         return result and result.group(1)

     @property
@@ -734,14 +747,12 @@ class LazyEventPartialState:
     @property
     def attributes(self):
         """State attributes."""
-        if not self._attributes:
-            if (
-                self._row.attributes is None
-                or self._row.attributes == EMPTY_JSON_OBJECT
-            ):
+        if self._attributes is None:
+            source = self._row.shared_attrs or self._row.attributes
+            if source == EMPTY_JSON_OBJECT or source is None:
                 self._attributes = {}
             else:
-                self._attributes = json.loads(self._row.attributes)
+                self._attributes = json.loads(source)
         return self._attributes

     @property
@@ -772,12 +783,12 @@ class EntityAttributeCache:
     that are expected to change state.
     """

-    def __init__(self, hass):
+    def __init__(self, hass: HomeAssistant) -> None:
         """Init the cache."""
         self._hass = hass
-        self._cache = {}
+        self._cache: dict[str, dict[str, Any]] = {}

-    def get(self, entity_id, attribute, event):
+    def get(self, entity_id: str, attribute: str, event: LazyEventPartialState) -> Any:
         """Lookup an attribute for an entity or get it from the cache."""
         if entity_id in self._cache:
             if attribute in self._cache[entity_id]:
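The logbook hunks above all follow the same pattern: query the new `shared_attrs` column alongside the legacy `States.attributes` column, and prefer the former when decoding. A minimal sketch of that fallback, assuming a hypothetical `row` object shaped like the joined query result (older rows carry `attributes`, newer rows carry `shared_attrs`):

```python
import json

EMPTY_JSON_OBJECT = "{}"


def decode_row_attributes(row):
    """Decode a row's attributes, preferring the shared column.

    `row` is a hypothetical stand-in for a result row of the
    outerjoined logbook query above.
    """
    source = row.shared_attrs or row.attributes
    if source is None or source == EMPTY_JSON_OBJECT:
        return {}
    return json.loads(source)
```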

View file

@@ -7,7 +7,7 @@ import logging
 import voluptuous as vol

 from homeassistant.components.recorder import get_instance
-from homeassistant.components.recorder.models import States
+from homeassistant.components.recorder.models import StateAttributes, States
 from homeassistant.components.recorder.util import execute, session_scope
 from homeassistant.const import (
     ATTR_TEMPERATURE,
@@ -110,11 +110,6 @@ DOMAIN = "plant"
 CONFIG_SCHEMA = vol.Schema({DOMAIN: {cv.string: PLANT_SCHEMA}}, extra=vol.ALLOW_EXTRA)

-# Flag for enabling/disabling the loading of the history from the database.
-# This feature is turned off right now as its tests are not 100% stable.
-ENABLE_LOAD_HISTORY = False

 async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
     """Set up the Plant component."""
     component = EntityComponent(_LOGGER, DOMAIN, hass)
@@ -282,7 +277,7 @@ class Plant(Entity):
     async def async_added_to_hass(self):
         """After being added to hass, load from history."""
-        if ENABLE_LOAD_HISTORY and "recorder" in self.hass.config.components:
+        if "recorder" in self.hass.config.components:
             # only use the database if it's configured
             await get_instance(self.hass).async_add_executor_job(
                 self._load_history_from_db
@@ -315,14 +310,24 @@ class Plant(Entity):
         _LOGGER.debug("Initializing values for %s from the database", self._name)
         with session_scope(hass=self.hass) as session:
             query = (
-                session.query(States)
+                session.query(States, StateAttributes)
                 .filter(
                     (States.entity_id == entity_id.lower())
                     and (States.last_updated > start_date)
                 )
+                .outerjoin(
+                    StateAttributes,
+                    States.attributes_id == StateAttributes.attributes_id,
+                )
                 .order_by(States.last_updated.asc())
             )
-            states = execute(query, to_native=True, validate_entity_ids=False)
+            states = []
+            if results := execute(query, to_native=False, validate_entity_ids=False):
+                for state, attributes in results:
+                    native = state.to_native()
+                    if not native.attributes:
+                        native.attributes = attributes.to_native()
+                    states.append(native)

         for state in states:
             # filter out all None, NaN and "unknown" states
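Because `States.to_native()` no longer returns attributes for rows written under the new schema, callers that need full `State` objects now fetch `(States, StateAttributes)` pairs and stitch them back together, as the plant hunk above does. A sketch of the pattern, assuming an open recorder database session:

```python
from homeassistant.components.recorder.models import StateAttributes, States


def native_states_with_attributes(session, entity_id):
    """Rebuild native State objects from a States/StateAttributes join.

    A sketch of the pattern above; `session` is assumed to be an open
    recorder database session.
    """
    query = (
        session.query(States, StateAttributes)
        .filter(States.entity_id == entity_id)
        .outerjoin(
            StateAttributes, States.attributes_id == StateAttributes.attributes_id
        )
    )
    states = []
    for state, attributes in query:
        native = state.to_native()
        # Rows written under the old schema still carry their own JSON
        if not native.attributes and attributes is not None:
            native.attributes = attributes.to_native()
        states.append(native)
    return states
```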

View file

@@ -14,6 +14,7 @@ import threading
 import time
 from typing import Any, TypeVar

+from lru import LRU  # pylint: disable=no-name-in-module
 from sqlalchemy import create_engine, event as sqlalchemy_event, exc, func, select
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.orm import scoped_session, sessionmaker
@@ -67,6 +68,7 @@ from .models import (
     Base,
     Events,
     RecorderRuns,
+    StateAttributes,
     States,
     StatisticsRuns,
     process_timestamp,
@@ -131,6 +133,15 @@ KEEPALIVE_TIME = 30
 # States and Events objects
 EXPIRE_AFTER_COMMITS = 120

+# The number of attribute ids to cache in memory
+#
+# Based on:
+# - The number of overlapping attributes
+# - How frequently states with overlapping attributes will change
+# - How much memory our low end hardware has
+STATE_ATTRIBUTES_ID_CACHE_SIZE = 2048
+
 DB_LOCK_TIMEOUT = 30
 DB_LOCK_QUEUE_CHECK_TIMEOUT = 1
@@ -541,6 +552,8 @@ class Recorder(threading.Thread):
         self._commits_without_expire = 0
         self._keepalive_count = 0
         self._old_states: dict[str, States] = {}
+        self._state_attributes_ids: LRU = LRU(STATE_ATTRIBUTES_ID_CACHE_SIZE)
+        self._pending_state_attributes: dict[str, StateAttributes] = {}
         self._pending_expunge: list[States] = []
         self.event_session = None
         self.get_session = None
@@ -964,33 +977,58 @@ class Recorder(threading.Thread):
                     dbevent.event_data = None
                 else:
                     dbevent = Events.from_event(event)
-                self.event_session.add(dbevent)
             except (TypeError, ValueError):
                 _LOGGER.warning("Event is not JSON serializable: %s", event)
                 return

+        self.event_session.add(dbevent)
+
         if event.event_type == EVENT_STATE_CHANGED:
             try:
                 dbstate = States.from_event(event)
-                has_new_state = event.data.get("new_state")
-                if dbstate.entity_id in self._old_states:
-                    old_state = self._old_states.pop(dbstate.entity_id)
-                    if old_state.state_id:
-                        dbstate.old_state_id = old_state.state_id
-                    else:
-                        dbstate.old_state = old_state
-                if not has_new_state:
-                    dbstate.state = None
-                dbstate.event = dbevent
-                self.event_session.add(dbstate)
-                if has_new_state:
-                    self._old_states[dbstate.entity_id] = dbstate
-                    self._pending_expunge.append(dbstate)
-            except (TypeError, ValueError):
+                dbstate_attributes = StateAttributes.from_event(event)
+            except (TypeError, ValueError) as ex:
                 _LOGGER.warning(
-                    "State is not JSON serializable: %s",
+                    "State is not JSON serializable: %s: %s",
                     event.data.get("new_state"),
+                    ex,
                 )
+                return
+
+            dbstate.attributes = None
+            shared_attrs = dbstate_attributes.shared_attrs
+            # Matching attributes found in the pending commit
+            if pending_attributes := self._pending_state_attributes.get(shared_attrs):
+                dbstate.state_attributes = pending_attributes
+            # Matching attributes id found in the cache
+            elif attributes_id := self._state_attributes_ids.get(shared_attrs):
+                dbstate.attributes_id = attributes_id
+            # Matching attributes found in the database
+            elif (
+                attributes := self.event_session.query(StateAttributes.attributes_id)
+                .filter(StateAttributes.hash == dbstate_attributes.hash)
+                .filter(StateAttributes.shared_attrs == shared_attrs)
+                .first()
+            ):
+                dbstate.attributes_id = attributes[0]
+                self._state_attributes_ids[shared_attrs] = attributes[0]
+            # No matching attributes found, save them in the DB
+            else:
+                dbstate.state_attributes = dbstate_attributes
+                self._pending_state_attributes[shared_attrs] = dbstate_attributes
+                self.event_session.add(dbstate_attributes)
+
+            if old_state := self._old_states.pop(dbstate.entity_id, None):
+                if old_state.state_id:
+                    dbstate.old_state_id = old_state.state_id
+                else:
+                    dbstate.old_state = old_state
+            if event.data.get("new_state"):
+                self._old_states[dbstate.entity_id] = dbstate
+                self._pending_expunge.append(dbstate)
+            else:
+                dbstate.state = None
+            self.event_session.add(dbstate)
+            dbstate.event = dbevent

         # If they do not have a commit interval
         # than we commit right away
@@ -1042,6 +1080,7 @@ class Recorder(threading.Thread):
             if dbstate in self.event_session:
                 self.event_session.expunge(dbstate)
         self._pending_expunge = []
+        self._pending_state_attributes = {}
         self.event_session.commit()

         # Expire is an expensive operation (frequently more expensive
@@ -1062,6 +1101,8 @@ class Recorder(threading.Thread):
     def _close_event_session(self):
         """Close the event session."""
         self._old_states = {}
+        self._state_attributes_ids = {}
+        self._pending_state_attributes = {}
         if not self.event_session:
             return
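The heart of the change is the lookup order in the hunk above: attributes pending in the current commit batch, then the in-memory LRU of known `attributes_id` values, then a hash-indexed database query, and only on a full miss a new `StateAttributes` row. A condensed, self-contained sketch of that three-tier deduplication (a plain dict stands in for the `lru-dict` cache, and `db_lookup` is a hypothetical callable):

```python
from fnvhash import fnv1a_32


class AttributesDeduper:
    """Condensed sketch of the recorder's attributes deduplication."""

    def __init__(self, db_lookup):
        # db_lookup(hash, shared_attrs) -> attributes_id or None (hypothetical)
        self._db_lookup = db_lookup
        self._id_cache = {}  # shared_attrs -> attributes_id (LRU in the real code)
        self._pending = {}  # shared_attrs -> new row awaiting commit

    def dedupe(self, shared_attrs):
        # 1. Matching attributes found in the pending commit
        if (pending := self._pending.get(shared_attrs)) is not None:
            return pending
        # 2. Matching attributes id found in the cache
        if (attributes_id := self._id_cache.get(shared_attrs)) is not None:
            return attributes_id
        # 3. Matching attributes found in the database (the hash index narrows it)
        attr_hash = fnv1a_32(shared_attrs.encode("utf-8"))
        if (found := self._db_lookup(attr_hash, shared_attrs)) is not None:
            self._id_cache[shared_attrs] = found
            return found
        # 4. No match: create a new row and remember it until commit
        new_row = {"hash": attr_hash, "shared_attrs": shared_attrs}
        self._pending[shared_attrs] = new_row
        return new_row
```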

View file

@@ -13,7 +13,12 @@ from homeassistant.components import recorder
 from homeassistant.core import split_entity_id
 import homeassistant.util.dt as dt_util

-from .models import LazyState, States, process_timestamp_to_utc_isoformat
+from .models import (
+    LazyState,
+    StateAttributes,
+    States,
+    process_timestamp_to_utc_isoformat,
+)
 from .util import execute, session_scope

 # mypy: allow-untyped-defs, no-check-untyped-defs
@@ -46,6 +51,7 @@ QUERY_STATES = [
     States.attributes,
     States.last_changed,
     States.last_updated,
+    StateAttributes.shared_attrs,
 ]

 HISTORY_BAKERY = "recorder_history_bakery"
@@ -114,6 +120,9 @@ def get_significant_states_with_session(
     if end_time is not None:
         baked_query += lambda q: q.filter(States.last_updated < bindparam("end_time"))

+    baked_query += lambda q: q.outerjoin(
+        StateAttributes, States.attributes_id == StateAttributes.attributes_id
+    )
     baked_query += lambda q: q.order_by(States.entity_id, States.last_updated)

     states = execute(
@@ -159,6 +168,9 @@ def state_changes_during_period(hass, start_time, end_time=None, entity_id=None):
         baked_query += lambda q: q.filter_by(entity_id=bindparam("entity_id"))
         entity_id = entity_id.lower()

+    baked_query += lambda q: q.outerjoin(
+        StateAttributes, States.attributes_id == StateAttributes.attributes_id
+    )
     baked_query += lambda q: q.order_by(States.entity_id, States.last_updated)

     states = execute(
@@ -186,6 +198,9 @@ def get_last_state_changes(hass, number_of_states, entity_id):
     baked_query += lambda q: q.filter_by(entity_id=bindparam("entity_id"))
     entity_id = entity_id.lower()

+    baked_query += lambda q: q.outerjoin(
+        StateAttributes, States.attributes_id == StateAttributes.attributes_id
+    )
     baked_query += lambda q: q.order_by(
         States.entity_id, States.last_updated.desc()
     )
@@ -263,6 +278,8 @@ def _get_states_with_session(
         query = query.join(
             most_recent_state_ids,
             States.state_id == most_recent_state_ids.c.max_state_id,
+        ).outerjoin(
+            StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
         )
     else:
         # We did not get an include-list of entities, query all states in the inner
@@ -301,7 +318,9 @@ def _get_states_with_session(
         query = query.filter(~States.domain.in_(IGNORE_DOMAINS))
     if filters:
         query = filters.apply(query)
-
+    query = query.outerjoin(
+        StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
+    )
     return [LazyState(row) for row in execute(query)]
@@ -315,6 +334,9 @@ def _get_single_entity_states_with_session(hass, session, utc_point_in_time, entity_id):
         States.last_updated < bindparam("utc_point_in_time"),
         States.entity_id == bindparam("entity_id"),
     )
+    baked_query += lambda q: q.outerjoin(
+        StateAttributes, States.attributes_id == StateAttributes.attributes_id
+    )
     baked_query += lambda q: q.order_by(States.last_updated.desc())
     baked_query += lambda q: q.limit(1)

View file

@@ -2,7 +2,7 @@
   "domain": "recorder",
   "name": "Recorder",
   "documentation": "https://www.home-assistant.io/integrations/recorder",
-  "requirements": ["sqlalchemy==1.4.32"],
+  "requirements": ["sqlalchemy==1.4.32", "fnvhash==0.1.0", "lru-dict==1.1.7"],
   "codeowners": ["@home-assistant/core"],
   "quality_scale": "internal",
   "iot_class": "local_push"

View file

@@ -638,6 +638,9 @@ def _apply_update(instance, new_version, old_version):  # noqa: C901
             "statistics_short_term",
             "ix_statistics_short_term_statistic_id_start",
         )
+    elif new_version == 25:
+        _add_columns(instance, "states", ["attributes_id INTEGER(20)"])
+        _create_index(instance, "states", "ix_states_attributes_id")
     else:
         raise ValueError(f"No schema migration defined for version {new_version}")
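The version-25 step only alters the existing `states` table; the new `state_attributes` table itself comes from the model metadata at startup. Roughly the DDL the `_add_columns`/`_create_index` helpers issue, as a hedged sketch (the real helpers pick per-dialect column types and tolerate already-applied changes):

```python
from sqlalchemy import create_engine, text


def upgrade_states_table_to_v25(db_url):
    """Approximate DDL for the schema version 25 migration above."""
    engine = create_engine(db_url)
    with engine.begin() as conn:
        # Nullable FK column; old rows keep NULL and fall back to States.attributes
        conn.execute(text("ALTER TABLE states ADD COLUMN attributes_id INTEGER"))
        conn.execute(
            text("CREATE INDEX ix_states_attributes_id ON states (attributes_id)")
        )
```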

View file

@@ -6,7 +6,9 @@ import json
 import logging
 from typing import TypedDict, overload

+from fnvhash import fnv1a_32
 from sqlalchemy import (
+    BigInteger,
     Boolean,
     Column,
     DateTime,
@@ -40,7 +42,7 @@ import homeassistant.util.dt as dt_util
 # pylint: disable=invalid-name
 Base = declarative_base()

-SCHEMA_VERSION = 24
+SCHEMA_VERSION = 25

 _LOGGER = logging.getLogger(__name__)
@@ -48,6 +50,7 @@ DB_TIMEZONE = "+00:00"
 TABLE_EVENTS = "events"
 TABLE_STATES = "states"
+TABLE_STATE_ATTRIBUTES = "state_attributes"
 TABLE_RECORDER_RUNS = "recorder_runs"
 TABLE_SCHEMA_CHANGES = "schema_changes"
 TABLE_STATISTICS = "statistics"
@@ -66,6 +69,9 @@ ALL_TABLES = [
     TABLE_STATISTICS_SHORT_TERM,
 ]

+EMPTY_JSON_OBJECT = "{}"
+
 DATETIME_TYPE = DateTime(timezone=True).with_variant(
     mysql.DATETIME(timezone=True, fsp=6), "mysql"
 )
@@ -161,8 +167,12 @@ class States(Base):  # type: ignore[misc,valid-type]
     last_changed = Column(DATETIME_TYPE, default=dt_util.utcnow)
     last_updated = Column(DATETIME_TYPE, default=dt_util.utcnow, index=True)
     old_state_id = Column(Integer, ForeignKey("states.state_id"), index=True)
+    attributes_id = Column(
+        Integer, ForeignKey("state_attributes.attributes_id"), index=True
+    )
     event = relationship("Events", uselist=False)
     old_state = relationship("States", remote_side=[state_id])
+    state_attributes = relationship("StateAttributes")

     def __repr__(self) -> str:
         """Return string representation of instance for debugging."""
@@ -171,7 +181,7 @@ class States(Base):  # type: ignore[misc,valid-type]
             f"id={self.state_id}, domain='{self.domain}', entity_id='{self.entity_id}', "
             f"state='{self.state}', event_id='{self.event_id}', "
             f"last_updated='{self.last_updated.isoformat(sep=' ', timespec='seconds')}', "
-            f"old_state_id={self.old_state_id}"
+            f"old_state_id={self.old_state_id}, attributes_id={self.attributes_id}"
             f")>"
         )
@@ -182,20 +192,17 @@ class States(Base):  # type: ignore[misc,valid-type]
         state = event.data.get("new_state")

         dbstate = States(entity_id=entity_id)
+        dbstate.attributes = None
         # State got deleted
         if state is None:
             dbstate.state = ""
             dbstate.domain = split_entity_id(entity_id)[0]
-            dbstate.attributes = "{}"
             dbstate.last_changed = event.time_fired
             dbstate.last_updated = event.time_fired
         else:
             dbstate.domain = state.domain
             dbstate.state = state.state
-            dbstate.attributes = json.dumps(
-                dict(state.attributes), cls=JSONEncoder, separators=(",", ":")
-            )
             dbstate.last_changed = state.last_changed
             dbstate.last_updated = state.last_updated
@@ -207,7 +214,9 @@ class States(Base):  # type: ignore[misc,valid-type]
             return State(
                 self.entity_id,
                 self.state,
-                json.loads(self.attributes),
+                # Join the state_attributes table on attributes_id to get the attributes
+                # for newer states
+                json.loads(self.attributes) if self.attributes else {},
                 process_timestamp(self.last_changed),
                 process_timestamp(self.last_updated),
                 # Join the events table on event_id to get the context instead
@@ -221,6 +230,53 @@ class States(Base):  # type: ignore[misc,valid-type]
             return None


+class StateAttributes(Base):  # type: ignore[misc,valid-type]
+    """State attribute change history."""
+
+    __table_args__ = (
+        {"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
+    )
+    __tablename__ = TABLE_STATE_ATTRIBUTES
+    attributes_id = Column(Integer, Identity(), primary_key=True)
+    hash = Column(BigInteger, index=True)
+    # Note that this is not named attributes to avoid confusion with the states table
+    shared_attrs = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
+
+    def __repr__(self) -> str:
+        """Return string representation of instance for debugging."""
+        return (
+            f"<recorder.StateAttributes("
+            f"id={self.attributes_id}, hash='{self.hash}', attributes='{self.shared_attrs}'"
+            f")>"
+        )
+
+    @staticmethod
+    def from_event(event):
+        """Create object from a state_changed event."""
+        state = event.data.get("new_state")
+        dbstate = StateAttributes()
+        # State got deleted
+        if state is None:
+            dbstate.shared_attrs = "{}"
+        else:
+            dbstate.shared_attrs = json.dumps(
+                dict(state.attributes),
+                cls=JSONEncoder,
+                separators=(",", ":"),
+            )
+        dbstate.hash = fnv1a_32(dbstate.shared_attrs.encode("utf-8"))
+        return dbstate
+
+    def to_native(self):
+        """Convert to an HA state object."""
+        try:
+            return json.loads(self.shared_attrs)
+        except ValueError:
+            # When json.loads fails
+            _LOGGER.exception("Error converting row to state attributes: %s", self)
+            return {}
+
+
 class StatisticResult(TypedDict):
     """Statistic result data class.
@@ -492,12 +548,18 @@ class LazyState(State):
     @property  # type: ignore[override]
     def attributes(self):
         """State attributes."""
-        if not self._attributes:
+        if self._attributes is None:
+            source = self._row.shared_attrs or self._row.attributes
+            if source == EMPTY_JSON_OBJECT or source is None:
+                self._attributes = {}
+                return self._attributes
             try:
-                self._attributes = json.loads(self._row.attributes)
+                self._attributes = json.loads(source)
             except ValueError:
                 # When json.loads fails
-                _LOGGER.exception("Error converting row to state: %s", self._row)
+                _LOGGER.exception(
+                    "Error converting row to state attributes: %s", self._row
+                )
                 self._attributes = {}
         return self._attributes
@@ -549,18 +611,22 @@ class LazyState(State):
         To be used for JSON serialization.
         """
-        if self._last_changed:
-            last_changed_isoformat = self._last_changed.isoformat()
-        else:
+        if self._last_changed is None and self._last_updated is None:
             last_changed_isoformat = process_timestamp_to_utc_isoformat(
                 self._row.last_changed
             )
-        if self._last_updated:
-            last_updated_isoformat = self._last_updated.isoformat()
-        else:
-            last_updated_isoformat = process_timestamp_to_utc_isoformat(
-                self._row.last_updated
-            )
+            if self._row.last_changed == self._row.last_updated:
+                last_updated_isoformat = last_changed_isoformat
+            else:
+                last_updated_isoformat = process_timestamp_to_utc_isoformat(
+                    self._row.last_updated
+                )
+        else:
+            last_changed_isoformat = self.last_changed.isoformat()
+            if self.last_changed == self.last_updated:
+                last_updated_isoformat = last_changed_isoformat
+            else:
+                last_updated_isoformat = self.last_updated.isoformat()
         return {
             "entity_id": self.entity_id,
             "state": self.state,

View file

@@ -10,8 +10,17 @@ from sqlalchemy import func
 from sqlalchemy.orm.session import Session
 from sqlalchemy.sql.expression import distinct

+from homeassistant.const import EVENT_STATE_CHANGED
+
 from .const import MAX_ROWS_TO_PURGE
-from .models import Events, RecorderRuns, States, StatisticsRuns, StatisticsShortTerm
+from .models import (
+    Events,
+    RecorderRuns,
+    StateAttributes,
+    States,
+    StatisticsRuns,
+    StatisticsShortTerm,
+)
 from .repack import repack_database
 from .util import retryable_database_job, session_scope
@@ -37,7 +46,12 @@ def purge_old_data(
     with session_scope(session=instance.get_session()) as session:  # type: ignore[misc]
         # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
         event_ids = _select_event_ids_to_purge(session, purge_before)
-        state_ids = _select_state_ids_to_purge(session, purge_before, event_ids)
+        state_ids, attributes_ids = _select_state_and_attributes_ids_to_purge(
+            session, purge_before, event_ids
+        )
+        attributes_ids = _remove_attributes_ids_used_by_newer_states(
+            session, purge_before, attributes_ids
+        )
         statistics_runs = _select_statistics_runs_to_purge(session, purge_before)
         short_term_statistics = _select_short_term_statistics_to_purge(
             session, purge_before
@@ -46,6 +60,9 @@ def purge_old_data(
         if state_ids:
             _purge_state_ids(instance, session, state_ids)

+        if attributes_ids:
+            _purge_attributes_ids(instance, session, attributes_ids)
+
         if event_ids:
             _purge_event_ids(session, event_ids)
@@ -82,20 +99,47 @@ def _select_event_ids_to_purge(session: Session, purge_before: datetime) -> list:
     return [event.event_id for event in events]


-def _select_state_ids_to_purge(
+def _select_state_and_attributes_ids_to_purge(
     session: Session, purge_before: datetime, event_ids: list[int]
-) -> set[int]:
+) -> tuple[set[int], set[int]]:
     """Return a list of state ids to purge."""
     if not event_ids:
-        return set()
+        return set(), set()
     states = (
-        session.query(States.state_id)
+        session.query(States.state_id, States.attributes_id)
         .filter(States.last_updated < purge_before)
        .filter(States.event_id.in_(event_ids))
         .all()
     )
     _LOGGER.debug("Selected %s state ids to remove", len(states))
-    return {state.state_id for state in states}
+    state_ids = set()
+    attributes_ids = set()
+    for state in states:
+        state_ids.add(state.state_id)
+        if state.attributes_id:
+            attributes_ids.add(state.attributes_id)
+    return state_ids, attributes_ids
+
+
+def _remove_attributes_ids_used_by_newer_states(
+    session: Session, purge_before: datetime, attributes_ids: set[int]
+) -> set[int]:
+    """Remove attributes ids that are still in use for states we are not purging yet."""
+    if not attributes_ids:
+        return set()
+    keep_attributes_ids = {
+        state.attributes_id
+        for state in session.query(States.attributes_id)
+        .filter(States.last_updated >= purge_before)
+        .filter(States.attributes_id.in_(attributes_ids))
+        .group_by(States.attributes_id)
+    }
+    to_remove = attributes_ids - keep_attributes_ids
+    _LOGGER.debug(
+        "Selected %s shared attributes to remove",
+        len(to_remove),
+    )
+    return to_remove


 def _select_statistics_runs_to_purge(
@@ -143,7 +187,9 @@ def _purge_state_ids(instance: Recorder, session: Session, state_ids: set[int]) -> None:
     disconnected_rows = (
         session.query(States)
         .filter(States.old_state_id.in_(state_ids))
-        .update({"old_state_id": None}, synchronize_session=False)
+        .update(
+            {"old_state_id": None, "attributes_id": None}, synchronize_session=False
+        )
     )
     _LOGGER.debug("Updated %s states to remove old_state_id", disconnected_rows)
@@ -175,6 +221,44 @@ def _evict_purged_states_from_old_states_cache(
         old_states.pop(old_state_reversed[purged_state_id], None)


+def _evict_purged_attributes_from_attributes_cache(
+    instance: Recorder, purged_attributes_ids: set[int]
+) -> None:
+    """Evict purged attribute ids from the attribute ids cache."""
+    # Make a map from attributes_id to the attributes json
+    state_attributes_ids = (
+        instance._state_attributes_ids  # pylint: disable=protected-access
+    )
+    state_attributes_ids_reversed = {
+        attributes_id: attributes
+        for attributes, attributes_id in state_attributes_ids.items()
+    }
+
+    # Evict any purged attributes from the state_attributes_ids cache
+    for purged_attribute_id in purged_attributes_ids.intersection(
+        state_attributes_ids_reversed
+    ):
+        state_attributes_ids.pop(
+            state_attributes_ids_reversed[purged_attribute_id], None
+        )
+
+
+def _purge_attributes_ids(
+    instance: Recorder, session: Session, attributes_ids: set[int]
+) -> None:
+    """Delete old attributes ids."""
+    deleted_rows = (
+        session.query(StateAttributes)
+        .filter(StateAttributes.attributes_id.in_(attributes_ids))
+        .delete(synchronize_session=False)
+    )
+    _LOGGER.debug("Deleted %s attribute states", deleted_rows)
+
+    # Evict any entries in the state_attributes_ids cache referring to a purged state
+    _evict_purged_attributes_from_attributes_cache(instance, attributes_ids)
+
+
 def _purge_statistics_runs(session: Session, statistics_runs: list[int]) -> None:
     """Delete by run_id."""
     deleted_rows = (
@@ -248,26 +332,52 @@ def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
     return True


+def _remove_attributes_ids_used_by_other_entities(
+    session: Session, entities: list[str], attributes_ids: set[int]
+) -> set[int]:
+    """Remove attributes ids that are still in use for entitiy_ids we are not purging yet."""
+    if not attributes_ids:
+        return set()
+    keep_attributes_ids = {
+        state.attributes_id
+        for state in session.query(States.attributes_id)
+        .filter(States.entity_id.not_in(entities))
+        .filter(States.attributes_id.in_(attributes_ids))
+        .group_by(States.attributes_id)
+    }
+    to_remove = attributes_ids - keep_attributes_ids
+    _LOGGER.debug(
+        "Selected %s shared attributes to remove",
+        len(to_remove),
+    )
+    return to_remove
+
+
 def _purge_filtered_states(
     instance: Recorder, session: Session, excluded_entity_ids: list[str]
 ) -> None:
     """Remove filtered states and linked events."""
     state_ids: list[int]
+    attributes_ids: list[int]
     event_ids: list[int | None]
-    state_ids, event_ids = zip(
+    state_ids, attributes_ids, event_ids = zip(
         *(
-            session.query(States.state_id, States.event_id)
+            session.query(States.state_id, States.attributes_id, States.event_id)
             .filter(States.entity_id.in_(excluded_entity_ids))
             .limit(MAX_ROWS_TO_PURGE)
             .all()
         )
     )
     event_ids = [id_ for id_ in event_ids if id_ is not None]
+    attributes_ids_set = _remove_attributes_ids_used_by_other_entities(
+        session, excluded_entity_ids, {id_ for id_ in attributes_ids if id_ is not None}
+    )
     _LOGGER.debug(
         "Selected %s state_ids to remove that should be filtered", len(state_ids)
     )
     _purge_state_ids(instance, session, set(state_ids))
     _purge_event_ids(session, event_ids)  # type: ignore[arg-type]  # type of event_ids already narrowed to 'list[int]'
+    _purge_attributes_ids(instance, session, attributes_ids_set)


 def _purge_filtered_events(
@@ -290,6 +400,9 @@ def _purge_filtered_events(
     state_ids: set[int] = {state.state_id for state in states}
     _purge_state_ids(instance, session, state_ids)
     _purge_event_ids(session, event_ids)
+    if EVENT_STATE_CHANGED in excluded_event_types:
+        session.query(StateAttributes).delete(synchronize_session=False)
+        instance._state_attributes_ids = {}  # pylint: disable=protected-access


 @retryable_database_job("purge")
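Since a `state_attributes` row can be shared by many states, the purge logic above first collects candidate ids from the states being deleted, subtracts any id still referenced by surviving states, and only then deletes rows and evicts them from the recorder's id cache. Both steps in isolation, as a sketch (`id_cache` mirrors the shared_attrs-to-id mapping the recorder keeps):

```python
def attributes_ids_to_purge(candidate_ids, still_referenced_ids):
    """Set difference used by _remove_attributes_ids_used_by_newer_states."""
    return set(candidate_ids) - set(still_referenced_ids)


def evict_purged_from_cache(id_cache, purged_ids):
    """Sketch of the cache eviction: id_cache maps shared_attrs JSON to
    attributes_id, so build the reverse map to evict by id."""
    reverse = {attr_id: attrs for attrs, attr_id in id_cache.items()}
    for purged_id in set(purged_ids) & reverse.keys():
        id_cache.pop(reverse[purged_id], None)


# id 2 is still used by a state newer than the purge cutoff, so it survives:
assert attributes_ids_to_purge({1, 2, 3}, {2}) == {1, 3}
```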
@retryable_database_job("purge") @retryable_database_job("purge")

View file

@@ -12,7 +12,7 @@ from typing import Any, Literal, cast
 import voluptuous as vol

 from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR_DOMAIN
-from homeassistant.components.recorder.models import States
+from homeassistant.components.recorder.models import StateAttributes, States
 from homeassistant.components.recorder.util import execute, session_scope
 from homeassistant.components.sensor import (
     PLATFORM_SCHEMA,
@@ -482,9 +482,10 @@ class StatisticsSensor(SensorEntity):
         """
         _LOGGER.debug("%s: initializing values from the database", self.entity_id)
+        states = []

         with session_scope(hass=self.hass) as session:
-            query = session.query(States).filter(
+            query = session.query(States, StateAttributes).filter(
                 States.entity_id == self._source_entity_id.lower()
             )
@@ -499,10 +500,18 @@ class StatisticsSensor(SensorEntity):
             else:
                 _LOGGER.debug("%s: retrieving all records", self.entity_id)

+            query = query.outerjoin(
+                StateAttributes, States.attributes_id == StateAttributes.attributes_id
+            )
             query = query.order_by(States.last_updated.desc()).limit(
                 self._samples_max_buffer_size
             )
-            states = execute(query, to_native=True, validate_entity_ids=False)
+            if results := execute(query, to_native=False, validate_entity_ids=False):
+                for state, attributes in results:
+                    native = state.to_native()
+                    if not native.attributes:
+                        native.attributes = attributes.to_native()
+                    states.append(native)

         if states:
             for state in reversed(states):

View file

@@ -13,11 +13,13 @@ bcrypt==3.1.7
 certifi>=2021.5.30
 ciso8601==2.2.0
 cryptography==35.0.0
+fnvhash==0.1.0
 hass-nabucasa==0.54.0
 home-assistant-frontend==20220317.0
 httpx==0.22.0
 ifaddr==0.1.7
 jinja2==3.0.3
+lru-dict==1.1.7
 paho-mqtt==1.6.1
 pillow==9.0.1
 pip>=21.0,<22.1

View file

@@ -651,6 +651,7 @@ flipr-api==1.4.2
 flux_led==0.28.27

 # homeassistant.components.homekit
+# homeassistant.components.recorder
 fnvhash==0.1.0

 # homeassistant.components.foobot
@@ -952,6 +953,9 @@ logi_circle==0.2.2
 # homeassistant.components.london_underground
 london-tube-status==0.2

+# homeassistant.components.recorder
+lru-dict==1.1.7
+
 # homeassistant.components.luftdaten
 luftdaten==0.7.2

View file

@@ -443,6 +443,7 @@ flipr-api==1.4.2
 flux_led==0.28.27

 # homeassistant.components.homekit
+# homeassistant.components.recorder
 fnvhash==0.1.0

 # homeassistant.components.foobot
@@ -636,6 +637,9 @@ libsoundtouch==0.8
 # homeassistant.components.logi_circle
 logi_circle==0.2.2

+# homeassistant.components.recorder
+lru-dict==1.1.7
+
 # homeassistant.components.luftdaten
 luftdaten==0.7.2

View file

@@ -43,7 +43,11 @@ from tests.common import (
     async_init_recorder_component,
     mock_platform,
 )
-from tests.components.recorder.common import trigger_db_commit
+from tests.components.recorder.common import (
+    async_trigger_db_commit,
+    async_wait_recording_done_without_instance,
+    trigger_db_commit,
+)

 EMPTY_CONFIG = logbook.CONFIG_SCHEMA({logbook.DOMAIN: {}})
@@ -280,12 +284,14 @@ def create_state_changed_event_from_old_new(
             "attributes"
             "state_id",
             "old_state_id",
+            "shared_attrs",
         ],
     )

     row.event_type = EVENT_STATE_CHANGED
     row.event_data = "{}"
     row.attributes = attributes_json
+    row.shared_attrs = attributes_json
     row.time_fired = event_time_fired
     row.state = new_state and new_state.get("state")
     row.entity_id = entity_id
@@ -636,6 +642,44 @@ async def test_logbook_entity_filter_with_automations(hass, hass_client):
     assert json_dict[0]["entity_id"] == entity_id_second


+async def test_logbook_entity_no_longer_in_state_machine(hass, hass_client):
+    """Test the logbook view with an entity that hass been removed from the state machine."""
+    await async_init_recorder_component(hass)
+    await async_setup_component(hass, "logbook", {})
+    await async_setup_component(hass, "automation", {})
+    await async_setup_component(hass, "script", {})
+
+    await async_wait_recording_done_without_instance(hass)
+
+    entity_id_test = "alarm_control_panel.area_001"
+    hass.states.async_set(
+        entity_id_test, STATE_OFF, {ATTR_FRIENDLY_NAME: "Alarm Control Panel"}
+    )
+    hass.states.async_set(
+        entity_id_test, STATE_ON, {ATTR_FRIENDLY_NAME: "Alarm Control Panel"}
+    )
+    async_trigger_db_commit(hass)
+    await async_wait_recording_done_without_instance(hass)
+    hass.states.async_remove(entity_id_test)
+
+    client = await hass_client()
+
+    # Today time 00:00:00
+    start = dt_util.utcnow().date()
+    start_date = datetime(start.year, start.month, start.day)
+
+    # Test today entries with filter by end_time
+    end_time = start + timedelta(hours=24)
+    response = await client.get(
+        f"/api/logbook/{start_date.isoformat()}?end_time={end_time}"
+    )
+    assert response.status == HTTPStatus.OK
+    json_dict = await response.json()
+    assert json_dict[0]["name"] == "Alarm Control Panel"
+
+
 async def test_filter_continuous_sensor_values(hass, hass_client):
     """Test remove continuous sensor events from logbook."""
     await async_init_recorder_component(hass)

View file

@@ -1,9 +1,6 @@
 """Unit tests for platform/plant.py."""
 from datetime import datetime, timedelta

-import pytest
-
-from homeassistant.components import recorder
 import homeassistant.components.plant as plant
 from homeassistant.const import (
     ATTR_UNIT_OF_MEASUREMENT,
@@ -12,12 +9,12 @@ from homeassistant.const import (
     STATE_OK,
     STATE_PROBLEM,
     STATE_UNAVAILABLE,
-    STATE_UNKNOWN,
 )
 from homeassistant.core import State
 from homeassistant.setup import async_setup_component

-from tests.common import init_recorder_component
+from tests.common import async_init_recorder_component
+from tests.components.recorder.common import async_wait_recording_done_without_instance

 GOOD_DATA = {
     "moisture": 50,
@@ -148,19 +145,13 @@ async def test_state_problem_if_unavailable(hass):
     assert state.attributes[plant.READING_MOISTURE] == STATE_UNAVAILABLE


-@pytest.mark.skipif(
-    plant.ENABLE_LOAD_HISTORY is False,
-    reason="tests for loading from DB are unstable, thus"
-    "this feature is turned of until tests become"
-    "stable",
-)
 async def test_load_from_db(hass):
     """Test bootstrapping the brightness history from the database.

     This test can should only be executed if the loading of the history
     is enabled via plant.ENABLE_LOAD_HISTORY.
     """
-    init_recorder_component(hass)
+    await async_init_recorder_component(hass)
     plant_name = "wise_plant"

     for value in [20, 30, 10]:
@@ -169,7 +160,7 @@ async def test_load_from_db(hass):
         )
         await hass.async_block_till_done()
     # wait for the recorder to really store the data
-    hass.data[recorder.DATA_INSTANCE].block_till_done()
+    await async_wait_recording_done_without_instance(hass)

     assert await async_setup_component(
         hass, plant.DOMAIN, {plant.DOMAIN: {plant_name: GOOD_CONFIG}}
@@ -177,7 +168,7 @@ async def test_load_from_db(hass):
     await hass.async_block_till_done()

     state = hass.states.get(f"plant.{plant_name}")
-    assert state.state == STATE_UNKNOWN
+    assert state.state == STATE_PROBLEM
     max_brightness = state.attributes.get(plant.ATTR_MAX_BRIGHTNESS_HISTORY)
     assert max_brightness == 30

View file

@@ -31,6 +31,7 @@ from homeassistant.components.recorder.const import DATA_INSTANCE
 from homeassistant.components.recorder.models import (
     Events,
     RecorderRuns,
+    StateAttributes,
     States,
     StatisticsRuns,
     process_timestamp,
@@ -166,10 +167,13 @@ async def test_saving_state(
     await async_wait_recording_done(hass, instance)

     with session_scope(hass=hass) as session:
-        db_states = list(session.query(States))
+        db_states = []
+        for db_state, db_state_attributes in session.query(States, StateAttributes):
+            db_states.append(db_state)
+            state = db_state.to_native()
+            state.attributes = db_state_attributes.to_native()
         assert len(db_states) == 1
         assert db_states[0].event_id > 0
-        state = db_states[0].to_native()

     assert state == _state_empty_context(hass, entity_id)
@@ -400,7 +404,14 @@ def _add_entities(hass, entity_ids):
         wait_recording_done(hass)

     with session_scope(hass=hass) as session:
-        return [st.to_native() for st in session.query(States)]
+        states = []
+        for state, state_attributes in session.query(States, StateAttributes).outerjoin(
+            StateAttributes, States.attributes_id == StateAttributes.attributes_id
+        ):
+            native_state = state.to_native()
+            native_state.attributes = state_attributes.to_native()
+            states.append(native_state)
+        return states


 def _add_events(hass, events):
def _add_events(hass, events): def _add_events(hass, events):

View file

@@ -1,5 +1,6 @@
 """The tests for the Recorder component."""
-from datetime import datetime
+from datetime import datetime, timedelta
+from unittest.mock import PropertyMock

 import pytest
 from sqlalchemy import create_engine
@@ -8,7 +9,9 @@ from sqlalchemy.orm import scoped_session, sessionmaker
 from homeassistant.components.recorder.models import (
     Base,
     Events,
+    LazyState,
     RecorderRuns,
+    StateAttributes,
     States,
     process_timestamp,
     process_timestamp_to_utc_isoformat,
@@ -16,8 +19,7 @@ from homeassistant.components.recorder.models import (
 from homeassistant.const import EVENT_STATE_CHANGED
 import homeassistant.core as ha
 from homeassistant.exceptions import InvalidEntityFormatError
-from homeassistant.util import dt
-import homeassistant.util.dt as dt_util
+from homeassistant.util import dt, dt as dt_util


 def test_from_event_to_db_event():
@@ -40,6 +42,27 @@ def test_from_event_to_db_state():
     assert state == States.from_event(event).to_native()


+def test_from_event_to_db_state_attributes():
+    """Test converting event to db state attributes."""
+    attrs = {"this_attr": True}
+    state = ha.State("sensor.temperature", "18", attrs)
+    event = ha.Event(
+        EVENT_STATE_CHANGED,
+        {"entity_id": "sensor.temperature", "old_state": None, "new_state": state},
+        context=state.context,
+    )
+    assert StateAttributes.from_event(event).to_native() == attrs
+
+
+def test_handling_broken_json_state_attributes(caplog):
+    """Test we handle broken json in state attributes."""
+    state_attributes = StateAttributes(
+        attributes_id=444, hash=1234, shared_attrs="{NOT_PARSE}"
+    )
+    assert state_attributes.to_native() == {}
+    assert "Error converting row to state attributes" in caplog.text
+
+
 def test_from_event_to_delete_state():
     """Test converting deleting state event to db state."""
     event = ha.Event(
@@ -215,3 +238,97 @@ async def test_event_to_db_model():
     native = Events.from_event(event, event_data="{}").to_native()
     event.data = {}
     assert native == event
+
+
+async def test_lazy_state_handles_include_json(caplog):
+    """Test that the LazyState class handles invalid json."""
+    row = PropertyMock(
+        entity_id="sensor.invalid",
+        shared_attrs="{INVALID_JSON}",
+    )
+    assert LazyState(row).attributes == {}
+    assert "Error converting row to state attributes" in caplog.text
+
+
+async def test_lazy_state_prefers_shared_attrs_over_attrs(caplog):
+    """Test that the LazyState prefers shared_attrs over attributes."""
+    row = PropertyMock(
+        entity_id="sensor.invalid",
+        shared_attrs='{"shared":true}',
+        attributes='{"shared":false}',
+    )
+    assert LazyState(row).attributes == {"shared": True}
+
+
+async def test_lazy_state_handles_different_last_updated_and_last_changed(caplog):
+    """Test that the LazyState handles different last_updated and last_changed."""
+    now = datetime(2021, 6, 12, 3, 4, 1, 323, tzinfo=dt_util.UTC)
+    row = PropertyMock(
+        entity_id="sensor.valid",
+        state="off",
+        shared_attrs='{"shared":true}',
+        last_updated=now,
+        last_changed=now - timedelta(seconds=60),
+    )
+    lstate = LazyState(row)
+    assert lstate.as_dict() == {
+        "attributes": {"shared": True},
+        "entity_id": "sensor.valid",
+        "last_changed": "2021-06-12T03:03:01.000323+00:00",
+        "last_updated": "2021-06-12T03:04:01.000323+00:00",
+        "state": "off",
+    }
+    assert lstate.last_updated == row.last_updated
+    assert lstate.last_changed == row.last_changed
+    assert lstate.as_dict() == {
+        "attributes": {"shared": True},
+        "entity_id": "sensor.valid",
+        "last_changed": "2021-06-12T03:03:01.000323+00:00",
+        "last_updated": "2021-06-12T03:04:01.000323+00:00",
+        "state": "off",
+    }
+
+
+async def test_lazy_state_handles_same_last_updated_and_last_changed(caplog):
+    """Test that the LazyState handles same last_updated and last_changed."""
+    now = datetime(2021, 6, 12, 3, 4, 1, 323, tzinfo=dt_util.UTC)
+    row = PropertyMock(
+        entity_id="sensor.valid",
+        state="off",
+        shared_attrs='{"shared":true}',
+        last_updated=now,
+        last_changed=now,
+    )
+    lstate = LazyState(row)
+    assert lstate.as_dict() == {
+        "attributes": {"shared": True},
+        "entity_id": "sensor.valid",
+        "last_changed": "2021-06-12T03:04:01.000323+00:00",
+        "last_updated": "2021-06-12T03:04:01.000323+00:00",
+        "state": "off",
+    }
+    assert lstate.last_updated == row.last_updated
+    assert lstate.last_changed == row.last_changed
+    assert lstate.as_dict() == {
+        "attributes": {"shared": True},
+        "entity_id": "sensor.valid",
+        "last_changed": "2021-06-12T03:04:01.000323+00:00",
+        "last_updated": "2021-06-12T03:04:01.000323+00:00",
+        "state": "off",
+    }
+    lstate.last_updated = datetime(2020, 6, 12, 3, 4, 1, 323, tzinfo=dt_util.UTC)
+    assert lstate.as_dict() == {
+        "attributes": {"shared": True},
+        "entity_id": "sensor.valid",
+        "last_changed": "2021-06-12T03:04:01.000323+00:00",
+        "last_updated": "2020-06-12T03:04:01.000323+00:00",
+        "state": "off",
+    }
+    lstate.last_changed = datetime(2020, 6, 12, 3, 4, 1, 323, tzinfo=dt_util.UTC)
+    assert lstate.as_dict() == {
+        "attributes": {"shared": True},
+        "entity_id": "sensor.valid",
+        "last_changed": "2020-06-12T03:04:01.000323+00:00",
+        "last_updated": "2020-06-12T03:04:01.000323+00:00",
+        "state": "off",
+    }

View file

@ -13,13 +13,14 @@ from homeassistant.components.recorder.const import MAX_ROWS_TO_PURGE
from homeassistant.components.recorder.models import ( from homeassistant.components.recorder.models import (
Events, Events,
RecorderRuns, RecorderRuns,
StateAttributes,
States, States,
StatisticsRuns, StatisticsRuns,
StatisticsShortTerm, StatisticsShortTerm,
) )
from homeassistant.components.recorder.purge import purge_old_data from homeassistant.components.recorder.purge import purge_old_data
from homeassistant.components.recorder.util import session_scope from homeassistant.components.recorder.util import session_scope
from homeassistant.const import EVENT_STATE_CHANGED from homeassistant.const import EVENT_STATE_CHANGED, STATE_ON
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.typing import ConfigType from homeassistant.helpers.typing import ConfigType
from homeassistant.util import dt as dt_util from homeassistant.util import dt as dt_util
@ -44,9 +45,12 @@ async def test_purge_old_states(
# make sure we start with 6 states # make sure we start with 6 states
with session_scope(hass=hass) as session: with session_scope(hass=hass) as session:
states = session.query(States) states = session.query(States)
state_attributes = session.query(StateAttributes)
assert states.count() == 6 assert states.count() == 6
assert states[0].old_state_id is None assert states[0].old_state_id is None
assert states[-1].old_state_id == states[-2].state_id assert states[-1].old_state_id == states[-2].state_id
assert state_attributes.count() == 3
events = session.query(Events).filter(Events.event_type == "state_changed") events = session.query(Events).filter(Events.event_type == "state_changed")
assert events.count() == 6 assert events.count() == 6
@ -58,6 +62,8 @@ async def test_purge_old_states(
finished = purge_old_data(instance, purge_before, repack=False) finished = purge_old_data(instance, purge_before, repack=False)
assert not finished assert not finished
assert states.count() == 2 assert states.count() == 2
assert state_attributes.count() == 1
assert "test.recorder2" in instance._old_states assert "test.recorder2" in instance._old_states
states_after_purge = session.query(States) states_after_purge = session.query(States)
@ -67,6 +73,8 @@ async def test_purge_old_states(
finished = purge_old_data(instance, purge_before, repack=False) finished = purge_old_data(instance, purge_before, repack=False)
assert finished assert finished
assert states.count() == 2 assert states.count() == 2
assert state_attributes.count() == 1
assert "test.recorder2" in instance._old_states assert "test.recorder2" in instance._old_states
# run purge_old_data again # run purge_old_data again
@ -74,6 +82,8 @@ async def test_purge_old_states(
finished = purge_old_data(instance, purge_before, repack=False) finished = purge_old_data(instance, purge_before, repack=False)
assert not finished assert not finished
assert states.count() == 0 assert states.count() == 0
assert state_attributes.count() == 0
assert "test.recorder2" not in instance._old_states assert "test.recorder2" not in instance._old_states
# Add some more states # Add some more states
@ -90,6 +100,9 @@ async def test_purge_old_states(
assert events.count() == 6 assert events.count() == 6
assert "test.recorder2" in instance._old_states assert "test.recorder2" in instance._old_states
state_attributes = session.query(StateAttributes)
assert state_attributes.count() == 3
async def test_purge_old_states_encouters_database_corruption( async def test_purge_old_states_encouters_database_corruption(
hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT
@@ -368,6 +381,14 @@ async def test_purge_edge_case(
                     last_changed=timestamp,
                     last_updated=timestamp,
                     event_id=1001,
+                    attributes_id=1002,
+                )
+            )
+            session.add(
+                StateAttributes(
+                    shared_attrs="{}",
+                    hash=1234,
+                    attributes_id=1002,
                 )
             )
@@ -382,6 +403,9 @@ async def test_purge_edge_case(
         states = session.query(States)
         assert states.count() == 1

+        state_attributes = session.query(StateAttributes)
+        assert state_attributes.count() == 1
+
         events = session.query(Events).filter(Events.event_type == "EVENT_TEST_PURGE")
         assert events.count() == 1
@@ -426,6 +450,14 @@ async def test_purge_cutoff_date(
                     last_changed=timestamp_keep,
                     last_updated=timestamp_keep,
                     event_id=1000,
+                    attributes_id=1000,
+                )
+            )
+            session.add(
+                StateAttributes(
+                    shared_attrs="{}",
+                    hash=1234,
+                    attributes_id=1000,
                 )
             )
             for row in range(1, rows):
@@ -447,6 +479,14 @@ async def test_purge_cutoff_date(
                         last_changed=timestamp_purge,
                         last_updated=timestamp_purge,
                         event_id=1000 + row,
+                        attributes_id=1000 + row,
+                    )
+                )
+                session.add(
+                    StateAttributes(
+                        shared_attrs="{}",
+                        hash=1234,
+                        attributes_id=1000 + row,
                     )
                 )
@ -462,9 +502,18 @@ async def test_purge_cutoff_date(
with session_scope(hass=hass) as session: with session_scope(hass=hass) as session:
states = session.query(States) states = session.query(States)
state_attributes = session.query(StateAttributes)
events = session.query(Events) events = session.query(Events)
assert states.filter(States.state == "purge").count() == rows - 1 assert states.filter(States.state == "purge").count() == rows - 1
assert states.filter(States.state == "keep").count() == 1 assert states.filter(States.state == "keep").count() == 1
assert (
state_attributes.outerjoin(
States, StateAttributes.attributes_id == States.attributes_id
)
.filter(States.state == "keep")
.count()
== 1
)
assert events.filter(Events.event_type == "PURGE").count() == rows - 1 assert events.filter(Events.event_type == "PURGE").count() == rows - 1
assert events.filter(Events.event_type == "KEEP").count() == 1 assert events.filter(Events.event_type == "KEEP").count() == 1
@@ -474,12 +523,47 @@ async def test_purge_cutoff_date(
     await async_wait_purge_done(hass, instance)

     with session_scope(hass=hass) as session:
         states = session.query(States)
+        state_attributes = session.query(StateAttributes)
         events = session.query(Events)
         assert states.filter(States.state == "purge").count() == 0
+        assert (
+            state_attributes.outerjoin(
+                States, StateAttributes.attributes_id == States.attributes_id
+            )
+            .filter(States.state == "purge")
+            .count()
+            == 0
+        )
         assert states.filter(States.state == "keep").count() == 1
+        assert (
+            state_attributes.outerjoin(
+                States, StateAttributes.attributes_id == States.attributes_id
+            )
+            .filter(States.state == "keep")
+            .count()
+            == 1
+        )
         assert events.filter(Events.event_type == "PURGE").count() == 0
         assert events.filter(Events.event_type == "KEEP").count() == 1

+        # Make sure we can purge everything
+        instance.queue.put(
+            PurgeTask(dt_util.utcnow(), repack=False, apply_filter=False)
+        )
+        await async_recorder_block_till_done(hass, instance)
+        await async_wait_purge_done(hass, instance)
+
+        assert states.count() == 0
+        assert state_attributes.count() == 0
+
+        # Make sure we can purge everything when the db is already empty
+        instance.queue.put(
+            PurgeTask(dt_util.utcnow(), repack=False, apply_filter=False)
+        )
+        await async_recorder_block_till_done(hass, instance)
+        await async_wait_purge_done(hass, instance)
+
+        assert states.count() == 0
+        assert state_attributes.count() == 0
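The outerjoin assertions above exist because a state_attributes row carries no state value of its own; to count the attribute rows belonging to the "keep" or "purge" states you have to walk back through States.attributes_id. The same idea as a standalone helper (hypothetical name; it uses the SQLAlchemy query API these tests already rely on, and a plain join is equivalent here because the filter discards unmatched rows anyway):

def attribute_rows_for_state(session, state_value: str) -> int:
    """Count attribute rows still referenced by states with a given state."""
    return (
        session.query(StateAttributes)
        .join(States, StateAttributes.attributes_id == States.attributes_id)
        .filter(States.state == state_value)
        .count()
    )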
 async def test_purge_filtered_states(
     hass: HomeAssistant,
@@ -527,6 +611,12 @@ async def test_purge_filtered_states(
             )
             # Add states with linked old_state_ids that need to be handled
             timestamp = dt_util.utcnow() - timedelta(days=0)
+            state_attrs = StateAttributes(
+                hash=0,
+                shared_attrs=json.dumps(
+                    {"sensor.linked_old_state_id": "sensor.linked_old_state_id"}
+                ),
+            )
             state_1 = States(
                 entity_id="sensor.linked_old_state_id",
                 domain="sensor",
@@ -535,6 +625,7 @@ async def test_purge_filtered_states(
                 last_changed=timestamp,
                 last_updated=timestamp,
                 old_state_id=1,
+                state_attributes=state_attrs,
             )
             timestamp = dt_util.utcnow() - timedelta(days=4)
             state_2 = States(
@@ -545,6 +636,7 @@ async def test_purge_filtered_states(
                 last_changed=timestamp,
                 last_updated=timestamp,
                 old_state_id=2,
+                state_attributes=state_attrs,
             )
             state_3 = States(
                 entity_id="sensor.linked_old_state_id",
@@ -554,8 +646,9 @@ async def test_purge_filtered_states(
                 last_changed=timestamp,
                 last_updated=timestamp,
                 old_state_id=62,  # keep
+                state_attributes=state_attrs,
             )
-            session.add_all((state_1, state_2, state_3))
+            session.add_all((state_attrs, state_1, state_2, state_3))
             # Add event that should be kept
             session.add(
                 Events(
@@ -617,8 +710,154 @@ async def test_purge_filtered_states(
         assert states_sensor_excluded.count() == 0

         assert session.query(States).get(72).old_state_id is None
+        assert session.query(States).get(72).attributes_id is None
         assert session.query(States).get(73).old_state_id is None
-        assert session.query(States).get(74).old_state_id == 62  # should have been kept
+        assert session.query(States).get(73).attributes_id is None
+        final_keep_state = session.query(States).get(74)
+        assert final_keep_state.old_state_id == 62  # should have been kept
+        assert final_keep_state.attributes_id == 71
+        assert session.query(StateAttributes).count() == 11
+
+        # Do it again to make sure nothing changes
+        await hass.services.async_call(
+            recorder.DOMAIN, recorder.SERVICE_PURGE, service_data
+        )
+        await async_recorder_block_till_done(hass, instance)
+        await async_wait_purge_done(hass, instance)
+
+        final_keep_state = session.query(States).get(74)
+        assert final_keep_state.old_state_id == 62  # should have been kept
+        assert final_keep_state.attributes_id == 71
+        assert session.query(StateAttributes).count() == 11
+
+        # Finally make sure we can delete them all except for the ones missing an event_id
+        service_data = {"keep_days": 0}
+        await hass.services.async_call(
+            recorder.DOMAIN, recorder.SERVICE_PURGE, service_data
+        )
+        await async_recorder_block_till_done(hass, instance)
+        await async_wait_purge_done(hass, instance)
+
+        remaining = list(session.query(States))
+        for state in remaining:
+            assert state.event_id is None
+        assert len(remaining) == 3
+        assert session.query(StateAttributes).count() == 1
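What the final keep_days=0 purge implies for the attributes table: once the filtered states are deleted, any state_attributes row that nothing references is an orphan and can be dropped too. A rough sketch of that cleanup, under the assumption that the real logic lives in recorder/purge.py and this helper name is invented:

from sqlalchemy import select

def _purge_orphaned_attributes(session) -> None:
    """Delete attribute rows no remaining state points at (assumed helper)."""
    still_used = select(States.attributes_id).where(
        States.attributes_id.isnot(None)
    )
    session.query(StateAttributes).filter(
        StateAttributes.attributes_id.notin_(still_used)
    ).delete(synchronize_session=False)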
+
+
+async def test_purge_filtered_states_to_empty(
+    hass: HomeAssistant,
+    async_setup_recorder_instance: SetupRecorderInstanceT,
+):
+    """Test filtered states are purged all the way to an empty db."""
+    config: ConfigType = {"exclude": {"entities": ["sensor.excluded"]}}
+    instance = await async_setup_recorder_instance(hass, config)
+    assert instance.entity_filter("sensor.excluded") is False
+
+    def _add_db_entries(hass: HomeAssistant) -> None:
+        with recorder.session_scope(hass=hass) as session:
+            # Add states and state_changed events that should be purged
+            for days in range(1, 4):
+                timestamp = dt_util.utcnow() - timedelta(days=days)
+                for event_id in range(1000, 1020):
+                    _add_state_and_state_changed_event(
+                        session,
+                        "sensor.excluded",
+                        "purgeme",
+                        timestamp,
+                        event_id * days,
+                    )
+
+    service_data = {"keep_days": 10}
+    _add_db_entries(hass)
+
+    with session_scope(hass=hass) as session:
+        states = session.query(States)
+        state_attributes = session.query(StateAttributes)
+        assert states.count() == 60
+        assert state_attributes.count() == 60
+
+        # Test with 'apply_filter' = True
+        service_data["apply_filter"] = True
+        await hass.services.async_call(
+            recorder.DOMAIN, recorder.SERVICE_PURGE, service_data
+        )
+        await async_recorder_block_till_done(hass, instance)
+        await async_wait_purge_done(hass, instance)
+
+        assert states.count() == 0
+        assert state_attributes.count() == 0
+
+        # Do it again to make sure nothing changes
+        await hass.services.async_call(
+            recorder.DOMAIN, recorder.SERVICE_PURGE, service_data
+        )
+        await async_recorder_block_till_done(hass, instance)
+        await async_wait_purge_done(hass, instance)
+
+
+async def test_purge_without_state_attributes_filtered_states_to_empty(
+    hass: HomeAssistant,
+    async_setup_recorder_instance: SetupRecorderInstanceT,
+):
+    """Test filtered legacy states without state attributes are purged all the way to an empty db."""
+    config: ConfigType = {"exclude": {"entities": ["sensor.old_format"]}}
+    instance = await async_setup_recorder_instance(hass, config)
+    assert instance.entity_filter("sensor.old_format") is False
+
+    def _add_db_entries(hass: HomeAssistant) -> None:
+        with recorder.session_scope(hass=hass) as session:
+            # Add states and state_changed events that should be purged
+            # in the legacy format
+            timestamp = dt_util.utcnow() - timedelta(days=5)
+            event_id = 1021
+            session.add(
+                States(
+                    entity_id="sensor.old_format",
+                    domain="sensor",
+                    state=STATE_ON,
+                    attributes=json.dumps({"old": "not_using_state_attributes"}),
+                    last_changed=timestamp,
+                    last_updated=timestamp,
+                    event_id=event_id,
+                    state_attributes=None,
+                )
+            )
+            session.add(
+                Events(
+                    event_id=event_id,
+                    event_type=EVENT_STATE_CHANGED,
+                    event_data="{}",
+                    origin="LOCAL",
+                    time_fired=timestamp,
+                )
+            )
+
+    service_data = {"keep_days": 10}
+    _add_db_entries(hass)
+
+    with session_scope(hass=hass) as session:
+        states = session.query(States)
+        state_attributes = session.query(StateAttributes)
+        assert states.count() == 1
+        assert state_attributes.count() == 0
+
+        # Test with 'apply_filter' = True
+        service_data["apply_filter"] = True
+        await hass.services.async_call(
+            recorder.DOMAIN, recorder.SERVICE_PURGE, service_data
+        )
+        await async_recorder_block_till_done(hass, instance)
+        await async_wait_purge_done(hass, instance)
+
+        assert states.count() == 0
+        assert state_attributes.count() == 0
+
+        # Do it again to make sure nothing changes
+        await hass.services.async_call(
+            recorder.DOMAIN, recorder.SERVICE_PURGE, service_data
+        )
+        await async_recorder_block_till_done(hass, instance)
+        await async_wait_purge_done(hass, instance)
 async def test_purge_filtered_events(
@@ -923,7 +1162,7 @@ async def _add_test_states(hass: HomeAssistant, instance: recorder.Recorder):
     utcnow = dt_util.utcnow()
     five_days_ago = utcnow - timedelta(days=5)
     eleven_days_ago = utcnow - timedelta(days=11)
-    attributes = {"test_attr": 5, "test_attr_10": "nice"}
+    base_attributes = {"test_attr": 5, "test_attr_10": "nice"}

     async def set_state(entity_id, state, **kwargs):
         """Set the state."""
@@ -935,12 +1174,15 @@ async def _add_test_states(hass: HomeAssistant, instance: recorder.Recorder):
         if event_id < 2:
             timestamp = eleven_days_ago
             state = f"autopurgeme_{event_id}"
+            attributes = {"autopurgeme": True, **base_attributes}
         elif event_id < 4:
             timestamp = five_days_ago
             state = f"purgeme_{event_id}"
+            attributes = {"purgeme": True, **base_attributes}
         else:
             timestamp = utcnow
             state = f"dontpurgeme_{event_id}"
+            attributes = {"dontpurgeme": True, **base_attributes}

         with patch(
             "homeassistant.components.recorder.dt_util.utcnow", return_value=timestamp
@@ -1069,15 +1311,20 @@ def _add_state_and_state_changed_event(
     event_id: int,
 ) -> None:
     """Add state and state_changed event to database for testing."""
+    state_attrs = StateAttributes(
+        hash=event_id, shared_attrs=json.dumps({entity_id: entity_id})
+    )
+    session.add(state_attrs)
     session.add(
         States(
             entity_id=entity_id,
             domain="sensor",
             state=state,
-            attributes="{}",
+            attributes=None,
             last_changed=timestamp,
             last_updated=timestamp,
             event_id=event_id,
+            state_attributes=state_attrs,
         )
     )
     session.add(
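With this change, the helper writes a paired States/StateAttributes row per event: hash is simply the event_id, shared_attrs keys the payload by entity_id, and the legacy attributes column is left as None. A usage sketch (entity_id and event_id values assumed) of what a single call produces:

with session_scope(hass=hass) as session:
    _add_state_and_state_changed_event(
        session, "sensor.demo", "on", dt_util.utcnow(), 1
    )
    state = session.query(States).one()
    assert state.attributes is None  # legacy inline column unused
    assert state.state_attributes.shared_attrs == json.dumps(
        {"sensor.demo": "sensor.demo"}
    )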