Automatically retry lost/timed out LIFX requests (#91157)

This commit is contained in:
J. Nick Koston 2023-04-16 02:27:17 -10:00 committed by GitHub
parent 3ff03eef46
commit 9625444989
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 278 additions and 140 deletions

View file

@ -18,12 +18,19 @@ from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.typing import DiscoveryInfoType
from .const import _LOGGER, CONF_SERIAL, DOMAIN, TARGET_ANY
from .const import (
_LOGGER,
CONF_SERIAL,
DEFAULT_ATTEMPTS,
DOMAIN,
OVERALL_TIMEOUT,
TARGET_ANY,
)
from .discovery import async_discover_devices
from .util import (
async_entry_is_legacy,
async_execute_lifx,
async_get_legacy_entry,
async_multi_execute_lifx_with_retries,
formatted_serial,
lifx_features,
mac_matches_serial_number,
@ -225,13 +232,15 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
# get_version required for lifx_features()
# get_label required to log the name of the device
# get_group required to populate suggested areas
messages = await asyncio.gather(
*[
async_execute_lifx(device.get_hostfirmware),
async_execute_lifx(device.get_version),
async_execute_lifx(device.get_label),
async_execute_lifx(device.get_group),
]
messages = await async_multi_execute_lifx_with_retries(
[
device.get_hostfirmware,
device.get_version,
device.get_label,
device.get_group,
],
DEFAULT_ATTEMPTS,
OVERALL_TIMEOUT,
)
except asyncio.TimeoutError:
return None

View file

@ -7,11 +7,22 @@ DOMAIN = "lifx"
TARGET_ANY = "00:00:00:00:00:00"
DISCOVERY_INTERVAL = 10
MESSAGE_TIMEOUT = 1.65
MESSAGE_RETRIES = 5
OVERALL_TIMEOUT = 9
# The number of seconds before we will no longer accept a response
# to a message and consider it invalid
MESSAGE_TIMEOUT = 18
# Disable the retries in the library since they are not spaced out
# enough to account for WiFi and UDP dropouts
MESSAGE_RETRIES = 1
OVERALL_TIMEOUT = 15
UNAVAILABLE_GRACE = 90
# The number of times to retry a request message
DEFAULT_ATTEMPTS = 5
# The maximum time to wait for a bulb to respond to an update
MAX_UPDATE_TIME = 90
# The number of tries to send each request message to a bulb during an update
MAX_ATTEMPTS_PER_UPDATE_REQUEST_MESSAGE = 5
CONF_LABEL = "label"
CONF_SERIAL = "serial"
@ -50,4 +61,5 @@ INFRARED_BRIGHTNESS_VALUES_MAP = {
}
DATA_LIFX_MANAGER = "lifx_manager"
_LOGGER = logging.getLogger(__package__)

View file

@ -28,20 +28,25 @@ from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import (
_LOGGER,
ATTR_REMAINING,
DEFAULT_ATTEMPTS,
DOMAIN,
IDENTIFY_WAVEFORM,
MAX_ATTEMPTS_PER_UPDATE_REQUEST_MESSAGE,
MAX_UPDATE_TIME,
MESSAGE_RETRIES,
MESSAGE_TIMEOUT,
OVERALL_TIMEOUT,
TARGET_ANY,
UNAVAILABLE_GRACE,
)
from .util import (
async_execute_lifx,
async_multi_execute_lifx_with_retries,
get_real_mac_addr,
infrared_brightness_option_to_value,
infrared_brightness_value_to_option,
@ -49,7 +54,6 @@ from .util import (
)
LIGHT_UPDATE_INTERVAL = 10
SENSOR_UPDATE_INTERVAL = 30
REQUEST_REFRESH_DELAY = 0.35
LIFX_IDENTIFY_DELAY = 3.0
RSSI_DBM_FW = AwesomeVersion("2.77")
@ -186,60 +190,84 @@ class LIFXUpdateCoordinator(DataUpdateCoordinator[None]):
platform, DOMAIN, f"{self.serial_number}_{key}"
)
async def _async_populate_device_info(self) -> None:
"""Populate device info."""
methods: list[Callable] = []
device = self.device
if self.device.host_firmware_version is None:
methods.append(device.get_hostfirmware)
if self.device.product is None:
methods.append(device.get_version)
if self.device.group is None:
methods.append(device.get_group)
assert methods, "Device info already populated"
await async_multi_execute_lifx_with_retries(
methods, DEFAULT_ATTEMPTS, OVERALL_TIMEOUT
)
@callback
def _async_build_color_zones_update_requests(self) -> list[Callable]:
"""Build a color zones update request."""
device = self.device
return [
partial(device.get_color_zones, start_index=zone)
for zone in range(0, len(device.color_zones), 8)
]
async def _async_update_data(self) -> None:
"""Fetch all device data from the api."""
async with self.lock:
if self.device.host_firmware_version is None:
self.device.get_hostfirmware()
if self.device.product is None:
self.device.get_version()
if self.device.group is None:
self.device.get_group()
device = self.device
if (
device.host_firmware_version is None
or device.product is None
or device.group is None
):
await self._async_populate_device_info()
response = await async_execute_lifx(self.device.get_color)
num_zones = len(device.color_zones) if device.color_zones is not None else 0
features = lifx_features(self.device)
is_extended_multizone = features["extended_multizone"]
is_legacy_multizone = not is_extended_multizone and features["multizone"]
update_rssi = self._update_rssi
methods: list[Callable] = [self.device.get_color]
if update_rssi:
methods.append(self.device.get_wifiinfo)
if is_extended_multizone:
methods.append(self.device.get_extended_color_zones)
elif is_legacy_multizone:
methods.extend(self._async_build_color_zones_update_requests())
if is_extended_multizone or is_legacy_multizone:
methods.append(self.device.get_multizone_effect)
if features["hev"]:
methods.append(self.device.get_hev_cycle)
if features["infrared"]:
methods.append(self.device.get_infrared)
if self.device.product is None:
raise UpdateFailed(
f"Failed to fetch get version from device: {self.device.ip_addr}"
)
responses = await async_multi_execute_lifx_with_retries(
methods, MAX_ATTEMPTS_PER_UPDATE_REQUEST_MESSAGE, MAX_UPDATE_TIME
)
# device.mac_addr is not the mac_address, its the serial number
if device.mac_addr == TARGET_ANY:
device.mac_addr = responses[0].target_addr
# device.mac_addr is not the mac_address, its the serial number
if self.device.mac_addr == TARGET_ANY:
self.device.mac_addr = response.target_addr
if update_rssi:
# We always send the rssi request second
self._rssi = int(floor(10 * log10(responses[1].signal) + 0.5))
if self._update_rssi is True:
await self.async_update_rssi()
# Update extended multizone devices
if lifx_features(self.device)["extended_multizone"]:
await self.async_get_extended_color_zones()
await self.async_get_multizone_effect()
# use legacy methods for older devices
elif lifx_features(self.device)["multizone"]:
await self.async_get_color_zones()
await self.async_get_multizone_effect()
if lifx_features(self.device)["hev"]:
await self.async_get_hev_cycle()
if lifx_features(self.device)["infrared"]:
await async_execute_lifx(self.device.get_infrared)
if is_extended_multizone or is_legacy_multizone:
self.active_effect = FirmwareEffect[self.device.effect.get("effect", "OFF")]
if is_legacy_multizone and num_zones != len(device.color_zones):
# The number of zones has changed so we need
# to update the zones again. This happens rarely.
await self.async_get_color_zones()
async def async_get_color_zones(self) -> None:
"""Get updated color information for each zone."""
zone = 0
top = 1
while zone < top:
# Each get_color_zones can update 8 zones at once
resp = await async_execute_lifx(
partial(self.device.get_color_zones, start_index=zone)
)
zone += 8
top = resp.count
# We only await multizone responses so don't ask for just one
if zone == top - 1:
zone -= 1
await async_multi_execute_lifx_with_retries(
self._async_build_color_zones_update_requests(),
DEFAULT_ATTEMPTS,
OVERALL_TIMEOUT,
)
async def async_get_extended_color_zones(self) -> None:
"""Get updated color information for all zones."""
@ -323,11 +351,6 @@ class LIFXUpdateCoordinator(DataUpdateCoordinator[None]):
)
)
async def async_get_multizone_effect(self) -> None:
"""Update the device firmware effect running state."""
await async_execute_lifx(self.device.get_multizone_effect)
self.active_effect = FirmwareEffect[self.device.effect.get("effect", "OFF")]
async def async_set_multizone_effect(
self,
effect: str,
@ -415,22 +438,12 @@ class LIFXUpdateCoordinator(DataUpdateCoordinator[None]):
self._update_rssi = True
return _async_disable_rssi_updates
async def async_update_rssi(self) -> None:
"""Update RSSI value."""
resp = await async_execute_lifx(self.device.get_wifiinfo)
self._rssi = int(floor(10 * log10(resp.signal) + 0.5))
def async_get_hev_cycle_state(self) -> bool | None:
"""Return the current HEV cycle state."""
if self.device.hev_cycle is None:
return None
return bool(self.device.hev_cycle.get(ATTR_REMAINING, 0) > 0)
async def async_get_hev_cycle(self) -> None:
"""Update the HEV cycle status from a LIFX Clean bulb."""
if lifx_features(self.device)["hev"]:
await async_execute_lifx(self.device.get_hev_cycle)
async def async_set_hev_cycle_state(self, enable: bool, duration: int = 0) -> None:
"""Start or stop an HEV cycle on a LIFX Clean bulb."""
if lifx_features(self.device)["hev"]:

View file

@ -206,61 +206,60 @@ class LIFXLight(LIFXEntity, LightEntity):
async def set_state(self, **kwargs: Any) -> None:
"""Set a color on the light and turn it on/off."""
self.coordinator.async_set_updated_data(None)
async with self.coordinator.lock:
# Cancel any pending refreshes
bulb = self.bulb
# Cancel any pending refreshes
bulb = self.bulb
await self.effects_conductor.stop([bulb])
await self.effects_conductor.stop([bulb])
if ATTR_EFFECT in kwargs:
await self.default_effect(**kwargs)
return
if ATTR_EFFECT in kwargs:
await self.default_effect(**kwargs)
return
if ATTR_INFRARED in kwargs:
infrared_entity_id = self.coordinator.async_get_entity_id(
Platform.SELECT, INFRARED_BRIGHTNESS
)
_LOGGER.warning(
(
"The 'infrared' attribute of 'lifx.set_state' is deprecated:"
" call 'select.select_option' targeting '%s' instead"
),
infrared_entity_id,
)
bulb.set_infrared(convert_8_to_16(kwargs[ATTR_INFRARED]))
if ATTR_INFRARED in kwargs:
infrared_entity_id = self.coordinator.async_get_entity_id(
Platform.SELECT, INFRARED_BRIGHTNESS
)
_LOGGER.warning(
(
"The 'infrared' attribute of 'lifx.set_state' is deprecated:"
" call 'select.select_option' targeting '%s' instead"
),
infrared_entity_id,
)
bulb.set_infrared(convert_8_to_16(kwargs[ATTR_INFRARED]))
if ATTR_TRANSITION in kwargs:
fade = int(kwargs[ATTR_TRANSITION] * 1000)
else:
fade = 0
if ATTR_TRANSITION in kwargs:
fade = int(kwargs[ATTR_TRANSITION] * 1000)
else:
fade = 0
# These are both False if ATTR_POWER is not set
power_on = kwargs.get(ATTR_POWER, False)
power_off = not kwargs.get(ATTR_POWER, True)
# These are both False if ATTR_POWER is not set
power_on = kwargs.get(ATTR_POWER, False)
power_off = not kwargs.get(ATTR_POWER, True)
hsbk = find_hsbk(self.hass, **kwargs)
hsbk = find_hsbk(self.hass, **kwargs)
if not self.is_on:
if power_off:
await self.set_power(False)
# If fading on with color, set color immediately
if hsbk and power_on:
await self.set_color(hsbk, kwargs)
await self.set_power(True, duration=fade)
elif hsbk:
await self.set_color(hsbk, kwargs, duration=fade)
elif power_on:
await self.set_power(True, duration=fade)
else:
if power_on:
await self.set_power(True)
if hsbk:
await self.set_color(hsbk, kwargs, duration=fade)
if power_off:
await self.set_power(False, duration=fade)
if not self.is_on:
if power_off:
await self.set_power(False)
# If fading on with color, set color immediately
if hsbk and power_on:
await self.set_color(hsbk, kwargs)
await self.set_power(True, duration=fade)
elif hsbk:
await self.set_color(hsbk, kwargs, duration=fade)
elif power_on:
await self.set_power(True, duration=fade)
else:
if power_on:
await self.set_power(True)
if hsbk:
await self.set_color(hsbk, kwargs, duration=fade)
if power_off:
await self.set_power(False, duration=fade)
# Avoid state ping-pong by holding off updates as the state settles
await asyncio.sleep(LIFX_STATE_SETTLE_DELAY)
# Avoid state ping-pong by holding off updates as the state settles
await asyncio.sleep(LIFX_STATE_SETTLE_DELAY)
# Update when the transition starts and ends
await self.update_during_transition(fade)

View file

@ -4,12 +4,12 @@ from __future__ import annotations
import asyncio
from collections.abc import Callable
from functools import partial
from typing import Any
from aiolifx import products
from aiolifx.aiolifx import Light
from aiolifx.message import Message
import async_timeout
from awesomeversion import AwesomeVersion
from homeassistant.components.light import (
@ -28,7 +28,13 @@ from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import device_registry as dr
import homeassistant.util.color as color_util
from .const import _LOGGER, DOMAIN, INFRARED_BRIGHTNESS_VALUES_MAP, OVERALL_TIMEOUT
from .const import (
_LOGGER,
DEFAULT_ATTEMPTS,
DOMAIN,
INFRARED_BRIGHTNESS_VALUES_MAP,
OVERALL_TIMEOUT,
)
FIX_MAC_FW = AwesomeVersion("3.70")
@ -177,21 +183,61 @@ def mac_matches_serial_number(mac_addr: str, serial_number: str) -> bool:
async def async_execute_lifx(method: Callable) -> Message:
"""Execute a lifx coroutine and wait for a response."""
future: asyncio.Future[Message] = asyncio.Future()
"""Execute a lifx callback method and wait for a response."""
return (
await async_multi_execute_lifx_with_retries(
[method], DEFAULT_ATTEMPTS, OVERALL_TIMEOUT
)
)[0]
def _callback(bulb: Light, message: Message) -> None:
if not future.done():
# The future will get canceled out from under
# us by async_timeout when we hit the OVERALL_TIMEOUT
async def async_multi_execute_lifx_with_retries(
methods: list[Callable], attempts: int, overall_timeout: int
) -> list[Message]:
"""Execute multiple lifx callback methods with retries and wait for a response.
This functional will the overall timeout by the number of attempts and
wait for each method to return a result. If we don't get a result
within the split timeout, we will send all methods that did not generate
a response again.
If we don't get a result after all attempts, we will raise an
asyncio.TimeoutError exception.
"""
loop = asyncio.get_running_loop()
futures: list[asyncio.Future] = [loop.create_future() for _ in methods]
def _callback(
bulb: Light, message: Message | None, future: asyncio.Future[Message]
) -> None:
if message and not future.done():
future.set_result(message)
method(callb=_callback)
result = None
timeout_per_attempt = overall_timeout / attempts
async with async_timeout.timeout(OVERALL_TIMEOUT):
result = await future
for _ in range(attempts):
for idx, method in enumerate(methods):
future = futures[idx]
if not future.done():
method(callb=partial(_callback, future=future))
if result is None:
raise asyncio.TimeoutError("No response from LIFX bulb")
return result
_, pending = await asyncio.wait(futures, timeout=timeout_per_attempt)
if not pending:
break
results: list[Message] = []
failed: list[str] = []
for idx, future in enumerate(futures):
if not future.done() or not (result := future.result()):
method = methods[idx]
failed.append(str(getattr(method, "__name__", method)))
else:
results.append(result)
if failed:
failed_methods = ", ".join(failed)
raise asyncio.TimeoutError(
f"{failed_methods} timed out after {attempts} attempts"
)
return results

View file

@ -3,6 +3,8 @@ from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from homeassistant.components.lifx import config_flow, coordinator, util
from tests.common import mock_device_registry, mock_registry
@ -34,6 +36,17 @@ def lifx_mock_get_source_ip(mock_get_source_ip):
"""Mock network util's async_get_source_ip."""
@pytest.fixture(autouse=True)
def lifx_no_wait_for_timeouts():
"""Avoid waiting for timeouts in tests."""
with patch.object(util, "OVERALL_TIMEOUT", 0), patch.object(
config_flow, "OVERALL_TIMEOUT", 0
), patch.object(coordinator, "OVERALL_TIMEOUT", 0), patch.object(
coordinator, "MAX_UPDATE_TIME", 0
):
yield
@pytest.fixture(autouse=True)
def lifx_mock_async_get_ipv4_broadcast_addresses():
"""Mock network util's async_get_ipv4_broadcast_addresses."""

View file

@ -546,9 +546,11 @@ async def test_suggested_area(hass: HomeAssistant) -> None:
self.bulb = bulb
self.lifx_group = kwargs.get("lifx_group")
def __call__(self, *args, **kwargs):
def __call__(self, callb=None, *args, **kwargs):
"""Call command."""
self.bulb.group = self.lifx_group
if callb:
callb(self.bulb, self.lifx_group)
config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: "1.2.3.4"}, unique_id=SERIAL

View file

@ -363,7 +363,7 @@ async def test_light_strip(hass: HomeAssistant) -> None:
)
# set a one zone
assert len(bulb.set_power.calls) == 2
assert len(bulb.get_color_zones.calls) == 2
assert len(bulb.get_color_zones.calls) == 1
assert len(bulb.set_color.calls) == 0
call_dict = bulb.set_color_zones.calls[0][1]
call_dict.pop("callb")
@ -1124,7 +1124,7 @@ async def test_config_zoned_light_strip_fails(hass: HomeAssistant) -> None:
entity_id = "light.my_bulb"
class MockFailingLifxCommand:
"""Mock a lifx command that fails on the 3rd try."""
"""Mock a lifx command that fails on the 2nd try."""
def __init__(self, bulb, **kwargs):
"""Init command."""
@ -1134,7 +1134,7 @@ async def test_config_zoned_light_strip_fails(hass: HomeAssistant) -> None:
def __call__(self, callb=None, *args, **kwargs):
"""Call command."""
self.call_count += 1
response = None if self.call_count >= 3 else MockMessage()
response = None if self.call_count >= 2 else MockMessage()
if callb:
callb(self.bulb, response)
@ -1152,6 +1152,50 @@ async def test_config_zoned_light_strip_fails(hass: HomeAssistant) -> None:
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
async def test_legacy_zoned_light_strip(hass: HomeAssistant) -> None:
"""Test we handle failure to update zones."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: IP_ADDRESS}, unique_id=SERIAL
)
already_migrated_config_entry.add_to_hass(hass)
light_strip = _mocked_light_strip()
entity_id = "light.my_bulb"
class MockPopulateLifxZonesCommand:
"""Mock populating the number of zones."""
def __init__(self, bulb, **kwargs):
"""Init command."""
self.bulb = bulb
self.call_count = 0
def __call__(self, callb=None, *args, **kwargs):
"""Call command."""
self.call_count += 1
self.bulb.color_zones = [None] * 12
if callb:
callb(self.bulb, MockMessage())
get_color_zones_mock = MockPopulateLifxZonesCommand(light_strip)
light_strip.get_color_zones = get_color_zones_mock
with _patch_discovery(device=light_strip), _patch_device(device=light_strip):
await async_setup_component(hass, lifx.DOMAIN, {lifx.DOMAIN: {}})
await hass.async_block_till_done()
entity_registry = er.async_get(hass)
assert entity_registry.async_get(entity_id).unique_id == SERIAL
assert hass.states.get(entity_id).state == STATE_OFF
# 1 to get the number of zones
# 2 get populate the zones
assert get_color_zones_mock.call_count == 3
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=30))
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_OFF
# 2 get populate the zones
assert get_color_zones_mock.call_count == 5
async def test_white_light_fails(hass: HomeAssistant) -> None:
"""Test we handle failure to power on off."""
already_migrated_config_entry = MockConfigEntry(