From 28f6e79385092d263c1b99a8c28c1190923f7de8 Mon Sep 17 00:00:00 2001 From: Paulus Schoutsen Date: Tue, 28 Apr 2020 14:31:16 -0700 Subject: [PATCH] Parallelize collections helper (#34783) --- homeassistant/helpers/collection.py | 40 +++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/homeassistant/helpers/collection.py b/homeassistant/helpers/collection.py index e720887eb70..06c86d3aa1c 100644 --- a/homeassistant/helpers/collection.py +++ b/homeassistant/helpers/collection.py @@ -1,5 +1,6 @@ """Helper to deal with YAML + storage.""" from abc import ABC, abstractmethod +import asyncio import logging from typing import Any, Awaitable, Callable, Dict, List, Optional, cast @@ -107,8 +108,9 @@ class ObservableCollection(ABC): async def notify_change(self, change_type: str, item_id: str, item: dict) -> None: """Notify listeners of a change.""" self.logger.debug("%s %s: %s", change_type, item_id, item) - for listener in self.listeners: - await listener(change_type, item_id, item) + await asyncio.gather( + *[listener(change_type, item_id, item) for listener in self.listeners] + ) class YamlCollection(ObservableCollection): @@ -118,6 +120,8 @@ class YamlCollection(ObservableCollection): """Load the YAML collection. Overrides existing data.""" old_ids = set(self.data) + tasks = [] + for item in data: item_id = item[CONF_ID] @@ -131,11 +135,15 @@ class YamlCollection(ObservableCollection): event = CHANGE_ADDED self.data[item_id] = item - await self.notify_change(event, item_id, item) + tasks.append(self.notify_change(event, item_id, item)) for item_id in old_ids: + tasks.append( + self.notify_change(CHANGE_REMOVED, item_id, self.data.pop(item_id)) + ) - await self.notify_change(CHANGE_REMOVED, item_id, self.data.pop(item_id)) + if tasks: + await asyncio.gather(*tasks) class StorageCollection(ObservableCollection): @@ -169,7 +177,13 @@ class StorageCollection(ObservableCollection): for item in raw_storage["items"]: self.data[item[CONF_ID]] = item - await self.notify_change(CHANGE_ADDED, item[CONF_ID], item) + + await asyncio.gather( + *[ + self.notify_change(CHANGE_ADDED, item[CONF_ID], item) + for item in raw_storage["items"] + ] + ) @abstractmethod async def _process_create_data(self, data: dict) -> dict: @@ -240,8 +254,12 @@ class IDLessCollection(ObservableCollection): async def async_load(self, data: List[dict]) -> None: """Load the collection. Overrides existing data.""" - for item_id, item in list(self.data.items()): - await self.notify_change(CHANGE_REMOVED, item_id, item) + await asyncio.gather( + *[ + self.notify_change(CHANGE_REMOVED, item_id, item) + for item_id, item in list(self.data.items()) + ] + ) self.data.clear() @@ -250,7 +268,13 @@ class IDLessCollection(ObservableCollection): item_id = f"fakeid-{self.counter}" self.data[item_id] = item - await self.notify_change(CHANGE_ADDED, item_id, item) + + await asyncio.gather( + *[ + self.notify_change(CHANGE_ADDED, item_id, item) + for item_id, item in self.data.items() + ] + ) @callback