Significantly speed up recorder event listener (#93532)

* Significantly speed up recorder event listener

This code is called every time an event happens since it
subscribes to all events. Its our most frequently called
listener out of the box.

It used to have a seperate filter function but it was
later combined after core had some previous refactoring.

It was never optimized after that happened.

This change reduces the run time by ~70%

* decruft
This commit is contained in:
J. Nick Koston 2023-05-25 20:44:12 -05:00 committed by GitHub
parent 10aa49be2b
commit d9b43fc43f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -299,9 +299,39 @@ class Recorder(threading.Thread):
@callback
def async_initialize(self) -> None:
"""Initialize the recorder."""
entity_filter = self.entity_filter
exclude_event_types = self.exclude_event_types
queue_put = self._queue.put_nowait
event_task = EventTask
@callback
def _event_listener(event: Event) -> None:
"""Listen for new events and put them in the process queue."""
if event.event_type in exclude_event_types:
return
if (entity_id := event.data.get(ATTR_ENTITY_ID)) is None:
queue_put(event_task(event))
return
if isinstance(entity_id, str):
if entity_filter(entity_id):
queue_put(event_task(event))
return
if isinstance(entity_id, list):
for eid in entity_id:
if entity_filter(eid):
queue_put(event_task(event))
return
return
# Unknown what it is.
queue_put(event_task(event))
self._event_listener = self.hass.bus.async_listen(
MATCH_ALL,
self.event_listener,
_event_listener,
run_immediately=True,
)
self._queue_watcher = async_track_time_interval(
@ -412,27 +442,6 @@ class Recorder(threading.Thread):
self._periodic_listener()
self._periodic_listener = None
@callback
def _async_event_filter(self, event: Event) -> bool:
"""Filter events."""
if event.event_type in self.exclude_event_types:
return False
if (entity_id := event.data.get(ATTR_ENTITY_ID)) is None:
return True
if isinstance(entity_id, str):
return self.entity_filter(entity_id)
if isinstance(entity_id, list):
for eid in entity_id:
if self.entity_filter(eid):
return True
return False
# Unknown what it is.
return True
async def _async_close(self, event: Event) -> None:
"""Empty the queue if its still present at close."""
@ -1257,12 +1266,6 @@ class Recorder(threading.Thread):
_LOGGER.debug("Sending keepalive")
self.event_session.connection().scalar(select(1))
@callback
def event_listener(self, event: Event) -> None:
"""Listen for new events and put them in the process queue."""
if self._async_event_filter(event):
self.queue_task(EventTask(event))
async def async_block_till_done(self) -> None:
"""Async version of block_till_done."""
if self._queue.empty() and not self._event_session_has_pending_writes: