#94138 added support for raw/bundled advertisements. We should use the same monotonic time for all advertisements in the bundle if no time is passed, or calculate the timestamp and pass it if it is known.
398 lines
14 KiB
Python
398 lines
14 KiB
Python
"""Base classes for HA Bluetooth scanners for bluetooth."""
|
|
from __future__ import annotations
|
|
|
|
from abc import ABC, abstractmethod
|
|
from collections.abc import Callable, Generator
|
|
from contextlib import contextmanager
|
|
from dataclasses import dataclass
|
|
import datetime
|
|
from datetime import timedelta
|
|
import logging
|
|
from typing import Any, Final
|
|
|
|
from bleak.backends.device import BLEDevice
|
|
from bleak.backends.scanner import AdvertisementData
|
|
from bleak_retry_connector import NO_RSSI_VALUE
|
|
from bluetooth_adapters import DiscoveredDeviceAdvertisementData, adapter_human_name
|
|
from home_assistant_bluetooth import BluetoothServiceInfoBleak
|
|
|
|
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
|
from homeassistant.core import (
|
|
CALLBACK_TYPE,
|
|
Event,
|
|
HomeAssistant,
|
|
callback as hass_callback,
|
|
)
|
|
from homeassistant.helpers.event import async_track_time_interval
|
|
import homeassistant.util.dt as dt_util
|
|
from homeassistant.util.dt import monotonic_time_coarse
|
|
|
|
from . import models
|
|
from .const import (
|
|
CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS,
|
|
SCANNER_WATCHDOG_INTERVAL,
|
|
SCANNER_WATCHDOG_TIMEOUT,
|
|
)
|
|
from .models import HaBluetoothConnector
|
|
|
|
# Alias for the coarse monotonic clock; the scanners below call it for
# watchdog, expiry and diagnostics timestamps.
MONOTONIC_TIME: Final = monotonic_time_coarse
|
|
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class BluetoothScannerDevice:
|
|
"""Data for a bluetooth device from a given scanner."""
|
|
|
|
scanner: BaseHaScanner
|
|
ble_device: BLEDevice
|
|
advertisement: AdvertisementData
|
|
|
|
|
|
class BaseHaScanner(ABC):
    """Common foundation for Home Assistant Bluetooth scanners.

    Stores the identity of a scanner (source, adapter, name), tracks
    whether it is scanning or busy connecting, and provides a watchdog
    that logs when the time since the last recorded detection exceeds
    ``SCANNER_WATCHDOG_TIMEOUT``.
    """

    __slots__ = (
        "hass",
        "adapter",
        "connectable",
        "source",
        "connector",
        "_connecting",
        "name",
        "scanning",
        "_last_detection",
        "_start_time",
        "_cancel_watchdog",
    )

    def __init__(
        self,
        hass: HomeAssistant,
        source: str,
        adapter: str,
        connector: HaBluetoothConnector | None = None,
    ) -> None:
        """Store the scanner identity and reset its runtime state."""
        self.hass = hass
        self.source = source
        self.adapter = adapter
        self.connector = connector
        # The human readable name is only looked up when the adapter
        # differs from the source; otherwise the source is the name.
        if adapter == source:
            self.name = source
        else:
            self.name = adapter_human_name(adapter, source)
        self.connectable = False
        self.scanning = True
        # Number of currently open ``connecting()`` contexts
        self._connecting = 0
        self._start_time = 0.0
        self._last_detection = 0.0
        self._cancel_watchdog: CALLBACK_TYPE | None = None

    @hass_callback
    def _async_stop_scanner_watchdog(self) -> None:
        """Cancel the watchdog interval if one is active."""
        cancel_watchdog = self._cancel_watchdog
        if not cancel_watchdog:
            return
        cancel_watchdog()
        self._cancel_watchdog = None

    @hass_callback
    def _async_setup_scanner_watchdog(self) -> None:
        """Reset the detection clock and make sure the watchdog is running."""
        started_at = MONOTONIC_TIME()
        self._start_time = started_at
        self._last_detection = started_at
        if self._cancel_watchdog:
            # Already scheduled; only the timestamps needed a reset
            return
        self._cancel_watchdog = async_track_time_interval(
            self.hass,
            self._async_scanner_watchdog,
            SCANNER_WATCHDOG_INTERVAL,
            name=f"{self.name} Bluetooth scanner watchdog",
        )

    @hass_callback
    def _async_watchdog_triggered(self) -> bool:
        """Return True when the last detection is older than the timeout."""
        quiet_for = MONOTONIC_TIME() - self._last_detection
        _LOGGER.debug(
            "%s: Scanner watchdog time_since_last_detection: %s",
            self.name,
            quiet_for,
        )
        return quiet_for > SCANNER_WATCHDOG_TIMEOUT

    @hass_callback
    def _async_scanner_watchdog(self, now: datetime.datetime) -> None:
        """Periodic watchdog callback.

        The base implementation only logs that the scanner has gone quiet.
        Subclasses may override it to react differently when the watchdog
        is triggered.
        """
        if not self._async_watchdog_triggered():
            return
        _LOGGER.info(
            (
                "%s: Bluetooth scanner has gone quiet for %ss, check logs on the"
                " scanner device for more information"
            ),
            self.name,
            SCANNER_WATCHDOG_TIMEOUT,
        )

    @contextmanager
    def connecting(self) -> Generator[None, None, None]:
        """Track a connection attempt for the duration of the context.

        ``scanning`` is False while at least one such context is open.
        """
        self._connecting += 1
        self.scanning = not self._connecting
        try:
            yield
        finally:
            # Always undo the bookkeeping, even if the body raised
            self._connecting -= 1
            self.scanning = not self._connecting

    @property
    @abstractmethod
    def discovered_devices(self) -> list[BLEDevice]:
        """Return the devices this scanner has discovered."""

    @property
    @abstractmethod
    def discovered_devices_and_advertisement_data(
        self,
    ) -> dict[str, tuple[BLEDevice, AdvertisementData]]:
        """Return a mapping to (device, advertisement data) pairs."""

    async def async_diagnostics(self) -> dict[str, Any]:
        """Collect diagnostic information about the scanner."""
        discovered = self.discovered_devices_and_advertisement_data.values()
        diagnostics: dict[str, Any] = {
            "name": self.name,
            "start_time": self._start_time,
            "source": self.source,
            "scanning": self.scanning,
            "type": self.__class__.__name__,
            "last_detection": self._last_detection,
            "monotonic_time": MONOTONIC_TIME(),
        }
        # Added last so the key order of the result stays the same
        diagnostics["discovered_devices_and_advertisement_data"] = [
            {
                "name": ble_device.name,
                "address": ble_device.address,
                "rssi": adv.rssi,
                "advertisement_data": adv,
                "details": ble_device.details,
            }
            for ble_device, adv in discovered
        ]
        return diagnostics
|
|
|
|
|
|
class BaseHaRemoteScanner(BaseHaScanner):
    """Base class for a Home Assistant remote BLE scanner.

    Advertisements are fed in through ``_async_on_advertisement``. For
    every address the scanner keeps the latest (device, advertisement)
    pair and the monotonic timestamp it was seen at, expires entries
    older than ``_expire_seconds`` and saves the history through the
    manager storage.
    """

    __slots__ = (
        "_new_info_callback",
        "_discovered_device_advertisement_datas",
        "_discovered_device_timestamps",
        "_details",
        "_expire_seconds",
        "_storage",
    )

    def __init__(
        self,
        hass: HomeAssistant,
        scanner_id: str,
        name: str,
        new_info_callback: Callable[[BluetoothServiceInfoBleak], None],
        connector: HaBluetoothConnector | None,
        connectable: bool,
    ) -> None:
        """Initialize the scanner.

        ``scanner_id`` is used as the scanner source and ``name`` is handed
        to the base class as the adapter. ``new_info_callback`` is invoked
        with a ``BluetoothServiceInfoBleak`` for every advertisement.
        """
        super().__init__(hass, scanner_id, name, connector)
        self._new_info_callback = new_info_callback
        self._discovered_device_advertisement_datas: dict[
            str, tuple[BLEDevice, AdvertisementData]
        ] = {}
        self._discovered_device_timestamps: dict[str, float] = {}
        self.connectable = connectable
        # Merged into the details of every BLEDevice this scanner creates
        self._details: dict[str, str | HaBluetoothConnector] = {"source": scanner_id}
        # Scanners only care about connectable devices. The manager
        # will handle taking care of availability for non-connectable devices
        self._expire_seconds = CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
        assert models.MANAGER is not None
        self._storage = models.MANAGER.storage

    @hass_callback
    def async_setup(self) -> CALLBACK_TYPE:
        """Set up the scanner.

        Restores the stored advertisement history (if any), schedules the
        periodic expiry of stale devices, saves the history when Home
        Assistant stops and starts the watchdog.

        Returns a callback that saves the history and undoes all of the
        above.
        """
        if history := self._storage.async_get_advertisement_history(self.source):
            self._discovered_device_advertisement_datas = (
                history.discovered_device_advertisement_datas
            )
            self._discovered_device_timestamps = history.discovered_device_timestamps
            # Expire anything that is too old
            self._async_expire_devices(dt_util.utcnow())

        cancel_track = async_track_time_interval(
            self.hass,
            self._async_expire_devices,
            timedelta(seconds=30),
            name=f"{self.name} Bluetooth scanner device expire",
        )
        cancel_stop = self.hass.bus.async_listen(
            EVENT_HOMEASSISTANT_STOP, self._async_save_history
        )
        self._async_setup_scanner_watchdog()

        @hass_callback
        def _cancel() -> None:
            self._async_save_history()
            self._async_stop_scanner_watchdog()
            cancel_track()
            cancel_stop()

        return _cancel

    @hass_callback
    def _async_save_history(self, event: Event | None = None) -> None:
        """Save the history.

        ``event`` is unused; it is accepted so the method can be registered
        directly as an event listener.
        """
        self._storage.async_set_advertisement_history(
            self.source,
            DiscoveredDeviceAdvertisementData(
                self.connectable,
                self._expire_seconds,
                self._discovered_device_advertisement_datas,
                self._discovered_device_timestamps,
            ),
        )

    @hass_callback
    def _async_expire_devices(self, _datetime: datetime.datetime) -> None:
        """Expire devices not seen for more than ``_expire_seconds``.

        ``_datetime`` is ignored; the age is computed from the monotonic
        clock the timestamps were recorded with.
        """
        now = MONOTONIC_TIME()
        # Collect first so the dicts are not mutated while iterating
        expired = [
            address
            for address, timestamp in self._discovered_device_timestamps.items()
            if now - timestamp > self._expire_seconds
        ]
        for address in expired:
            del self._discovered_device_advertisement_datas[address]
            del self._discovered_device_timestamps[address]

    @property
    def discovered_devices(self) -> list[BLEDevice]:
        """Return a list of discovered devices."""
        device_adv_datas = self._discovered_device_advertisement_datas.values()
        return [
            device_advertisement_data[0]
            for device_advertisement_data in device_adv_datas
        ]

    @property
    def discovered_devices_and_advertisement_data(
        self,
    ) -> dict[str, tuple[BLEDevice, AdvertisementData]]:
        """Return the discovered devices and advertisement data by address."""
        return self._discovered_device_advertisement_datas

    @hass_callback
    def _async_on_advertisement(
        self,
        address: str,
        rssi: int,
        local_name: str | None,
        service_uuids: list[str],
        service_data: dict[str, bytes],
        manufacturer_data: dict[int, bytes],
        tx_power: int | None,
        details: dict[Any, Any],
        advertisement_monotonic_time: float,
    ) -> None:
        """Process one advertisement and call the registered callback.

        The new data is merged with what was previously stored for
        ``address``, the stored device/advertisement pair and timestamp are
        updated, and ``new_info_callback`` is called with the merged
        result. ``advertisement_monotonic_time`` is recorded as the time
        of the advertisement and as the last detection of the scanner.
        """
        self._last_detection = advertisement_monotonic_time
        if prev_discovery := self._discovered_device_advertisement_datas.get(address):
            # Merge the new data with the old data
            # to function the same as BlueZ which
            # merges the dicts on PropertiesChanged
            prev_device = prev_discovery[0]
            prev_advertisement = prev_discovery[1]
            prev_service_uuids = prev_advertisement.service_uuids
            prev_service_data = prev_advertisement.service_data
            prev_manufacturer_data = prev_advertisement.manufacturer_data
            prev_name = prev_device.name

            # Keep the known name when this advertisement carries no name
            # at all, or only a shorter one. Without the "no name" case an
            # advertisement lacking a local name would wipe the name of a
            # device that was already identified.
            if prev_name and (not local_name or len(prev_name) > len(local_name)):
                local_name = prev_name

            if service_uuids and service_uuids != prev_service_uuids:
                service_uuids = list(set(service_uuids + prev_service_uuids))
            elif not service_uuids:
                service_uuids = prev_service_uuids

            # For both dicts the new values win over the previous ones
            if service_data and service_data != prev_service_data:
                service_data = prev_service_data | service_data
            elif not service_data:
                service_data = prev_service_data

            if manufacturer_data and manufacturer_data != prev_manufacturer_data:
                manufacturer_data = prev_manufacturer_data | manufacturer_data
            elif not manufacturer_data:
                manufacturer_data = prev_manufacturer_data
            #
            # Bleak updates the BLEDevice via create_or_update_device.
            # We need to do the same to ensure integrations that already
            # have the BLEDevice object get the updated details when they
            # change.
            #
            # https://github.com/hbldh/bleak/blob/222618b7747f0467dbb32bd3679f8cfaa19b1668/bleak/backends/scanner.py#L203
            #
            device = prev_device
            device.name = local_name
            device.details = self._details | details
            # pylint: disable-next=protected-access
            device._rssi = rssi  # deprecated, will be removed in newer bleak
        else:
            device = BLEDevice(
                address=address,
                name=local_name,
                details=self._details | details,
                rssi=rssi,  # deprecated, will be removed in newer bleak
            )

        advertisement_data = AdvertisementData(
            local_name=None if local_name == "" else local_name,
            manufacturer_data=manufacturer_data,
            service_data=service_data,
            service_uuids=service_uuids,
            tx_power=NO_RSSI_VALUE if tx_power is None else tx_power,
            rssi=rssi,
            platform_data=(),
        )
        self._discovered_device_advertisement_datas[address] = (
            device,
            advertisement_data,
        )
        self._discovered_device_timestamps[address] = advertisement_monotonic_time
        self._new_info_callback(
            BluetoothServiceInfoBleak(
                # Fall back to the address when no name is known
                name=local_name or address,
                address=address,
                rssi=rssi,
                manufacturer_data=manufacturer_data,
                service_data=service_data,
                service_uuids=service_uuids,
                source=self.source,
                device=device,
                advertisement=advertisement_data,
                connectable=self.connectable,
                time=advertisement_monotonic_time,
            )
        )

    async def async_diagnostics(self) -> dict[str, Any]:
        """Return diagnostic information about the scanner.

        Extends the base diagnostics with the stored history, the
        connectable flag and the per-device timestamps and ages.
        """
        now = MONOTONIC_TIME()
        return await super().async_diagnostics() | {
            "storage": self._storage.async_get_advertisement_history_as_dict(
                self.source
            ),
            "connectable": self.connectable,
            "discovered_device_timestamps": self._discovered_device_timestamps,
            "time_since_last_device_detection": {
                address: now - timestamp
                for address, timestamp in self._discovered_device_timestamps.items()
            },
        }
|