"""Queries for logbook.""" from __future__ import annotations from collections.abc import Callable from datetime import datetime as dt import json from typing import Any import sqlalchemy from sqlalchemy import JSON, select, type_coerce from sqlalchemy.orm import Query, aliased from sqlalchemy.sql.elements import ClauseList from sqlalchemy.sql.expression import literal from sqlalchemy.sql.selectable import Select from homeassistant.components.proximity import DOMAIN as PROXIMITY_DOMAIN from homeassistant.components.recorder.models import ( JSON_VARIENT_CAST, JSONB_VARIENT_CAST, EventData, Events, StateAttributes, States, ) from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN CONTINUOUS_DOMAINS = {PROXIMITY_DOMAIN, SENSOR_DOMAIN} CONTINUOUS_ENTITY_ID_LIKE = [f"{domain}.%" for domain in CONTINUOUS_DOMAINS] UNIT_OF_MEASUREMENT_JSON = '"unit_of_measurement":' UNIT_OF_MEASUREMENT_JSON_LIKE = f"%{UNIT_OF_MEASUREMENT_JSON}%" OLD_STATE = aliased(States, name="old_state") class JSONLiteral(JSON): # type: ignore[misc] """Teach SA how to literalize json.""" def literal_processor(self, dialect: str) -> Callable[[Any], str]: """Processor to convert a value to JSON.""" def process(value: Any) -> str: """Dump json.""" return json.dumps(value) return process EVENT_DATA_JSON = type_coerce( EventData.shared_data.cast(JSONB_VARIENT_CAST), JSONLiteral(none_as_null=True) ) OLD_FORMAT_EVENT_DATA_JSON = type_coerce( Events.event_data.cast(JSONB_VARIENT_CAST), JSONLiteral(none_as_null=True) ) SHARED_ATTRS_JSON = type_coerce( StateAttributes.shared_attrs.cast(JSON_VARIENT_CAST), JSON(none_as_null=True) ) OLD_FORMAT_ATTRS_JSON = type_coerce( States.attributes.cast(JSON_VARIENT_CAST), JSON(none_as_null=True) ) PSUEDO_EVENT_STATE_CHANGED = None # Since we don't store event_types and None # and we don't store state_changed in events # we use a NULL for state_changed events # when we synthesize them from the states table # since it avoids another column being sent # in the payload EVENT_COLUMNS = ( Events.event_id.label("event_id"), Events.event_type.label("event_type"), Events.event_data.label("event_data"), Events.time_fired.label("time_fired"), Events.context_id.label("context_id"), Events.context_user_id.label("context_user_id"), Events.context_parent_id.label("context_parent_id"), ) STATE_COLUMNS = ( States.state_id.label("state_id"), States.state.label("state"), States.entity_id.label("entity_id"), SHARED_ATTRS_JSON["icon"].as_string().label("icon"), OLD_FORMAT_ATTRS_JSON["icon"].as_string().label("old_format_icon"), ) STATE_CONTEXT_ONLY_COLUMNS = ( States.state_id.label("state_id"), States.state.label("state"), States.entity_id.label("entity_id"), literal(value=None, type_=sqlalchemy.String).label("icon"), literal(value=None, type_=sqlalchemy.String).label("old_format_icon"), ) EVENT_COLUMNS_FOR_STATE_SELECT = [ literal(value=None, type_=sqlalchemy.Text).label("event_id"), # We use PSUEDO_EVENT_STATE_CHANGED aka None for # state_changed events since it takes up less # space in the response and every row has to be # marked with the event_type literal(value=PSUEDO_EVENT_STATE_CHANGED, type_=sqlalchemy.String).label( "event_type" ), literal(value=None, type_=sqlalchemy.Text).label("event_data"), States.last_updated.label("time_fired"), States.context_id.label("context_id"), States.context_user_id.label("context_user_id"), States.context_parent_id.label("context_parent_id"), literal(value=None, type_=sqlalchemy.Text).label("shared_data"), ] EMPTY_STATE_COLUMNS = ( literal(value=None, type_=sqlalchemy.String).label("state_id"), literal(value=None, type_=sqlalchemy.String).label("state"), literal(value=None, type_=sqlalchemy.String).label("entity_id"), literal(value=None, type_=sqlalchemy.String).label("icon"), literal(value=None, type_=sqlalchemy.String).label("old_format_icon"), ) EVENT_ROWS_NO_STATES = ( *EVENT_COLUMNS, EventData.shared_data.label("shared_data"), *EMPTY_STATE_COLUMNS, ) # Virtual column to tell logbook if it should avoid processing # the event as its only used to link contexts CONTEXT_ONLY = literal("1").label("context_only") NOT_CONTEXT_ONLY = literal(None).label("context_only") def select_events_context_id_subquery( start_day: dt, end_day: dt, event_types: tuple[str, ...], ) -> Select: """Generate the select for a context_id subquery.""" return ( select(Events.context_id) .where((Events.time_fired > start_day) & (Events.time_fired < end_day)) .where(Events.event_type.in_(event_types)) .outerjoin(EventData, (Events.data_id == EventData.data_id)) ) def select_events_context_only() -> Select: """Generate an events query that mark them as for context_only. By marking them as context_only we know they are only for linking context ids and we can avoid processing them. """ return select(*EVENT_ROWS_NO_STATES, CONTEXT_ONLY).outerjoin( EventData, (Events.data_id == EventData.data_id) ) def select_states_context_only() -> Select: """Generate an states query that mark them as for context_only. By marking them as context_only we know they are only for linking context ids and we can avoid processing them. """ return select( *EVENT_COLUMNS_FOR_STATE_SELECT, *STATE_CONTEXT_ONLY_COLUMNS, CONTEXT_ONLY ) def select_events_without_states( start_day: dt, end_day: dt, event_types: tuple[str, ...] ) -> Select: """Generate an events select that does not join states.""" return ( select(*EVENT_ROWS_NO_STATES, NOT_CONTEXT_ONLY) .where((Events.time_fired > start_day) & (Events.time_fired < end_day)) .where(Events.event_type.in_(event_types)) .outerjoin(EventData, (Events.data_id == EventData.data_id)) ) def select_states() -> Select: """Generate a states select that formats the states table as event rows.""" return select( *EVENT_COLUMNS_FOR_STATE_SELECT, *STATE_COLUMNS, NOT_CONTEXT_ONLY, ) def legacy_select_events_context_id( start_day: dt, end_day: dt, context_id: str ) -> Select: """Generate a legacy events context id select that also joins states.""" # This can be removed once we no longer have event_ids in the states table return ( select( *EVENT_COLUMNS, literal(value=None, type_=sqlalchemy.String).label("shared_data"), *STATE_COLUMNS, NOT_CONTEXT_ONLY, ) .outerjoin(States, (Events.event_id == States.event_id)) .where( (States.last_updated == States.last_changed) | States.last_changed.is_(None) ) .where(_not_continuous_entity_matcher()) .outerjoin( StateAttributes, (States.attributes_id == StateAttributes.attributes_id) ) .where((Events.time_fired > start_day) & (Events.time_fired < end_day)) .where(Events.context_id == context_id) ) def apply_states_filters(query: Query, start_day: dt, end_day: dt) -> Query: """Filter states by time range. Filters states that do not have an old state or new state (added / removed) Filters states that are in a continuous domain with a UOM. Filters states that do not have matching last_updated and last_changed. """ return ( query.filter( (States.last_updated > start_day) & (States.last_updated < end_day) ) .outerjoin(OLD_STATE, (States.old_state_id == OLD_STATE.state_id)) .where(_missing_state_matcher()) .where(_not_continuous_entity_matcher()) .where( (States.last_updated == States.last_changed) | States.last_changed.is_(None) ) .outerjoin( StateAttributes, (States.attributes_id == StateAttributes.attributes_id) ) ) def _missing_state_matcher() -> sqlalchemy.and_: # The below removes state change events that do not have # and old_state or the old_state is missing (newly added entities) # or the new_state is missing (removed entities) return sqlalchemy.and_( OLD_STATE.state_id.isnot(None), (States.state != OLD_STATE.state), States.state.isnot(None), ) def _not_continuous_entity_matcher() -> sqlalchemy.or_: """Match non continuous entities.""" return sqlalchemy.or_( _not_continuous_domain_matcher(), sqlalchemy.and_( _continuous_domain_matcher, _not_uom_attributes_matcher() ).self_group(), ) def _not_continuous_domain_matcher() -> sqlalchemy.and_: """Match not continuous domains.""" return sqlalchemy.and_( *[ ~States.entity_id.like(entity_domain) for entity_domain in CONTINUOUS_ENTITY_ID_LIKE ], ).self_group() def _continuous_domain_matcher() -> sqlalchemy.or_: """Match continuous domains.""" return sqlalchemy.or_( *[ States.entity_id.like(entity_domain) for entity_domain in CONTINUOUS_ENTITY_ID_LIKE ], ).self_group() def _not_uom_attributes_matcher() -> ClauseList: """Prefilter ATTR_UNIT_OF_MEASUREMENT as its much faster in sql.""" return ~StateAttributes.shared_attrs.like( UNIT_OF_MEASUREMENT_JSON_LIKE ) | ~States.attributes.like(UNIT_OF_MEASUREMENT_JSON_LIKE)