Fix missing historical context data in logbook for MySQL and PostgreSQL (#73011)

This commit is contained in:
J. Nick Koston 2022-06-04 09:54:10 -10:00 committed by GitHub
parent 0a2a166860
commit 7ac7af094f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -5,7 +5,7 @@ from collections.abc import Callable, Iterable
import json
from typing import Any
from sqlalchemy import JSON, Column, Text, cast, not_, or_
from sqlalchemy import Column, Text, cast, not_, or_
from sqlalchemy.sql.elements import ClauseList
from homeassistant.const import CONF_DOMAINS, CONF_ENTITIES, CONF_EXCLUDE, CONF_INCLUDE
@ -16,6 +16,7 @@ from .models import ENTITY_ID_IN_EVENT, OLD_ENTITY_ID_IN_EVENT, States
DOMAIN = "history"
HISTORY_FILTERS = "history_filters"
JSON_NULL = json.dumps(None)
GLOB_TO_SQL_CHARS = {
ord("*"): "%",
@ -196,7 +197,17 @@ class Filters:
"""Generate the entity filter query."""
_encoder = json.dumps
return or_(
(ENTITY_ID_IN_EVENT == JSON.NULL) & (OLD_ENTITY_ID_IN_EVENT == JSON.NULL),
# sqlalchemy's SQLite json implementation always
# wraps everything with JSON_QUOTE so it resolves to 'null'
# when its empty
#
# For MySQL and PostgreSQL it will resolve to a literal
# NULL when its empty
#
((ENTITY_ID_IN_EVENT == JSON_NULL) | ENTITY_ID_IN_EVENT.is_(None))
& (
(OLD_ENTITY_ID_IN_EVENT == JSON_NULL) | OLD_ENTITY_ID_IN_EVENT.is_(None)
),
self._generate_filter_for_columns(
(ENTITY_ID_IN_EVENT, OLD_ENTITY_ID_IN_EVENT), _encoder
).self_group(),
@ -208,8 +219,11 @@ def _globs_to_like(
) -> ClauseList:
"""Translate glob to sql."""
matchers = [
cast(column, Text()).like(
encoder(glob_str).translate(GLOB_TO_SQL_CHARS), escape="\\"
(
column.is_not(None)
& cast(column, Text()).like(
encoder(glob_str).translate(GLOB_TO_SQL_CHARS), escape="\\"
)
)
for glob_str in glob_strs
for column in columns
@ -221,7 +235,10 @@ def _entity_matcher(
entity_ids: Iterable[str], columns: Iterable[Column], encoder: Callable[[Any], Any]
) -> ClauseList:
matchers = [
cast(column, Text()).in_([encoder(entity_id) for entity_id in entity_ids])
(
column.is_not(None)
& cast(column, Text()).in_([encoder(entity_id) for entity_id in entity_ids])
)
for column in columns
]
return or_(*matchers) if matchers else or_(False)
@ -231,7 +248,7 @@ def _domain_matcher(
domains: Iterable[str], columns: Iterable[Column], encoder: Callable[[Any], Any]
) -> ClauseList:
matchers = [
cast(column, Text()).like(encoder(f"{domain}.%"))
(column.is_not(None) & cast(column, Text()).like(encoder(f"{domain}.%")))
for domain in domains
for column in columns
]