From 0639681991a0d8cd0235ddc023ea5cb5c883c57a Mon Sep 17 00:00:00 2001 From: Jc2k Date: Wed, 10 Aug 2022 18:56:34 +0100 Subject: [PATCH] Add new Bluetooth coordinator helper for polling mostly passive devices (#76549) --- .../bluetooth/active_update_coordinator.py | 141 +++++++ .../test_active_update_coordinator.py | 348 ++++++++++++++++++ 2 files changed, 489 insertions(+) create mode 100644 homeassistant/components/bluetooth/active_update_coordinator.py create mode 100644 tests/components/bluetooth/test_active_update_coordinator.py diff --git a/homeassistant/components/bluetooth/active_update_coordinator.py b/homeassistant/components/bluetooth/active_update_coordinator.py new file mode 100644 index 00000000000..1ebd26f8203 --- /dev/null +++ b/homeassistant/components/bluetooth/active_update_coordinator.py @@ -0,0 +1,141 @@ +"""A Bluetooth passive coordinator that collects data from advertisements but can also poll.""" +from __future__ import annotations + +from collections.abc import Callable, Coroutine +import logging +import time +from typing import Any, Generic, TypeVar + +from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers.debounce import Debouncer + +from . import BluetoothChange, BluetoothScanningMode, BluetoothServiceInfoBleak +from .passive_update_processor import PassiveBluetoothProcessorCoordinator + +POLL_DEFAULT_COOLDOWN = 10 +POLL_DEFAULT_IMMEDIATE = True + +_T = TypeVar("_T") + + +class ActiveBluetoothProcessorCoordinator( + Generic[_T], PassiveBluetoothProcessorCoordinator[_T] +): + """ + A coordinator that parses passive data from advertisements but can also poll. + + Every time an advertisement is received, needs_poll_method is called to work + out if a poll is needed. This should return True if it is and False if it is + not needed. + + def needs_poll_method(svc_info: BluetoothServiceInfoBleak, last_poll: float | None) -> bool: + return True + + If there has been no poll since HA started, `last_poll` will be None. Otherwise it is + the number of seconds since one was last attempted. + + If a poll is needed, the coordinator will call poll_method. This is a coroutine. + It should return the same type of data as your update_method. The expectation is that + data from advertisements and from polling are being parsed and fed into a shared + object that represents the current state of the device. + + async def poll_method(svc_info: BluetoothServiceInfoBleak) -> YourDataType: + return YourDataType(....) + + BluetoothServiceInfoBleak.device contains a BLEDevice. You should use this in + your poll function, as it is the most efficient way to get a BleakClient. + """ + + def __init__( + self, + hass: HomeAssistant, + logger: logging.Logger, + *, + address: str, + mode: BluetoothScanningMode, + update_method: Callable[[BluetoothServiceInfoBleak], _T], + needs_poll_method: Callable[[BluetoothServiceInfoBleak, float | None], bool], + poll_method: Callable[ + [BluetoothServiceInfoBleak], + Coroutine[Any, Any, _T], + ] + | None = None, + poll_debouncer: Debouncer[Coroutine[Any, Any, None]] | None = None, + ) -> None: + """Initialize the processor.""" + super().__init__(hass, logger, address, mode, update_method) + + self._needs_poll_method = needs_poll_method + self._poll_method = poll_method + self._last_poll: float | None = None + self.last_poll_successful = True + + # We keep the last service info in case the poller needs to refer to + # e.g. its BLEDevice + self._last_service_info: BluetoothServiceInfoBleak | None = None + + if poll_debouncer is None: + poll_debouncer = Debouncer( + hass, + logger, + cooldown=POLL_DEFAULT_COOLDOWN, + immediate=POLL_DEFAULT_IMMEDIATE, + function=self._async_poll, + ) + else: + poll_debouncer.function = self._async_poll + + self._debounced_poll = poll_debouncer + + def needs_poll(self, service_info: BluetoothServiceInfoBleak) -> bool: + """Return true if time to try and poll.""" + poll_age: float | None = None + if self._last_poll: + poll_age = time.monotonic() - self._last_poll + return self._needs_poll_method(service_info, poll_age) + + async def _async_poll_data( + self, last_service_info: BluetoothServiceInfoBleak + ) -> _T: + """Fetch the latest data from the source.""" + if self._poll_method is None: + raise NotImplementedError("Poll method not implemented") + return await self._poll_method(last_service_info) + + async def _async_poll(self) -> None: + """Poll the device to retrieve any extra data.""" + assert self._last_service_info + + try: + update = await self._async_poll_data(self._last_service_info) + except Exception: # pylint: disable=broad-except + if self.last_poll_successful: + self.logger.exception("%s: Failure while polling", self.address) + self.last_poll_successful = False + return + finally: + self._last_poll = time.monotonic() + + if not self.last_poll_successful: + self.logger.debug("%s: Polling recovered") + self.last_poll_successful = True + + for processor in self._processors: + processor.async_handle_update(update) + + @callback + def _async_handle_bluetooth_event( + self, + service_info: BluetoothServiceInfoBleak, + change: BluetoothChange, + ) -> None: + """Handle a Bluetooth event.""" + super()._async_handle_bluetooth_event(service_info, change) + + self._last_service_info = service_info + + # See if its time to poll + # We use bluetooth events to trigger the poll so that we scan as soon as + # possible after a device comes online or back in range, if a poll is due + if self.needs_poll(service_info): + self.hass.async_create_task(self._debounced_poll.async_call()) diff --git a/tests/components/bluetooth/test_active_update_coordinator.py b/tests/components/bluetooth/test_active_update_coordinator.py new file mode 100644 index 00000000000..24ad96c523e --- /dev/null +++ b/tests/components/bluetooth/test_active_update_coordinator.py @@ -0,0 +1,348 @@ +"""Tests for the Bluetooth integration PassiveBluetoothDataUpdateCoordinator.""" +from __future__ import annotations + +import asyncio +import logging +from unittest.mock import MagicMock, call, patch + +from homeassistant.components.bluetooth import ( + DOMAIN, + BluetoothChange, + BluetoothScanningMode, + BluetoothServiceInfoBleak, +) +from homeassistant.components.bluetooth.active_update_coordinator import ( + ActiveBluetoothProcessorCoordinator, +) +from homeassistant.core import HomeAssistant +from homeassistant.helpers.debounce import Debouncer +from homeassistant.helpers.service_info.bluetooth import BluetoothServiceInfo +from homeassistant.setup import async_setup_component + +_LOGGER = logging.getLogger(__name__) + + +GENERIC_BLUETOOTH_SERVICE_INFO = BluetoothServiceInfo( + name="Generic", + address="aa:bb:cc:dd:ee:ff", + rssi=-95, + manufacturer_data={ + 1: b"\x01\x01\x01\x01\x01\x01\x01\x01", + }, + service_data={}, + service_uuids=[], + source="local", +) + + +async def test_basic_usage(hass: HomeAssistant, mock_bleak_scanner_start): + """Test basic usage of the ActiveBluetoothProcessorCoordinator.""" + await async_setup_component(hass, DOMAIN, {DOMAIN: {}}) + + def _update_method(service_info: BluetoothServiceInfoBleak): + return {"testdata": 0} + + def _poll_needed(*args, **kwargs): + return True + + async def _poll(*args, **kwargs): + return {"testdata": 1} + + coordinator = ActiveBluetoothProcessorCoordinator( + hass, + _LOGGER, + address="aa:bb:cc:dd:ee:ff", + mode=BluetoothScanningMode.ACTIVE, + update_method=_update_method, + needs_poll_method=_poll_needed, + poll_method=_poll, + ) + assert coordinator.available is False # no data yet + saved_callback = None + + processor = MagicMock() + coordinator.async_register_processor(processor) + async_handle_update = processor.async_handle_update + + def _async_register_callback(_hass, _callback, _matcher, _mode): + nonlocal saved_callback + saved_callback = _callback + return lambda: None + + with patch( + "homeassistant.components.bluetooth.update_coordinator.async_register_callback", + _async_register_callback, + ): + cancel = coordinator.async_start() + + assert saved_callback is not None + + saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT) + await hass.async_block_till_done() + + assert coordinator.available is True + + # async_handle_update should have been called twice + # The first time, it was passed the data from parsing the advertisement + # The second time, it was passed the data from polling + assert len(async_handle_update.mock_calls) == 2 + assert async_handle_update.mock_calls[0] == call({"testdata": 0}) + assert async_handle_update.mock_calls[1] == call({"testdata": 1}) + + cancel() + + +async def test_poll_can_be_skipped(hass: HomeAssistant, mock_bleak_scanner_start): + """Test need_poll callback works and can skip a poll if its not needed.""" + await async_setup_component(hass, DOMAIN, {DOMAIN: {}}) + + flag = True + + def _update_method(service_info: BluetoothServiceInfoBleak): + return {"testdata": None} + + def _poll_needed(*args, **kwargs): + nonlocal flag + return flag + + async def _poll(*args, **kwargs): + return {"testdata": flag} + + coordinator = ActiveBluetoothProcessorCoordinator( + hass, + _LOGGER, + address="aa:bb:cc:dd:ee:ff", + mode=BluetoothScanningMode.ACTIVE, + update_method=_update_method, + needs_poll_method=_poll_needed, + poll_method=_poll, + poll_debouncer=Debouncer( + hass, + _LOGGER, + cooldown=0, + immediate=True, + ), + ) + assert coordinator.available is False # no data yet + saved_callback = None + + processor = MagicMock() + coordinator.async_register_processor(processor) + async_handle_update = processor.async_handle_update + + def _async_register_callback(_hass, _callback, _matcher, _mode): + nonlocal saved_callback + saved_callback = _callback + return lambda: None + + with patch( + "homeassistant.components.bluetooth.update_coordinator.async_register_callback", + _async_register_callback, + ): + cancel = coordinator.async_start() + + assert saved_callback is not None + + saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT) + await hass.async_block_till_done() + assert async_handle_update.mock_calls[-1] == call({"testdata": True}) + + flag = False + + saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT) + await hass.async_block_till_done() + assert async_handle_update.mock_calls[-1] == call({"testdata": None}) + + flag = True + + saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT) + await hass.async_block_till_done() + assert async_handle_update.mock_calls[-1] == call({"testdata": True}) + + cancel() + + +async def test_poll_failure_and_recover(hass: HomeAssistant, mock_bleak_scanner_start): + """Test error handling and recovery.""" + await async_setup_component(hass, DOMAIN, {DOMAIN: {}}) + + flag = True + + def _update_method(service_info: BluetoothServiceInfoBleak): + return {"testdata": None} + + def _poll_needed(*args, **kwargs): + return True + + async def _poll(*args, **kwargs): + nonlocal flag + if flag: + raise RuntimeError("Poll failure") + return {"testdata": flag} + + coordinator = ActiveBluetoothProcessorCoordinator( + hass, + _LOGGER, + address="aa:bb:cc:dd:ee:ff", + mode=BluetoothScanningMode.ACTIVE, + update_method=_update_method, + needs_poll_method=_poll_needed, + poll_method=_poll, + poll_debouncer=Debouncer( + hass, + _LOGGER, + cooldown=0, + immediate=True, + ), + ) + assert coordinator.available is False # no data yet + saved_callback = None + + processor = MagicMock() + coordinator.async_register_processor(processor) + async_handle_update = processor.async_handle_update + + def _async_register_callback(_hass, _callback, _matcher, _mode): + nonlocal saved_callback + saved_callback = _callback + return lambda: None + + with patch( + "homeassistant.components.bluetooth.update_coordinator.async_register_callback", + _async_register_callback, + ): + cancel = coordinator.async_start() + + assert saved_callback is not None + + # First poll fails + saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT) + await hass.async_block_till_done() + assert async_handle_update.mock_calls[-1] == call({"testdata": None}) + + # Second poll works + flag = False + saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT) + await hass.async_block_till_done() + assert async_handle_update.mock_calls[-1] == call({"testdata": False}) + + cancel() + + +async def test_second_poll_needed(hass: HomeAssistant, mock_bleak_scanner_start): + """If a poll is queued, by the time it starts it may no longer be needed.""" + await async_setup_component(hass, DOMAIN, {DOMAIN: {}}) + + count = 0 + + def _update_method(service_info: BluetoothServiceInfoBleak): + return {"testdata": None} + + # Only poll once + def _poll_needed(*args, **kwargs): + nonlocal count + return count == 0 + + async def _poll(*args, **kwargs): + nonlocal count + count += 1 + return {"testdata": count} + + coordinator = ActiveBluetoothProcessorCoordinator( + hass, + _LOGGER, + address="aa:bb:cc:dd:ee:ff", + mode=BluetoothScanningMode.ACTIVE, + update_method=_update_method, + needs_poll_method=_poll_needed, + poll_method=_poll, + ) + assert coordinator.available is False # no data yet + saved_callback = None + + processor = MagicMock() + coordinator.async_register_processor(processor) + async_handle_update = processor.async_handle_update + + def _async_register_callback(_hass, _callback, _matcher, _mode): + nonlocal saved_callback + saved_callback = _callback + return lambda: None + + with patch( + "homeassistant.components.bluetooth.update_coordinator.async_register_callback", + _async_register_callback, + ): + cancel = coordinator.async_start() + + assert saved_callback is not None + + # First poll gets queued + saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT) + # Second poll gets stuck behind first poll + saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT) + + await hass.async_block_till_done() + assert async_handle_update.mock_calls[-1] == call({"testdata": 1}) + + cancel() + + +async def test_rate_limit(hass: HomeAssistant, mock_bleak_scanner_start): + """Test error handling and recovery.""" + await async_setup_component(hass, DOMAIN, {DOMAIN: {}}) + + count = 0 + + def _update_method(service_info: BluetoothServiceInfoBleak): + return {"testdata": None} + + def _poll_needed(*args, **kwargs): + return True + + async def _poll(*args, **kwargs): + nonlocal count + count += 1 + await asyncio.sleep(0) + return {"testdata": count} + + coordinator = ActiveBluetoothProcessorCoordinator( + hass, + _LOGGER, + address="aa:bb:cc:dd:ee:ff", + mode=BluetoothScanningMode.ACTIVE, + update_method=_update_method, + needs_poll_method=_poll_needed, + poll_method=_poll, + ) + assert coordinator.available is False # no data yet + saved_callback = None + + processor = MagicMock() + coordinator.async_register_processor(processor) + async_handle_update = processor.async_handle_update + + def _async_register_callback(_hass, _callback, _matcher, _mode): + nonlocal saved_callback + saved_callback = _callback + return lambda: None + + with patch( + "homeassistant.components.bluetooth.update_coordinator.async_register_callback", + _async_register_callback, + ): + cancel = coordinator.async_start() + + assert saved_callback is not None + + # First poll gets queued + saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT) + # Second poll gets stuck behind first poll + saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT) + # Third poll gets stuck behind first poll doesn't get queued + saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT) + + await hass.async_block_till_done() + assert async_handle_update.mock_calls[-1] == call({"testdata": 1}) + + cancel()