diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 71795bfa664..385c12f37a4 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -142,12 +142,10 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: hass_config_path=hass.config.path(DEFAULT_DB_FILE) ) exclude = conf[CONF_EXCLUDE] - exclude_t = exclude.get(CONF_EVENT_TYPES, []) - if EVENT_STATE_CHANGED in exclude_t: - _LOGGER.warning( - "State change events are excluded, recorder will not record state changes." - "This will become an error in Home Assistant Core 2022.2" - ) + exclude_event_types: set[str] = set(exclude.get(CONF_EVENT_TYPES, [])) + if EVENT_STATE_CHANGED in exclude_event_types: + _LOGGER.error("State change events cannot be excluded, use a filter instead") + exclude_event_types.remove(EVENT_STATE_CHANGED) instance = hass.data[DATA_INSTANCE] = Recorder( hass=hass, auto_purge=auto_purge, @@ -158,7 +156,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: db_max_retries=db_max_retries, db_retry_wait=db_retry_wait, entity_filter=entity_filter, - exclude_t=exclude_t, + exclude_event_types=exclude_event_types, exclude_attributes_by_domain=exclude_attributes_by_domain, ) instance.async_initialize() diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py index 9eb1c6c166f..b57498e2987 100644 --- a/homeassistant/components/recorder/core.py +++ b/homeassistant/components/recorder/core.py @@ -181,7 +181,7 @@ class Recorder(threading.Thread): db_max_retries: int, db_retry_wait: int, entity_filter: Callable[[str], bool], - exclude_t: list[str], + exclude_event_types: set[str], exclude_attributes_by_domain: dict[str, set[str]], ) -> None: """Initialize the recorder.""" @@ -214,7 +214,7 @@ class Recorder(threading.Thread): # it can be used to see if an entity is being recorded and is called # by is_entity_recorder and the sensor recorder. self.entity_filter = entity_filter - self.exclude_t = set(exclude_t) + self.exclude_event_types = exclude_event_types self.schema_version = 0 self._commits_without_expire = 0 @@ -388,7 +388,7 @@ class Recorder(threading.Thread): @callback def _async_event_filter(self, event: Event) -> bool: """Filter events.""" - if event.event_type in self.exclude_t: + if event.event_type in self.exclude_event_types: return False if (entity_id := event.data.get(ATTR_ENTITY_ID)) is None: diff --git a/homeassistant/components/recorder/db_schema.py b/homeassistant/components/recorder/db_schema.py index a310161457b..2fa3746a2c8 100644 --- a/homeassistant/components/recorder/db_schema.py +++ b/homeassistant/components/recorder/db_schema.py @@ -435,7 +435,8 @@ class States(Base): def __repr__(self) -> str: """Return string representation of instance for debugging.""" return ( - f"" diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py index bb97448f149..c644a17be0b 100644 --- a/homeassistant/components/recorder/purge.py +++ b/homeassistant/components/recorder/purge.py @@ -1,21 +1,20 @@ """Purge old data helper.""" from __future__ import annotations -from collections.abc import Callable, Iterable +from collections.abc import Callable from datetime import datetime from itertools import zip_longest import logging +import time from typing import TYPE_CHECKING from sqlalchemy.engine.row import Row from sqlalchemy.orm.session import Session -from sqlalchemy.sql.expression import distinct -from homeassistant.const import EVENT_STATE_CHANGED import homeassistant.util.dt as dt_util from .const import SQLITE_MAX_BIND_VARS -from .db_schema import Events, StateAttributes, States, StatesMeta +from .db_schema import Events, States, StatesMeta from .models import DatabaseEngine from .queries import ( attributes_ids_exist_in_states, @@ -144,11 +143,9 @@ def _purge_legacy_format( ) = _select_legacy_event_state_and_attributes_and_data_ids_to_purge( session, purge_before ) - if state_ids: - _purge_state_ids(instance, session, state_ids) + _purge_state_ids(instance, session, state_ids) _purge_unused_attributes_ids(instance, session, attributes_ids) - if event_ids: - _purge_event_ids(session, event_ids) + _purge_event_ids(session, event_ids) _purge_unused_data_ids(instance, session, data_ids) return bool(event_ids or state_ids or attributes_ids or data_ids) @@ -448,6 +445,8 @@ def _select_legacy_event_state_and_attributes_and_data_ids_to_purge( def _purge_state_ids(instance: Recorder, session: Session, state_ids: set[int]) -> None: """Disconnect states and delete by state id.""" + if not state_ids: + return # Update old_state_id to NULL before deleting to ensure # the delete does not fail due to a foreign key constraint @@ -559,8 +558,10 @@ def _purge_short_term_statistics( _LOGGER.debug("Deleted %s short term statistics", deleted_rows) -def _purge_event_ids(session: Session, event_ids: Iterable[int]) -> None: +def _purge_event_ids(session: Session, event_ids: set[int]) -> None: """Delete by event id.""" + if not event_ids: + return deleted_rows = session.execute(delete_event_rows(event_ids)) _LOGGER.debug("Deleted %s events", deleted_rows) @@ -619,9 +620,11 @@ def _purge_filtered_data(instance: Recorder, session: Session) -> bool: _LOGGER.debug("Cleanup filtered data") database_engine = instance.database_engine assert database_engine is not None + now_timestamp = time.time() # Check if excluded entity_ids are in database entity_filter = instance.entity_filter + has_more_states_to_purge = False excluded_metadata_ids: list[str] = [ metadata_id for (metadata_id, entity_id) in session.query( @@ -629,92 +632,123 @@ def _purge_filtered_data(instance: Recorder, session: Session) -> bool: ).all() if not entity_filter(entity_id) ] - if len(excluded_metadata_ids) > 0: - _purge_filtered_states( - instance, session, excluded_metadata_ids, database_engine + if excluded_metadata_ids: + has_more_states_to_purge = _purge_filtered_states( + instance, session, excluded_metadata_ids, database_engine, now_timestamp ) - return False # Check if excluded event_types are in database - excluded_event_types: list[str] = [ - event_type - for (event_type,) in session.query(distinct(Events.event_type)).all() - if event_type in instance.exclude_t - ] - if len(excluded_event_types) > 0: - _purge_filtered_events(instance, session, excluded_event_types) - return False + has_more_events_to_purge = False + if ( + event_type_to_event_type_ids := instance.event_type_manager.get_many( + instance.exclude_event_types, session + ) + ) and ( + excluded_event_type_ids := [ + event_type_id + for event_type_id in event_type_to_event_type_ids.values() + if event_type_id is not None + ] + ): + has_more_events_to_purge = _purge_filtered_events( + instance, session, excluded_event_type_ids, now_timestamp + ) - return True + # Purge has completed if there are not more state or events to purge + return not (has_more_states_to_purge or has_more_events_to_purge) def _purge_filtered_states( instance: Recorder, session: Session, - excluded_metadata_ids: list[str], + metadata_ids_to_purge: list[str], database_engine: DatabaseEngine, -) -> None: - """Remove filtered states and linked events.""" + purge_before_timestamp: float, +) -> bool: + """Remove filtered states and linked events. + + Return true if all states are purged + """ state_ids: tuple[int, ...] attributes_ids: tuple[int, ...] event_ids: tuple[int, ...] - state_ids, attributes_ids, event_ids = zip( - *( - session.query(States.state_id, States.attributes_id, States.event_id) - .filter(States.metadata_id.in_(excluded_metadata_ids)) - .limit(SQLITE_MAX_BIND_VARS) - .all() - ) + to_purge = list( + session.query(States.state_id, States.attributes_id, States.event_id) + .filter(States.metadata_id.in_(metadata_ids_to_purge)) + .filter(States.last_updated_ts < purge_before_timestamp) + .limit(SQLITE_MAX_BIND_VARS) + .all() ) - filtered_event_ids = [id_ for id_ in event_ids if id_ is not None] + if not to_purge: + return True + state_ids, attributes_ids, event_ids = zip(*to_purge) + filtered_event_ids = {id_ for id_ in event_ids if id_ is not None} _LOGGER.debug( "Selected %s state_ids to remove that should be filtered", len(state_ids) ) _purge_state_ids(instance, session, set(state_ids)) + # These are legacy events that are linked to a state that are no longer + # created but since we did not remove them when we stopped adding new ones + # we will need to purge them here. _purge_event_ids(session, filtered_event_ids) unused_attribute_ids_set = _select_unused_attributes_ids( session, {id_ for id_ in attributes_ids if id_ is not None}, database_engine ) _purge_batch_attributes_ids(instance, session, unused_attribute_ids_set) + return False def _purge_filtered_events( - instance: Recorder, session: Session, excluded_event_types: list[str] -) -> None: - """Remove filtered events and linked states.""" + instance: Recorder, + session: Session, + excluded_event_type_ids: list[int], + purge_before_timestamp: float, +) -> bool: + """Remove filtered events and linked states. + + Return true if all events are purged. + """ database_engine = instance.database_engine assert database_engine is not None - event_ids, data_ids = zip( - *( - session.query(Events.event_id, Events.data_id) - .filter(Events.event_type.in_(excluded_event_types)) - .limit(SQLITE_MAX_BIND_VARS) - .all() - ) + to_purge = list( + session.query(Events.event_id, Events.data_id) + .filter(Events.event_type_id.in_(excluded_event_type_ids)) + .filter(Events.time_fired_ts < purge_before_timestamp) + .limit(SQLITE_MAX_BIND_VARS) + .all() ) + if not to_purge: + return True + event_ids, data_ids = zip(*to_purge) + event_ids_set = set(event_ids) _LOGGER.debug( - "Selected %s event_ids to remove that should be filtered", len(event_ids) + "Selected %s event_ids to remove that should be filtered", len(event_ids_set) ) states: list[Row[tuple[int]]] = ( - session.query(States.state_id).filter(States.event_id.in_(event_ids)).all() + session.query(States.state_id).filter(States.event_id.in_(event_ids_set)).all() ) - state_ids: set[int] = {state.state_id for state in states} - _purge_state_ids(instance, session, state_ids) - _purge_event_ids(session, event_ids) + if states: + # These are legacy states that are linked to an event that are no longer + # created but since we did not remove them when we stopped adding new ones + # we will need to purge them here. + state_ids: set[int] = {state.state_id for state in states} + _purge_state_ids(instance, session, state_ids) + _purge_event_ids(session, event_ids_set) if unused_data_ids_set := _select_unused_event_data_ids( session, set(data_ids), database_engine ): _purge_batch_data_ids(instance, session, unused_data_ids_set) - if EVENT_STATE_CHANGED in excluded_event_types: - session.query(StateAttributes).delete(synchronize_session=False) - instance._state_attributes_ids = {} # pylint: disable=protected-access + return False -@retryable_database_job("purge") -def purge_entity_data(instance: Recorder, entity_filter: Callable[[str], bool]) -> bool: +@retryable_database_job("purge_entity_data") +def purge_entity_data( + instance: Recorder, entity_filter: Callable[[str], bool], purge_before: datetime +) -> bool: """Purge states and events of specified entities.""" database_engine = instance.database_engine assert database_engine is not None + purge_before_timestamp = purge_before.timestamp() with session_scope(session=instance.get_session()) as session: selected_metadata_ids: list[str] = [ metadata_id @@ -724,12 +758,18 @@ def purge_entity_data(instance: Recorder, entity_filter: Callable[[str], bool]) if entity_filter(entity_id) ] _LOGGER.debug("Purging entity data for %s", selected_metadata_ids) - if len(selected_metadata_ids) > 0: - # Purge a max of SQLITE_MAX_BIND_VARS, based on the oldest states - # or events record. - _purge_filtered_states( - instance, session, selected_metadata_ids, database_engine - ) + if not selected_metadata_ids: + return True + + # Purge a max of SQLITE_MAX_BIND_VARS, based on the oldest states + # or events record. + if not _purge_filtered_states( + instance, + session, + selected_metadata_ids, + database_engine, + purge_before_timestamp, + ): _LOGGER.debug("Purging entity data hasn't fully completed yet") return False diff --git a/homeassistant/components/recorder/services.py b/homeassistant/components/recorder/services.py index 14337290c9b..e1b2e388d6c 100644 --- a/homeassistant/components/recorder/services.py +++ b/homeassistant/components/recorder/services.py @@ -71,7 +71,8 @@ def _async_register_purge_entities_service( domains = service.data.get(ATTR_DOMAINS, []) entity_globs = service.data.get(ATTR_ENTITY_GLOBS, []) entity_filter = generate_filter(domains, list(entity_ids), [], [], entity_globs) - instance.queue_task(PurgeEntitiesTask(entity_filter)) + purge_before = dt_util.utcnow() + instance.queue_task(PurgeEntitiesTask(entity_filter, purge_before)) hass.services.async_register( DOMAIN, diff --git a/homeassistant/components/recorder/tasks.py b/homeassistant/components/recorder/tasks.py index 17b63aad229..f2ba42bdea7 100644 --- a/homeassistant/components/recorder/tasks.py +++ b/homeassistant/components/recorder/tasks.py @@ -114,13 +114,14 @@ class PurgeEntitiesTask(RecorderTask): """Object to store entity information about purge task.""" entity_filter: Callable[[str], bool] + purge_before: datetime def run(self, instance: Recorder) -> None: """Purge entities from the database.""" - if purge.purge_entity_data(instance, self.entity_filter): + if purge.purge_entity_data(instance, self.entity_filter, self.purge_before): return # Schedule a new purge task if this one didn't finish - instance.queue_task(PurgeEntitiesTask(self.entity_filter)) + instance.queue_task(PurgeEntitiesTask(self.entity_filter, self.purge_before)) @dataclass diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index d6162dd20e2..5355931a76a 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -100,7 +100,7 @@ def _default_recorder(hass): db_max_retries=10, db_retry_wait=3, entity_filter=CONFIG_SCHEMA({DOMAIN: {}}), - exclude_t=[], + exclude_event_types=set(), exclude_attributes_by_domain={}, ) diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index b865af68dfd..6594f0352a5 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -26,6 +26,7 @@ from homeassistant.components.recorder.db_schema import ( StatisticsShortTerm, ) from homeassistant.components.recorder.purge import purge_old_data +from homeassistant.components.recorder.queries import select_event_type_ids from homeassistant.components.recorder.services import ( SERVICE_PURGE, SERVICE_PURGE_ENTITIES, @@ -676,8 +677,8 @@ def _convert_pending_states_to_meta(instance: Recorder, session: Session) -> Non """Convert pending states to use states_metadata.""" entity_ids: set[str] = set() states: set[States] = set() + states_meta_objects: dict[str, StatesMeta] = {} for object in session: - states_meta_objects: dict[str, StatesMeta] = {} if isinstance(object, States): entity_ids.add(object.entity_id) states.add(object) @@ -697,6 +698,33 @@ def _convert_pending_states_to_meta(instance: Recorder, session: Session) -> Non state.states_meta_rel = states_meta_objects[entity_id] +def _convert_pending_events_to_event_types( + instance: Recorder, session: Session +) -> None: + """Convert pending events to use event_type_ids.""" + event_types: set[str] = set() + events: set[Events] = set() + event_types_objects: dict[str, EventTypes] = {} + for object in session: + if isinstance(object, Events): + event_types.add(object.event_type) + events.add(object) + + event_type_to_event_type_ids = instance.event_type_manager.get_many( + event_types, session + ) + + for event in events: + event_type = event.event_type + event.event_type = None + if event_type_id := event_type_to_event_type_ids.get(event_type): + event.event_type_id = event_type_id + continue + if event_type not in event_types_objects: + event_types_objects[event_type] = EventTypes(event_type=event_type) + event.event_type_rel = event_types_objects[event_type] + + @pytest.mark.parametrize("use_sqlite", (True, False), indirect=True) async def test_purge_filtered_states( async_setup_recorder_instance: RecorderInstanceGenerator, @@ -850,12 +878,24 @@ async def test_purge_filtered_states( ) assert states_sensor_excluded.count() == 0 - assert session.query(States).get(72).old_state_id is None - assert session.query(States).get(72).attributes_id == 71 - assert session.query(States).get(73).old_state_id is None - assert session.query(States).get(73).attributes_id == 71 + assert ( + session.query(States).filter(States.state_id == 72).first().old_state_id + is None + ) + assert ( + session.query(States).filter(States.state_id == 72).first().attributes_id + == 71 + ) + assert ( + session.query(States).filter(States.state_id == 73).first().old_state_id + is None + ) + assert ( + session.query(States).filter(States.state_id == 73).first().attributes_id + == 71 + ) - final_keep_state = session.query(States).get(74) + final_keep_state = session.query(States).filter(States.state_id == 74).first() assert final_keep_state.old_state_id == 62 # should have been kept assert final_keep_state.attributes_id == 71 @@ -867,7 +907,7 @@ async def test_purge_filtered_states( await async_wait_purge_done(hass) with session_scope(hass=hass) as session: - final_keep_state = session.query(States).get(74) + final_keep_state = session.query(States).filter(States.state_id == 74).first() assert final_keep_state.old_state_id == 62 # should have been kept assert final_keep_state.attributes_id == 71 @@ -1022,7 +1062,7 @@ async def test_purge_filtered_events( ) -> None: """Test filtered events are purged.""" config: ConfigType = {"exclude": {"event_types": ["EVENT_PURGE"]}} - await async_setup_recorder_instance(hass, config) + instance = await async_setup_recorder_instance(hass, config) def _add_db_entries(hass: HomeAssistant) -> None: with session_scope(hass=hass) as session: @@ -1050,14 +1090,17 @@ async def test_purge_filtered_events( timestamp, event_id, ) + _convert_pending_events_to_event_types(instance, session) service_data = {"keep_days": 10} _add_db_entries(hass) with session_scope(hass=hass) as session: - events_purge = session.query(Events).filter(Events.event_type == "EVENT_PURGE") + events_purge = session.query(Events).filter( + Events.event_type_id.in_(select_event_type_ids(("EVENT_PURGE",))) + ) events_keep = session.query(Events).filter( - Events.event_type == EVENT_STATE_CHANGED + Events.event_type_id.in_(select_event_type_ids((EVENT_STATE_CHANGED,))) ) states = session.query(States) @@ -1073,9 +1116,11 @@ async def test_purge_filtered_events( await async_wait_purge_done(hass) with session_scope(hass=hass) as session: - events_purge = session.query(Events).filter(Events.event_type == "EVENT_PURGE") + events_purge = session.query(Events).filter( + Events.event_type_id.in_(select_event_type_ids(("EVENT_PURGE",))) + ) events_keep = session.query(Events).filter( - Events.event_type == EVENT_STATE_CHANGED + Events.event_type_id.in_(select_event_type_ids((EVENT_STATE_CHANGED,))) ) states = session.query(States) assert events_purge.count() == 60 @@ -1094,9 +1139,11 @@ async def test_purge_filtered_events( await async_wait_purge_done(hass) with session_scope(hass=hass) as session: - events_purge = session.query(Events).filter(Events.event_type == "EVENT_PURGE") + events_purge = session.query(Events).filter( + Events.event_type_id.in_(select_event_type_ids(("EVENT_PURGE",))) + ) events_keep = session.query(Events).filter( - Events.event_type == EVENT_STATE_CHANGED + Events.event_type_id.in_(select_event_type_ids((EVENT_STATE_CHANGED,))) ) states = session.query(States) assert events_purge.count() == 0 @@ -1109,10 +1156,18 @@ async def test_purge_filtered_events_state_changed( hass: HomeAssistant, ) -> None: """Test filtered state_changed events are purged. This should also remove all states.""" - config: ConfigType = {"exclude": {"event_types": [EVENT_STATE_CHANGED]}} + config: ConfigType = { + "exclude": { + "event_types": ["excluded_event"], + "entities": ["sensor.excluded", "sensor.old_format"], + } + } instance = await async_setup_recorder_instance(hass, config) # Assert entity_id is NOT excluded - assert instance.entity_filter("sensor.excluded") is True + assert instance.entity_filter("sensor.excluded") is False + assert instance.entity_filter("sensor.old_format") is False + assert instance.entity_filter("sensor.keep") is True + assert "excluded_event" in instance.exclude_event_types def _add_db_entries(hass: HomeAssistant) -> None: with session_scope(hass=hass) as session: @@ -1167,34 +1222,56 @@ async def test_purge_filtered_events_state_changed( old_state_id=62, # keep ) session.add_all((state_1, state_2, state_3)) + session.add( + Events( + event_id=231, + event_type="excluded_event", + event_data="{}", + origin="LOCAL", + time_fired_ts=dt_util.utc_to_timestamp(timestamp), + ) + ) + session.add( + States( + entity_id="sensor.old_format", + state="remove", + attributes="{}", + last_changed_ts=dt_util.utc_to_timestamp(timestamp), + last_updated_ts=dt_util.utc_to_timestamp(timestamp), + ) + ) + _convert_pending_events_to_event_types(instance, session) + _convert_pending_states_to_meta(instance, session) service_data = {"keep_days": 10, "apply_filter": True} _add_db_entries(hass) with session_scope(hass=hass) as session: - events_keep = session.query(Events).filter(Events.event_type == "EVENT_KEEP") + events_keep = session.query(Events).filter( + Events.event_type_id.in_(select_event_type_ids(("EVENT_KEEP",))) + ) events_purge = session.query(Events).filter( - Events.event_type == EVENT_STATE_CHANGED + Events.event_type_id.in_(select_event_type_ids(("excluded_event",))) ) states = session.query(States) assert events_keep.count() == 10 - assert events_purge.count() == 60 - assert states.count() == 63 + assert events_purge.count() == 1 + assert states.count() == 64 await hass.services.async_call(recorder.DOMAIN, SERVICE_PURGE, service_data) await hass.async_block_till_done() - await async_recorder_block_till_done(hass) - await async_wait_purge_done(hass) - - await async_recorder_block_till_done(hass) - await async_wait_purge_done(hass) + for _ in range(4): + await async_recorder_block_till_done(hass) + await async_wait_purge_done(hass) with session_scope(hass=hass) as session: - events_keep = session.query(Events).filter(Events.event_type == "EVENT_KEEP") + events_keep = session.query(Events).filter( + Events.event_type_id.in_(select_event_type_ids(("EVENT_KEEP",))) + ) events_purge = session.query(Events).filter( - Events.event_type == EVENT_STATE_CHANGED + Events.event_type_id.in_(select_event_type_ids(("excluded_event",))) ) states = session.query(States) @@ -1202,9 +1279,18 @@ async def test_purge_filtered_events_state_changed( assert events_purge.count() == 0 assert states.count() == 3 - assert session.query(States).get(61).old_state_id is None - assert session.query(States).get(62).old_state_id is None - assert session.query(States).get(63).old_state_id == 62 # should have been kept + assert ( + session.query(States).filter(States.state_id == 61).first().old_state_id + is None + ) + assert ( + session.query(States).filter(States.state_id == 62).first().old_state_id + is None + ) + assert ( + session.query(States).filter(States.state_id == 63).first().old_state_id + == 62 + ) # should have been kept async def test_purge_entities( @@ -1330,7 +1416,7 @@ async def test_purge_entities( _add_purge_records(hass) - # Confirm calling service without arguments matches all records (default filter behaviour) + # Confirm calling service without arguments matches all records (default filter behavior) with session_scope(hass=hass) as session: states = session.query(States) assert states.count() == 190