Add new Bluetooth coordinator helper for polling mostly passive devices (#76549)

This commit is contained in:
Jc2k 2022-08-10 18:56:34 +01:00 committed by GitHub
parent 982d197ff3
commit 0639681991
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 489 additions and 0 deletions

View file

@ -0,0 +1,141 @@
"""A Bluetooth passive coordinator that collects data from advertisements but can also poll."""
from __future__ import annotations
from collections.abc import Callable, Coroutine
import logging
import time
from typing import Any, Generic, TypeVar
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.debounce import Debouncer
from . import BluetoothChange, BluetoothScanningMode, BluetoothServiceInfoBleak
from .passive_update_processor import PassiveBluetoothProcessorCoordinator
POLL_DEFAULT_COOLDOWN = 10
POLL_DEFAULT_IMMEDIATE = True
_T = TypeVar("_T")
class ActiveBluetoothProcessorCoordinator(
Generic[_T], PassiveBluetoothProcessorCoordinator[_T]
):
"""
A coordinator that parses passive data from advertisements but can also poll.
Every time an advertisement is received, needs_poll_method is called to work
out if a poll is needed. This should return True if it is and False if it is
not needed.
def needs_poll_method(svc_info: BluetoothServiceInfoBleak, last_poll: float | None) -> bool:
return True
If there has been no poll since HA started, `last_poll` will be None. Otherwise it is
the number of seconds since one was last attempted.
If a poll is needed, the coordinator will call poll_method. This is a coroutine.
It should return the same type of data as your update_method. The expectation is that
data from advertisements and from polling are being parsed and fed into a shared
object that represents the current state of the device.
async def poll_method(svc_info: BluetoothServiceInfoBleak) -> YourDataType:
return YourDataType(....)
BluetoothServiceInfoBleak.device contains a BLEDevice. You should use this in
your poll function, as it is the most efficient way to get a BleakClient.
"""
def __init__(
self,
hass: HomeAssistant,
logger: logging.Logger,
*,
address: str,
mode: BluetoothScanningMode,
update_method: Callable[[BluetoothServiceInfoBleak], _T],
needs_poll_method: Callable[[BluetoothServiceInfoBleak, float | None], bool],
poll_method: Callable[
[BluetoothServiceInfoBleak],
Coroutine[Any, Any, _T],
]
| None = None,
poll_debouncer: Debouncer[Coroutine[Any, Any, None]] | None = None,
) -> None:
"""Initialize the processor."""
super().__init__(hass, logger, address, mode, update_method)
self._needs_poll_method = needs_poll_method
self._poll_method = poll_method
self._last_poll: float | None = None
self.last_poll_successful = True
# We keep the last service info in case the poller needs to refer to
# e.g. its BLEDevice
self._last_service_info: BluetoothServiceInfoBleak | None = None
if poll_debouncer is None:
poll_debouncer = Debouncer(
hass,
logger,
cooldown=POLL_DEFAULT_COOLDOWN,
immediate=POLL_DEFAULT_IMMEDIATE,
function=self._async_poll,
)
else:
poll_debouncer.function = self._async_poll
self._debounced_poll = poll_debouncer
def needs_poll(self, service_info: BluetoothServiceInfoBleak) -> bool:
"""Return true if time to try and poll."""
poll_age: float | None = None
if self._last_poll:
poll_age = time.monotonic() - self._last_poll
return self._needs_poll_method(service_info, poll_age)
async def _async_poll_data(
self, last_service_info: BluetoothServiceInfoBleak
) -> _T:
"""Fetch the latest data from the source."""
if self._poll_method is None:
raise NotImplementedError("Poll method not implemented")
return await self._poll_method(last_service_info)
async def _async_poll(self) -> None:
"""Poll the device to retrieve any extra data."""
assert self._last_service_info
try:
update = await self._async_poll_data(self._last_service_info)
except Exception: # pylint: disable=broad-except
if self.last_poll_successful:
self.logger.exception("%s: Failure while polling", self.address)
self.last_poll_successful = False
return
finally:
self._last_poll = time.monotonic()
if not self.last_poll_successful:
self.logger.debug("%s: Polling recovered")
self.last_poll_successful = True
for processor in self._processors:
processor.async_handle_update(update)
@callback
def _async_handle_bluetooth_event(
self,
service_info: BluetoothServiceInfoBleak,
change: BluetoothChange,
) -> None:
"""Handle a Bluetooth event."""
super()._async_handle_bluetooth_event(service_info, change)
self._last_service_info = service_info
# See if its time to poll
# We use bluetooth events to trigger the poll so that we scan as soon as
# possible after a device comes online or back in range, if a poll is due
if self.needs_poll(service_info):
self.hass.async_create_task(self._debounced_poll.async_call())

View file

@ -0,0 +1,348 @@
"""Tests for the Bluetooth integration PassiveBluetoothDataUpdateCoordinator."""
from __future__ import annotations
import asyncio
import logging
from unittest.mock import MagicMock, call, patch
from homeassistant.components.bluetooth import (
DOMAIN,
BluetoothChange,
BluetoothScanningMode,
BluetoothServiceInfoBleak,
)
from homeassistant.components.bluetooth.active_update_coordinator import (
ActiveBluetoothProcessorCoordinator,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.service_info.bluetooth import BluetoothServiceInfo
from homeassistant.setup import async_setup_component
_LOGGER = logging.getLogger(__name__)
GENERIC_BLUETOOTH_SERVICE_INFO = BluetoothServiceInfo(
name="Generic",
address="aa:bb:cc:dd:ee:ff",
rssi=-95,
manufacturer_data={
1: b"\x01\x01\x01\x01\x01\x01\x01\x01",
},
service_data={},
service_uuids=[],
source="local",
)
async def test_basic_usage(hass: HomeAssistant, mock_bleak_scanner_start):
"""Test basic usage of the ActiveBluetoothProcessorCoordinator."""
await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
def _update_method(service_info: BluetoothServiceInfoBleak):
return {"testdata": 0}
def _poll_needed(*args, **kwargs):
return True
async def _poll(*args, **kwargs):
return {"testdata": 1}
coordinator = ActiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address="aa:bb:cc:dd:ee:ff",
mode=BluetoothScanningMode.ACTIVE,
update_method=_update_method,
needs_poll_method=_poll_needed,
poll_method=_poll,
)
assert coordinator.available is False # no data yet
saved_callback = None
processor = MagicMock()
coordinator.async_register_processor(processor)
async_handle_update = processor.async_handle_update
def _async_register_callback(_hass, _callback, _matcher, _mode):
nonlocal saved_callback
saved_callback = _callback
return lambda: None
with patch(
"homeassistant.components.bluetooth.update_coordinator.async_register_callback",
_async_register_callback,
):
cancel = coordinator.async_start()
assert saved_callback is not None
saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT)
await hass.async_block_till_done()
assert coordinator.available is True
# async_handle_update should have been called twice
# The first time, it was passed the data from parsing the advertisement
# The second time, it was passed the data from polling
assert len(async_handle_update.mock_calls) == 2
assert async_handle_update.mock_calls[0] == call({"testdata": 0})
assert async_handle_update.mock_calls[1] == call({"testdata": 1})
cancel()
async def test_poll_can_be_skipped(hass: HomeAssistant, mock_bleak_scanner_start):
"""Test need_poll callback works and can skip a poll if its not needed."""
await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
flag = True
def _update_method(service_info: BluetoothServiceInfoBleak):
return {"testdata": None}
def _poll_needed(*args, **kwargs):
nonlocal flag
return flag
async def _poll(*args, **kwargs):
return {"testdata": flag}
coordinator = ActiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address="aa:bb:cc:dd:ee:ff",
mode=BluetoothScanningMode.ACTIVE,
update_method=_update_method,
needs_poll_method=_poll_needed,
poll_method=_poll,
poll_debouncer=Debouncer(
hass,
_LOGGER,
cooldown=0,
immediate=True,
),
)
assert coordinator.available is False # no data yet
saved_callback = None
processor = MagicMock()
coordinator.async_register_processor(processor)
async_handle_update = processor.async_handle_update
def _async_register_callback(_hass, _callback, _matcher, _mode):
nonlocal saved_callback
saved_callback = _callback
return lambda: None
with patch(
"homeassistant.components.bluetooth.update_coordinator.async_register_callback",
_async_register_callback,
):
cancel = coordinator.async_start()
assert saved_callback is not None
saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT)
await hass.async_block_till_done()
assert async_handle_update.mock_calls[-1] == call({"testdata": True})
flag = False
saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT)
await hass.async_block_till_done()
assert async_handle_update.mock_calls[-1] == call({"testdata": None})
flag = True
saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT)
await hass.async_block_till_done()
assert async_handle_update.mock_calls[-1] == call({"testdata": True})
cancel()
async def test_poll_failure_and_recover(hass: HomeAssistant, mock_bleak_scanner_start):
"""Test error handling and recovery."""
await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
flag = True
def _update_method(service_info: BluetoothServiceInfoBleak):
return {"testdata": None}
def _poll_needed(*args, **kwargs):
return True
async def _poll(*args, **kwargs):
nonlocal flag
if flag:
raise RuntimeError("Poll failure")
return {"testdata": flag}
coordinator = ActiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address="aa:bb:cc:dd:ee:ff",
mode=BluetoothScanningMode.ACTIVE,
update_method=_update_method,
needs_poll_method=_poll_needed,
poll_method=_poll,
poll_debouncer=Debouncer(
hass,
_LOGGER,
cooldown=0,
immediate=True,
),
)
assert coordinator.available is False # no data yet
saved_callback = None
processor = MagicMock()
coordinator.async_register_processor(processor)
async_handle_update = processor.async_handle_update
def _async_register_callback(_hass, _callback, _matcher, _mode):
nonlocal saved_callback
saved_callback = _callback
return lambda: None
with patch(
"homeassistant.components.bluetooth.update_coordinator.async_register_callback",
_async_register_callback,
):
cancel = coordinator.async_start()
assert saved_callback is not None
# First poll fails
saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT)
await hass.async_block_till_done()
assert async_handle_update.mock_calls[-1] == call({"testdata": None})
# Second poll works
flag = False
saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT)
await hass.async_block_till_done()
assert async_handle_update.mock_calls[-1] == call({"testdata": False})
cancel()
async def test_second_poll_needed(hass: HomeAssistant, mock_bleak_scanner_start):
"""If a poll is queued, by the time it starts it may no longer be needed."""
await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
count = 0
def _update_method(service_info: BluetoothServiceInfoBleak):
return {"testdata": None}
# Only poll once
def _poll_needed(*args, **kwargs):
nonlocal count
return count == 0
async def _poll(*args, **kwargs):
nonlocal count
count += 1
return {"testdata": count}
coordinator = ActiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address="aa:bb:cc:dd:ee:ff",
mode=BluetoothScanningMode.ACTIVE,
update_method=_update_method,
needs_poll_method=_poll_needed,
poll_method=_poll,
)
assert coordinator.available is False # no data yet
saved_callback = None
processor = MagicMock()
coordinator.async_register_processor(processor)
async_handle_update = processor.async_handle_update
def _async_register_callback(_hass, _callback, _matcher, _mode):
nonlocal saved_callback
saved_callback = _callback
return lambda: None
with patch(
"homeassistant.components.bluetooth.update_coordinator.async_register_callback",
_async_register_callback,
):
cancel = coordinator.async_start()
assert saved_callback is not None
# First poll gets queued
saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT)
# Second poll gets stuck behind first poll
saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT)
await hass.async_block_till_done()
assert async_handle_update.mock_calls[-1] == call({"testdata": 1})
cancel()
async def test_rate_limit(hass: HomeAssistant, mock_bleak_scanner_start):
"""Test error handling and recovery."""
await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
count = 0
def _update_method(service_info: BluetoothServiceInfoBleak):
return {"testdata": None}
def _poll_needed(*args, **kwargs):
return True
async def _poll(*args, **kwargs):
nonlocal count
count += 1
await asyncio.sleep(0)
return {"testdata": count}
coordinator = ActiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address="aa:bb:cc:dd:ee:ff",
mode=BluetoothScanningMode.ACTIVE,
update_method=_update_method,
needs_poll_method=_poll_needed,
poll_method=_poll,
)
assert coordinator.available is False # no data yet
saved_callback = None
processor = MagicMock()
coordinator.async_register_processor(processor)
async_handle_update = processor.async_handle_update
def _async_register_callback(_hass, _callback, _matcher, _mode):
nonlocal saved_callback
saved_callback = _callback
return lambda: None
with patch(
"homeassistant.components.bluetooth.update_coordinator.async_register_callback",
_async_register_callback,
):
cancel = coordinator.async_start()
assert saved_callback is not None
# First poll gets queued
saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT)
# Second poll gets stuck behind first poll
saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT)
# Third poll gets stuck behind first poll doesn't get queued
saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT)
await hass.async_block_till_done()
assert async_handle_update.mock_calls[-1] == call({"testdata": 1})
cancel()