Improve ZHA startup performance (#70111)

* Remove semaphores and move initialization of mains powered devices to the background

* Add additional logging

* Correct cache usage and update tests
This commit is contained in:
David F. Mulcahey 2022-04-27 11:24:26 -04:00 committed by GitHub
parent 02ddfd513a
commit 361119d5c1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 76 additions and 45 deletions

View file

@ -2,7 +2,6 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
from collections.abc import Coroutine
from typing import TYPE_CHECKING, Any, TypeVar from typing import TYPE_CHECKING, Any, TypeVar
import zigpy.endpoint import zigpy.endpoint
@ -50,7 +49,6 @@ class Channels:
self._pools: list[ChannelPool] = [] self._pools: list[ChannelPool] = []
self._power_config: base.ZigbeeChannel | None = None self._power_config: base.ZigbeeChannel | None = None
self._identify: base.ZigbeeChannel | None = None self._identify: base.ZigbeeChannel | None = None
self._semaphore = asyncio.Semaphore(3)
self._unique_id = str(zha_device.ieee) self._unique_id = str(zha_device.ieee)
self._zdo_channel = base.ZDOChannel(zha_device.device.endpoints[0], zha_device) self._zdo_channel = base.ZDOChannel(zha_device.device.endpoints[0], zha_device)
self._zha_device = zha_device self._zha_device = zha_device
@ -82,11 +80,6 @@ class Channels:
if self._identify is None: if self._identify is None:
self._identify = channel self._identify = channel
@property
def semaphore(self) -> asyncio.Semaphore:
"""Return semaphore for concurrent tasks."""
return self._semaphore
@property @property
def zdo_channel(self) -> base.ZDOChannel: def zdo_channel(self) -> base.ZDOChannel:
"""Return ZDO channel.""" """Return ZDO channel."""
@ -336,13 +329,8 @@ class ChannelPool:
async def _execute_channel_tasks(self, func_name: str, *args: Any) -> None: async def _execute_channel_tasks(self, func_name: str, *args: Any) -> None:
"""Add a throttled channel task and swallow exceptions.""" """Add a throttled channel task and swallow exceptions."""
async def _throttle(coro: Coroutine[Any, Any, None]) -> None:
async with self._channels.semaphore:
return await coro
channels = [*self.claimed_channels.values(), *self.client_channels.values()] channels = [*self.claimed_channels.values(), *self.client_channels.values()]
tasks = [_throttle(getattr(ch, func_name)(*args)) for ch in channels] tasks = [getattr(ch, func_name)(*args) for ch in channels]
results = await asyncio.gather(*tasks, return_exceptions=True) results = await asyncio.gather(*tasks, return_exceptions=True)
for channel, outcome in zip(channels, results): for channel, outcome in zip(channels, results):
if isinstance(outcome, Exception): if isinstance(outcome, Exception):

View file

@ -310,11 +310,14 @@ class ZigbeeChannel(LogMixin):
"""Set cluster binding and attribute reporting.""" """Set cluster binding and attribute reporting."""
if not self._ch_pool.skip_configuration: if not self._ch_pool.skip_configuration:
if self.BIND: if self.BIND:
self.debug("Performing cluster binding")
await self.bind() await self.bind()
if self.cluster.is_server: if self.cluster.is_server:
self.debug("Configuring cluster attribute reporting")
await self.configure_reporting() await self.configure_reporting()
ch_specific_cfg = getattr(self, "async_configure_channel_specific", None) ch_specific_cfg = getattr(self, "async_configure_channel_specific", None)
if ch_specific_cfg: if ch_specific_cfg:
self.debug("Performing channel specific configuration")
await ch_specific_cfg() await ch_specific_cfg()
self.debug("finished channel configuration") self.debug("finished channel configuration")
else: else:
@ -325,6 +328,7 @@ class ZigbeeChannel(LogMixin):
async def async_initialize(self, from_cache: bool) -> None: async def async_initialize(self, from_cache: bool) -> None:
"""Initialize channel.""" """Initialize channel."""
if not from_cache and self._ch_pool.skip_configuration: if not from_cache and self._ch_pool.skip_configuration:
self.debug("Skipping channel initialization")
self._status = ChannelStatus.INITIALIZED self._status = ChannelStatus.INITIALIZED
return return
@ -334,12 +338,23 @@ class ZigbeeChannel(LogMixin):
uncached.extend([cfg["attr"] for cfg in self.REPORT_CONFIG]) uncached.extend([cfg["attr"] for cfg in self.REPORT_CONFIG])
if cached: if cached:
await self._get_attributes(True, cached, from_cache=True) self.debug("initializing cached channel attributes: %s", cached)
await self._get_attributes(
True, cached, from_cache=True, only_cache=from_cache
)
if uncached: if uncached:
await self._get_attributes(True, uncached, from_cache=from_cache) self.debug(
"initializing uncached channel attributes: %s - from cache[%s]",
uncached,
from_cache,
)
await self._get_attributes(
True, uncached, from_cache=from_cache, only_cache=from_cache
)
ch_specific_init = getattr(self, "async_initialize_channel_specific", None) ch_specific_init = getattr(self, "async_initialize_channel_specific", None)
if ch_specific_init: if ch_specific_init:
self.debug("Performing channel specific initialization: %s", uncached)
await ch_specific_init(from_cache=from_cache) await ch_specific_init(from_cache=from_cache)
self.debug("finished channel initialization") self.debug("finished channel initialization")
@ -407,7 +422,7 @@ class ZigbeeChannel(LogMixin):
self._cluster, self._cluster,
[attribute], [attribute],
allow_cache=from_cache, allow_cache=from_cache,
only_cache=from_cache and not self._ch_pool.is_mains_powered, only_cache=from_cache,
manufacturer=manufacturer, manufacturer=manufacturer,
) )
return result.get(attribute) return result.get(attribute)
@ -417,6 +432,7 @@ class ZigbeeChannel(LogMixin):
raise_exceptions: bool, raise_exceptions: bool,
attributes: list[int | str], attributes: list[int | str],
from_cache: bool = True, from_cache: bool = True,
only_cache: bool = True,
) -> dict[int | str, Any]: ) -> dict[int | str, Any]:
"""Get the values for a list of attributes.""" """Get the values for a list of attributes."""
manufacturer = None manufacturer = None
@ -428,17 +444,18 @@ class ZigbeeChannel(LogMixin):
result = {} result = {}
while chunk: while chunk:
try: try:
self.debug("Reading attributes in chunks: %s", chunk)
read, _ = await self.cluster.read_attributes( read, _ = await self.cluster.read_attributes(
attributes, attributes,
allow_cache=from_cache, allow_cache=from_cache,
only_cache=from_cache and not self._ch_pool.is_mains_powered, only_cache=only_cache,
manufacturer=manufacturer, manufacturer=manufacturer,
) )
result.update(read) result.update(read)
except (asyncio.TimeoutError, zigpy.exceptions.ZigbeeException) as ex: except (asyncio.TimeoutError, zigpy.exceptions.ZigbeeException) as ex:
self.debug( self.debug(
"failed to get attributes '%s' on '%s' cluster: %s", "failed to get attributes '%s' on '%s' cluster: %s",
attributes, chunk,
self.cluster.ep_attribute, self.cluster.ep_attribute,
str(ex), str(ex),
) )

View file

@ -463,7 +463,9 @@ class PowerConfigurationChannel(ZigbeeChannel):
"battery_size", "battery_size",
"battery_quantity", "battery_quantity",
] ]
return self.get_attributes(attributes, from_cache=from_cache) return self.get_attributes(
attributes, from_cache=from_cache, only_cache=from_cache
)
@registries.ZIGBEE_CHANNEL_REGISTRY.register(general.PowerProfile.cluster_id) @registries.ZIGBEE_CHANNEL_REGISTRY.register(general.PowerProfile.cluster_id)

View file

@ -97,7 +97,7 @@ class ElectricalMeasurementChannel(ZigbeeChannel):
for a in self.REPORT_CONFIG for a in self.REPORT_CONFIG
if a["attr"] not in self.cluster.unsupported_attributes if a["attr"] not in self.cluster.unsupported_attributes
] ]
result = await self.get_attributes(attrs, from_cache=False) result = await self.get_attributes(attrs, from_cache=False, only_cache=False)
if result: if result:
for attr, value in result.items(): for attr, value in result.items():
self.async_send_signal( self.async_send_signal(

View file

@ -351,11 +351,15 @@ class ZHADevice(LogMixin):
if self.is_coordinator: if self.is_coordinator:
return return
if self.last_seen is None: if self.last_seen is None:
self.debug("last_seen is None, marking the device unavailable")
self.update_available(False) self.update_available(False)
return return
difference = time.time() - self.last_seen difference = time.time() - self.last_seen
if difference < self.consider_unavailable_time: if difference < self.consider_unavailable_time:
self.debug(
"Device seen - marking the device available and resetting counter"
)
self.update_available(True) self.update_available(True)
self._checkins_missed_count = 0 self._checkins_missed_count = 0
return return
@ -365,6 +369,10 @@ class ZHADevice(LogMixin):
or self.manufacturer == "LUMI" or self.manufacturer == "LUMI"
or not self._channels.pools or not self._channels.pools
): ):
self.debug(
"last_seen is %s seconds ago and ping attempts have been exhausted, marking the device unavailable",
difference,
)
self.update_available(False) self.update_available(False)
return return
@ -386,13 +394,23 @@ class ZHADevice(LogMixin):
def update_available(self, available: bool) -> None: def update_available(self, available: bool) -> None:
"""Update device availability and signal entities.""" """Update device availability and signal entities."""
self.debug(
"Update device availability - device available: %s - new availability: %s - changed: %s",
self.available,
available,
self.available ^ available,
)
availability_changed = self.available ^ available availability_changed = self.available ^ available
self.available = available self.available = available
if availability_changed and available: if availability_changed and available:
# reinit channels then signal entities # reinit channels then signal entities
self.debug(
"Device availability changed and device became available, reinitializing channels"
)
self.hass.async_create_task(self._async_became_available()) self.hass.async_create_task(self._async_became_available())
return return
if availability_changed and not available: if availability_changed and not available:
self.debug("Device availability changed and device became unavailable")
self._channels.zha_send_event( self._channels.zha_send_event(
{ {
"device_event_type": "device_offline", "device_event_type": "device_offline",

View file

@ -239,29 +239,25 @@ class ZHAGateway:
async def async_initialize_devices_and_entities(self) -> None: async def async_initialize_devices_and_entities(self) -> None:
"""Initialize devices and load entities.""" """Initialize devices and load entities."""
semaphore = asyncio.Semaphore(2)
async def _throttle(zha_device: ZHADevice, cached: bool) -> None: _LOGGER.warning("Loading all devices")
async with semaphore:
await zha_device.async_initialize(from_cache=cached)
_LOGGER.debug("Loading battery powered devices")
await asyncio.gather( await asyncio.gather(
*( *(dev.async_initialize(from_cache=True) for dev in self.devices.values())
_throttle(dev, cached=True)
for dev in self.devices.values()
if not dev.is_mains_powered
)
) )
_LOGGER.debug("Loading mains powered devices") async def fetch_updated_state() -> None:
await asyncio.gather( """Fetch updated state for mains powered devices."""
*( _LOGGER.warning("Fetching current state for mains powered devices")
_throttle(dev, cached=False) await asyncio.gather(
for dev in self.devices.values() *(
if dev.is_mains_powered dev.async_initialize(from_cache=False)
for dev in self.devices.values()
if dev.is_mains_powered
)
) )
)
# background the fetching of state for mains powered devices
asyncio.create_task(fetch_updated_state())
def device_joined(self, device: zigpy.device.Device) -> None: def device_joined(self, device: zigpy.device.Device) -> None:
"""Handle device joined. """Handle device joined.

View file

@ -488,7 +488,7 @@ class Light(BaseLight, ZhaEntity):
] ]
results = await self._color_channel.get_attributes( results = await self._color_channel.get_attributes(
attributes, from_cache=False attributes, from_cache=False, only_cache=False
) )
if (color_mode := results.get("color_mode")) is not None: if (color_mode := results.get("color_mode")) is not None:

View file

@ -177,6 +177,7 @@ def zha_device_joined(hass, setup_zha):
"""Return a newly joined ZHA device.""" """Return a newly joined ZHA device."""
async def _zha_device(zigpy_dev): async def _zha_device(zigpy_dev):
zigpy_dev.last_seen = time.time()
await setup_zha() await setup_zha()
zha_gateway = common.get_zha_gateway(hass) zha_gateway = common.get_zha_gateway(hass)
await zha_gateway.async_device_initialized(zigpy_dev) await zha_gateway.async_device_initialized(zigpy_dev)

View file

@ -106,7 +106,7 @@ def _send_time_changed(hass, seconds):
@patch( @patch(
"homeassistant.components.zha.core.channels.general.BasicChannel.async_initialize", "homeassistant.components.zha.core.channels.general.BasicChannel.async_initialize",
new=mock.MagicMock(), new=mock.AsyncMock(),
) )
async def test_check_available_success( async def test_check_available_success(
hass, device_with_basic_channel, zha_device_restored hass, device_with_basic_channel, zha_device_restored
@ -160,7 +160,7 @@ async def test_check_available_success(
@patch( @patch(
"homeassistant.components.zha.core.channels.general.BasicChannel.async_initialize", "homeassistant.components.zha.core.channels.general.BasicChannel.async_initialize",
new=mock.MagicMock(), new=mock.AsyncMock(),
) )
async def test_check_available_unsuccessful( async def test_check_available_unsuccessful(
hass, device_with_basic_channel, zha_device_restored hass, device_with_basic_channel, zha_device_restored
@ -203,7 +203,7 @@ async def test_check_available_unsuccessful(
@patch( @patch(
"homeassistant.components.zha.core.channels.general.BasicChannel.async_initialize", "homeassistant.components.zha.core.channels.general.BasicChannel.async_initialize",
new=mock.MagicMock(), new=mock.AsyncMock(),
) )
async def test_check_available_no_basic_channel( async def test_check_available_no_basic_channel(
hass, device_without_basic_channel, zha_device_restored, caplog hass, device_without_basic_channel, zha_device_restored, caplog

View file

@ -471,7 +471,10 @@ async def test_fan_update_entity(
assert hass.states.get(entity_id).attributes[ATTR_PERCENTAGE] == 0 assert hass.states.get(entity_id).attributes[ATTR_PERCENTAGE] == 0
assert hass.states.get(entity_id).attributes[ATTR_PRESET_MODE] is None assert hass.states.get(entity_id).attributes[ATTR_PRESET_MODE] is None
assert hass.states.get(entity_id).attributes[ATTR_PERCENTAGE_STEP] == 100 / 3 assert hass.states.get(entity_id).attributes[ATTR_PERCENTAGE_STEP] == 100 / 3
assert cluster.read_attributes.await_count == 2 if zha_device_joined_restored.name == "zha_device_joined":
assert cluster.read_attributes.await_count == 2
else:
assert cluster.read_attributes.await_count == 4
await async_setup_component(hass, "homeassistant", {}) await async_setup_component(hass, "homeassistant", {})
await hass.async_block_till_done() await hass.async_block_till_done()
@ -480,7 +483,10 @@ async def test_fan_update_entity(
"homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True "homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
) )
assert hass.states.get(entity_id).state == STATE_OFF assert hass.states.get(entity_id).state == STATE_OFF
assert cluster.read_attributes.await_count == 3 if zha_device_joined_restored.name == "zha_device_joined":
assert cluster.read_attributes.await_count == 3
else:
assert cluster.read_attributes.await_count == 5
cluster.PLUGGED_ATTR_READS = {"fan_mode": 1} cluster.PLUGGED_ATTR_READS = {"fan_mode": 1}
await hass.services.async_call( await hass.services.async_call(
@ -490,4 +496,7 @@ async def test_fan_update_entity(
assert hass.states.get(entity_id).attributes[ATTR_PERCENTAGE] == 33 assert hass.states.get(entity_id).attributes[ATTR_PERCENTAGE] == 33
assert hass.states.get(entity_id).attributes[ATTR_PRESET_MODE] is None assert hass.states.get(entity_id).attributes[ATTR_PRESET_MODE] is None
assert hass.states.get(entity_id).attributes[ATTR_PERCENTAGE_STEP] == 100 / 3 assert hass.states.get(entity_id).attributes[ATTR_PERCENTAGE_STEP] == 100 / 3
assert cluster.read_attributes.await_count == 4 if zha_device_joined_restored.name == "zha_device_joined":
assert cluster.read_attributes.await_count == 4
else:
assert cluster.read_attributes.await_count == 6