Add MySQL index hints to logbook (#71864)

* Add MySQL index hints to logbook

* Fix MySQL query planner
This commit is contained in:
J. Nick Koston 2022-05-14 17:01:36 -05:00 committed by GitHub
parent 68632cb267
commit 0584e84c30
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 76 additions and 30 deletions

View file

@ -7,7 +7,7 @@ from typing import Any
import sqlalchemy import sqlalchemy
from sqlalchemy import lambda_stmt, select, union_all from sqlalchemy import lambda_stmt, select, union_all
from sqlalchemy.orm import aliased from sqlalchemy.orm import Query, aliased
from sqlalchemy.sql.expression import literal from sqlalchemy.sql.expression import literal
from sqlalchemy.sql.lambdas import StatementLambdaElement from sqlalchemy.sql.lambdas import StatementLambdaElement
from sqlalchemy.sql.selectable import Select from sqlalchemy.sql.selectable import Select
@ -15,6 +15,8 @@ from sqlalchemy.sql.selectable import Select
from homeassistant.components.history import Filters from homeassistant.components.history import Filters
from homeassistant.components.proximity import DOMAIN as PROXIMITY_DOMAIN from homeassistant.components.proximity import DOMAIN as PROXIMITY_DOMAIN
from homeassistant.components.recorder.models import ( from homeassistant.components.recorder.models import (
ENTITY_ID_LAST_UPDATED_INDEX,
LAST_UPDATED_INDEX,
EventData, EventData,
Events, Events,
StateAttributes, StateAttributes,
@ -31,6 +33,8 @@ CONTINUOUS_ENTITY_ID_LIKE = [f"{domain}.%" for domain in CONTINUOUS_DOMAINS]
UNIT_OF_MEASUREMENT_JSON = '"unit_of_measurement":' UNIT_OF_MEASUREMENT_JSON = '"unit_of_measurement":'
UNIT_OF_MEASUREMENT_JSON_LIKE = f"%{UNIT_OF_MEASUREMENT_JSON}%" UNIT_OF_MEASUREMENT_JSON_LIKE = f"%{UNIT_OF_MEASUREMENT_JSON}%"
OLD_STATE = aliased(States, name="old_state")
EVENT_COLUMNS = ( EVENT_COLUMNS = (
Events.event_id.label("event_id"), Events.event_id.label("event_id"),
@ -126,7 +130,7 @@ def _select_entities_context_ids_sub_query(
_select_events_context_id_subquery(start_day, end_day, event_types).where( _select_events_context_id_subquery(start_day, end_day, event_types).where(
_apply_event_entity_id_matchers(entity_ids) _apply_event_entity_id_matchers(entity_ids)
), ),
select(States.context_id) _apply_entities_hints(select(States.context_id))
.filter((States.last_updated > start_day) & (States.last_updated < end_day)) .filter((States.last_updated > start_day) & (States.last_updated < end_day))
.where(States.entity_id.in_(entity_ids)), .where(States.entity_id.in_(entity_ids)),
).c.context_id ).c.context_id
@ -156,7 +160,7 @@ def _entities_stmt(
) )
stmt = stmt.add_criteria( stmt = stmt.add_criteria(
lambda s: s.where(_apply_event_entity_id_matchers(entity_ids)).union_all( lambda s: s.where(_apply_event_entity_id_matchers(entity_ids)).union_all(
_select_states(start_day, end_day).where(States.entity_id.in_(entity_ids)), _states_query_for_entitiy_ids(start_day, end_day, entity_ids),
_select_events_context_only().where( _select_events_context_only().where(
Events.context_id.in_( Events.context_id.in_(
_select_entities_context_ids_sub_query( _select_entities_context_ids_sub_query(
@ -192,7 +196,7 @@ def _select_entity_context_ids_sub_query(
Events.event_data.like(entity_id_like) Events.event_data.like(entity_id_like)
| EventData.shared_data.like(entity_id_like) | EventData.shared_data.like(entity_id_like)
), ),
select(States.context_id) _apply_entities_hints(select(States.context_id))
.filter((States.last_updated > start_day) & (States.last_updated < end_day)) .filter((States.last_updated > start_day) & (States.last_updated < end_day))
.where(States.entity_id == entity_id), .where(States.entity_id == entity_id),
).c.context_id ).c.context_id
@ -214,7 +218,7 @@ def _single_entity_stmt(
| EventData.shared_data.like(entity_id_like) | EventData.shared_data.like(entity_id_like)
) )
.union_all( .union_all(
_select_states(start_day, end_day).where(States.entity_id == entity_id), _states_query_for_entitiy_id(start_day, end_day, entity_id),
_select_events_context_only().where( _select_events_context_only().where(
Events.context_id.in_( Events.context_id.in_(
_select_entity_context_ids_sub_query( _select_entity_context_ids_sub_query(
@ -244,15 +248,15 @@ def _all_stmt(
# are gone from the database remove the # are gone from the database remove the
# _legacy_select_events_context_id() # _legacy_select_events_context_id()
stmt += lambda s: s.where(Events.context_id == context_id).union_all( stmt += lambda s: s.where(Events.context_id == context_id).union_all(
_select_states(start_day, end_day).where(States.context_id == context_id), _states_query_for_context_id(start_day, end_day, context_id),
_legacy_select_events_context_id(start_day, end_day, context_id), _legacy_select_events_context_id(start_day, end_day, context_id),
) )
elif entity_filter is not None: elif entity_filter is not None:
stmt += lambda s: s.union_all( stmt += lambda s: s.union_all(
_select_states(start_day, end_day).where(entity_filter) _states_query_for_all(start_day, end_day).where(entity_filter)
) )
else: else:
stmt += lambda s: s.union_all(_select_states(start_day, end_day)) stmt += lambda s: s.union_all(_states_query_for_all(start_day, end_day))
stmt += lambda s: s.order_by(Events.time_fired) stmt += lambda s: s.order_by(Events.time_fired)
return stmt return stmt
@ -294,27 +298,67 @@ def _select_events_without_states(
) )
def _select_states(start_day: dt, end_day: dt) -> Select: def _states_query_for_context_id(start_day: dt, end_day: dt, context_id: str) -> Query:
return _apply_states_filters(_select_states(), start_day, end_day).where(
States.context_id == context_id
)
def _states_query_for_entitiy_id(start_day: dt, end_day: dt, entity_id: str) -> Query:
return _apply_states_filters(
_apply_entities_hints(_select_states()), start_day, end_day
).where(States.entity_id == entity_id)
def _states_query_for_entitiy_ids(
start_day: dt, end_day: dt, entity_ids: list[str]
) -> Query:
return _apply_states_filters(
_apply_entities_hints(_select_states()), start_day, end_day
).where(States.entity_id.in_(entity_ids))
def _states_query_for_all(start_day: dt, end_day: dt) -> Query:
return _apply_states_filters(_apply_all_hints(_select_states()), start_day, end_day)
def _select_states() -> Select:
"""Generate a states select that formats the states table as event rows.""" """Generate a states select that formats the states table as event rows."""
old_state = aliased(States, name="old_state") return select(
literal(value=None, type_=sqlalchemy.Text).label("event_id"),
literal(value=EVENT_STATE_CHANGED, type_=sqlalchemy.String).label("event_type"),
literal(value=None, type_=sqlalchemy.Text).label("event_data"),
States.last_updated.label("time_fired"),
States.context_id.label("context_id"),
States.context_user_id.label("context_user_id"),
States.context_parent_id.label("context_parent_id"),
literal(value=None, type_=sqlalchemy.Text).label("shared_data"),
*STATE_COLUMNS,
NOT_CONTEXT_ONLY,
)
def _apply_all_hints(query: Query) -> Query:
"""Force mysql to use the right index on large selects."""
return query.with_hint(
States, f"FORCE INDEX ({LAST_UPDATED_INDEX})", dialect_name="mysql"
)
def _apply_entities_hints(query: Query) -> Query:
"""Force mysql to use the right index on large selects."""
return query.with_hint(
States, f"FORCE INDEX ({ENTITY_ID_LAST_UPDATED_INDEX})", dialect_name="mysql"
)
def _apply_states_filters(query: Query, start_day: dt, end_day: dt) -> Query:
return ( return (
select( query.filter(
literal(value=None, type_=sqlalchemy.Text).label("event_id"), (States.last_updated > start_day) & (States.last_updated < end_day)
literal(value=EVENT_STATE_CHANGED, type_=sqlalchemy.String).label(
"event_type"
),
literal(value=None, type_=sqlalchemy.Text).label("event_data"),
States.last_updated.label("time_fired"),
States.context_id.label("context_id"),
States.context_user_id.label("context_user_id"),
States.context_parent_id.label("context_parent_id"),
literal(value=None, type_=sqlalchemy.Text).label("shared_data"),
*STATE_COLUMNS,
NOT_CONTEXT_ONLY,
) )
.filter((States.last_updated > start_day) & (States.last_updated < end_day)) .outerjoin(OLD_STATE, (States.old_state_id == OLD_STATE.state_id))
.outerjoin(old_state, (States.old_state_id == old_state.state_id)) .where(_missing_state_matcher())
.where(_missing_state_matcher(old_state))
.where(_not_continuous_entity_matcher()) .where(_not_continuous_entity_matcher())
.where( .where(
(States.last_updated == States.last_changed) | States.last_changed.is_(None) (States.last_updated == States.last_changed) | States.last_changed.is_(None)
@ -325,13 +369,13 @@ def _select_states(start_day: dt, end_day: dt) -> Select:
) )
def _missing_state_matcher(old_state: States) -> sqlalchemy.and_: def _missing_state_matcher() -> sqlalchemy.and_:
# The below removes state change events that do not have # The below removes state change events that do not have
# an old_state or the old_state is missing (newly added entities) # an old_state or the old_state is missing (newly added entities)
# or the new_state is missing (removed entities) # or the new_state is missing (removed entities)
return sqlalchemy.and_( return sqlalchemy.and_(
old_state.state_id.isnot(None), OLD_STATE.state_id.isnot(None),
(States.state != old_state.state), (States.state != OLD_STATE.state),
States.state.isnot(None), States.state.isnot(None),
) )

View file

@ -88,6 +88,8 @@ TABLES_TO_CHECK = [
TABLE_SCHEMA_CHANGES, TABLE_SCHEMA_CHANGES,
] ]
LAST_UPDATED_INDEX = "ix_states_last_updated"
ENTITY_ID_LAST_UPDATED_INDEX = "ix_states_entity_id_last_updated"
EMPTY_JSON_OBJECT = "{}" EMPTY_JSON_OBJECT = "{}"
@ -235,7 +237,7 @@ class States(Base): # type: ignore[misc,valid-type]
__table_args__ = ( __table_args__ = (
# Used for fetching the state of entities at a specific time # Used for fetching the state of entities at a specific time
# (get_states in history.py) # (get_states in history.py)
Index("ix_states_entity_id_last_updated", "entity_id", "last_updated"), Index(ENTITY_ID_LAST_UPDATED_INDEX, "entity_id", "last_updated"),
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"}, {"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
) )
__tablename__ = TABLE_STATES __tablename__ = TABLE_STATES