diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py index 67d3bff3b2a..5023393dc5e 100644 --- a/homeassistant/components/recorder/core.py +++ b/homeassistant/components/recorder/core.py @@ -299,9 +299,39 @@ class Recorder(threading.Thread): @callback def async_initialize(self) -> None: """Initialize the recorder.""" + entity_filter = self.entity_filter + exclude_event_types = self.exclude_event_types + queue_put = self._queue.put_nowait + event_task = EventTask + + @callback + def _event_listener(event: Event) -> None: + """Listen for new events and put them in the process queue.""" + if event.event_type in exclude_event_types: + return + + if (entity_id := event.data.get(ATTR_ENTITY_ID)) is None: + queue_put(event_task(event)) + return + + if isinstance(entity_id, str): + if entity_filter(entity_id): + queue_put(event_task(event)) + return + + if isinstance(entity_id, list): + for eid in entity_id: + if entity_filter(eid): + queue_put(event_task(event)) + return + return + + # Unknown what it is. + queue_put(event_task(event)) + self._event_listener = self.hass.bus.async_listen( MATCH_ALL, - self.event_listener, + _event_listener, run_immediately=True, ) self._queue_watcher = async_track_time_interval( @@ -412,27 +442,6 @@ class Recorder(threading.Thread): self._periodic_listener() self._periodic_listener = None - @callback - def _async_event_filter(self, event: Event) -> bool: - """Filter events.""" - if event.event_type in self.exclude_event_types: - return False - - if (entity_id := event.data.get(ATTR_ENTITY_ID)) is None: - return True - - if isinstance(entity_id, str): - return self.entity_filter(entity_id) - - if isinstance(entity_id, list): - for eid in entity_id: - if self.entity_filter(eid): - return True - return False - - # Unknown what it is. - return True - async def _async_close(self, event: Event) -> None: """Empty the queue if its still present at close.""" @@ -1257,12 +1266,6 @@ class Recorder(threading.Thread): _LOGGER.debug("Sending keepalive") self.event_session.connection().scalar(select(1)) - @callback - def event_listener(self, event: Event) -> None: - """Listen for new events and put them in the process queue.""" - if self._async_event_filter(event): - self.queue_task(EventTask(event)) - async def async_block_till_done(self) -> None: """Async version of block_till_done.""" if self._queue.empty() and not self._event_session_has_pending_writes: