"""The bluetooth integration.""" from __future__ import annotations from collections.abc import Callable from dataclasses import dataclass from datetime import datetime, timedelta from enum import Enum import fnmatch import logging from typing import Final, TypedDict, Union from bleak import BleakError from bleak.backends.device import BLEDevice from bleak.backends.scanner import AdvertisementData from lru import LRU # pylint: disable=no-name-in-module from homeassistant import config_entries from homeassistant.const import EVENT_HOMEASSISTANT_STOP from homeassistant.core import ( CALLBACK_TYPE, Event, HomeAssistant, callback as hass_callback, ) from homeassistant.helpers import discovery_flow from homeassistant.helpers.event import async_track_time_interval from homeassistant.helpers.service_info.bluetooth import BluetoothServiceInfo from homeassistant.helpers.typing import ConfigType from homeassistant.loader import ( BluetoothMatcher, BluetoothMatcherOptional, async_get_bluetooth, ) from . import models from .const import DOMAIN from .models import HaBleakScanner from .usage import install_multiple_bleak_catcher _LOGGER = logging.getLogger(__name__) MAX_REMEMBER_ADDRESSES: Final = 2048 UNAVAILABLE_TRACK_SECONDS: Final = 60 * 5 SOURCE_LOCAL: Final = "local" @dataclass class BluetoothServiceInfoBleak(BluetoothServiceInfo): """BluetoothServiceInfo with bleak data. Integrations may need BLEDevice and AdvertisementData to connect to the device without having bleak trigger another scan to translate the address to the system's internal details. """ device: BLEDevice advertisement: AdvertisementData @classmethod def from_advertisement( cls, device: BLEDevice, advertisement_data: AdvertisementData, source: str ) -> BluetoothServiceInfoBleak: """Create a BluetoothServiceInfoBleak from an advertisement.""" return cls( name=advertisement_data.local_name or device.name or device.address, address=device.address, rssi=device.rssi, manufacturer_data=advertisement_data.manufacturer_data, service_data=advertisement_data.service_data, service_uuids=advertisement_data.service_uuids, source=source, device=device, advertisement=advertisement_data, ) class BluetoothCallbackMatcherOptional(TypedDict, total=False): """Matcher for the bluetooth integration for callback optional fields.""" address: str class BluetoothCallbackMatcher( BluetoothMatcherOptional, BluetoothCallbackMatcherOptional, ): """Callback matcher for the bluetooth integration.""" class BluetoothScanningMode(Enum): """The mode of scanning for bluetooth devices.""" PASSIVE = "passive" ACTIVE = "active" SCANNING_MODE_TO_BLEAK = { BluetoothScanningMode.ACTIVE: "active", BluetoothScanningMode.PASSIVE: "passive", } ADDRESS: Final = "address" LOCAL_NAME: Final = "local_name" SERVICE_UUID: Final = "service_uuid" MANUFACTURER_ID: Final = "manufacturer_id" MANUFACTURER_DATA_START: Final = "manufacturer_data_start" BluetoothChange = Enum("BluetoothChange", "ADVERTISEMENT") BluetoothCallback = Callable[ [Union[BluetoothServiceInfoBleak, BluetoothServiceInfo], BluetoothChange], None ] @hass_callback def async_discovered_service_info( hass: HomeAssistant, ) -> list[BluetoothServiceInfoBleak]: """Return the discovered devices list.""" if DOMAIN not in hass.data: return [] manager: BluetoothManager = hass.data[DOMAIN] return manager.async_discovered_service_info() @hass_callback def async_ble_device_from_address( hass: HomeAssistant, address: str, ) -> BLEDevice | None: """Return BLEDevice for an address if its present.""" if DOMAIN not in hass.data: return None manager: BluetoothManager = hass.data[DOMAIN] return manager.async_ble_device_from_address(address) @hass_callback def async_address_present( hass: HomeAssistant, address: str, ) -> bool: """Check if an address is present in the bluetooth device list.""" if DOMAIN not in hass.data: return False manager: BluetoothManager = hass.data[DOMAIN] return manager.async_address_present(address) @hass_callback def async_register_callback( hass: HomeAssistant, callback: BluetoothCallback, match_dict: BluetoothCallbackMatcher | None, ) -> Callable[[], None]: """Register to receive a callback on bluetooth change. Returns a callback that can be used to cancel the registration. """ manager: BluetoothManager = hass.data[DOMAIN] return manager.async_register_callback(callback, match_dict) @hass_callback def async_track_unavailable( hass: HomeAssistant, callback: Callable[[str], None], address: str, ) -> Callable[[], None]: """Register to receive a callback when an address is unavailable. Returns a callback that can be used to cancel the registration. """ manager: BluetoothManager = hass.data[DOMAIN] return manager.async_track_unavailable(callback, address) async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up the bluetooth integration.""" integration_matchers = await async_get_bluetooth(hass) bluetooth_discovery = BluetoothManager( hass, integration_matchers, BluetoothScanningMode.PASSIVE ) await bluetooth_discovery.async_setup() hass.data[DOMAIN] = bluetooth_discovery return True def _ble_device_matches( matcher: BluetoothCallbackMatcher | BluetoothMatcher, device: BLEDevice, advertisement_data: AdvertisementData, ) -> bool: """Check if a ble device and advertisement_data matches the matcher.""" if ( matcher_address := matcher.get(ADDRESS) ) is not None and device.address != matcher_address: return False if ( matcher_local_name := matcher.get(LOCAL_NAME) ) is not None and not fnmatch.fnmatch( advertisement_data.local_name or device.name or device.address, matcher_local_name, ): return False if ( matcher_service_uuid := matcher.get(SERVICE_UUID) ) is not None and matcher_service_uuid not in advertisement_data.service_uuids: return False if ( (matcher_manfacturer_id := matcher.get(MANUFACTURER_ID)) is not None and matcher_manfacturer_id not in advertisement_data.manufacturer_data ): return False if ( matcher_manufacturer_data_start := matcher.get(MANUFACTURER_DATA_START) ) is not None: matcher_manufacturer_data_start_bytes = bytearray( matcher_manufacturer_data_start ) if not any( manufacturer_data.startswith(matcher_manufacturer_data_start_bytes) for manufacturer_data in advertisement_data.manufacturer_data.values() ): return False return True class BluetoothManager: """Manage Bluetooth.""" def __init__( self, hass: HomeAssistant, integration_matchers: list[BluetoothMatcher], scanning_mode: BluetoothScanningMode, ) -> None: """Init bluetooth discovery.""" self.hass = hass self.scanning_mode = scanning_mode self._integration_matchers = integration_matchers self.scanner: HaBleakScanner | None = None self._cancel_device_detected: CALLBACK_TYPE | None = None self._cancel_unavailable_tracking: CALLBACK_TYPE | None = None self._unavailable_callbacks: dict[str, list[Callable[[str], None]]] = {} self._callbacks: list[ tuple[BluetoothCallback, BluetoothCallbackMatcher | None] ] = [] # Some devices use a random address so we need to use # an LRU to avoid memory issues. self._matched: LRU = LRU(MAX_REMEMBER_ADDRESSES) async def async_setup(self) -> None: """Set up BT Discovery.""" try: self.scanner = HaBleakScanner( scanning_mode=SCANNING_MODE_TO_BLEAK[self.scanning_mode] ) except (FileNotFoundError, BleakError) as ex: _LOGGER.warning( "Could not create bluetooth scanner (is bluetooth present and enabled?): %s", ex, ) return install_multiple_bleak_catcher(self.scanner) self.async_setup_unavailable_tracking() # We have to start it right away as some integrations might # need it straight away. _LOGGER.debug("Starting bluetooth scanner") self.scanner.register_detection_callback(self.scanner.async_callback_dispatcher) self._cancel_device_detected = self.scanner.async_register_callback( self._device_detected, {} ) self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.async_stop) await self.scanner.start() @hass_callback def async_setup_unavailable_tracking(self) -> None: """Set up the unavailable tracking.""" @hass_callback def _async_check_unavailable(now: datetime) -> None: """Watch for unavailable devices.""" assert models.HA_BLEAK_SCANNER is not None scanner = models.HA_BLEAK_SCANNER history = set(scanner.history) active = {device.address for device in scanner.discovered_devices} disappeared = history.difference(active) for address in disappeared: del scanner.history[address] if not (callbacks := self._unavailable_callbacks.get(address)): continue for callback in callbacks: try: callback(address) except Exception: # pylint: disable=broad-except _LOGGER.exception("Error in unavailable callback") self._cancel_unavailable_tracking = async_track_time_interval( self.hass, _async_check_unavailable, timedelta(seconds=UNAVAILABLE_TRACK_SECONDS), ) @hass_callback def _device_detected( self, device: BLEDevice, advertisement_data: AdvertisementData ) -> None: """Handle a detected device.""" matched_domains: set[str] | None = None match_key = (device.address, bool(advertisement_data.manufacturer_data)) match_key_has_mfr_data = (device.address, True) # If we matched without manufacturer_data, we need to do it again # since we may think the device is unsupported otherwise if ( match_key_has_mfr_data not in self._matched and match_key not in self._matched ): matched_domains = { matcher["domain"] for matcher in self._integration_matchers if _ble_device_matches(matcher, device, advertisement_data) } if matched_domains: self._matched[match_key] = True _LOGGER.debug( "Device detected: %s with advertisement_data: %s matched domains: %s", device, advertisement_data, matched_domains, ) if not matched_domains and not self._callbacks: return service_info: BluetoothServiceInfoBleak | None = None for callback, matcher in self._callbacks: if matcher is None or _ble_device_matches( matcher, device, advertisement_data ): if service_info is None: service_info = BluetoothServiceInfoBleak.from_advertisement( device, advertisement_data, SOURCE_LOCAL ) try: callback(service_info, BluetoothChange.ADVERTISEMENT) except Exception: # pylint: disable=broad-except _LOGGER.exception("Error in bluetooth callback") if not matched_domains: return if service_info is None: service_info = BluetoothServiceInfoBleak.from_advertisement( device, advertisement_data, SOURCE_LOCAL ) for domain in matched_domains: discovery_flow.async_create_flow( self.hass, domain, {"source": config_entries.SOURCE_BLUETOOTH}, service_info, ) @hass_callback def async_track_unavailable( self, callback: Callable[[str], None], address: str ) -> Callable[[], None]: """Register a callback.""" self._unavailable_callbacks.setdefault(address, []).append(callback) @hass_callback def _async_remove_callback() -> None: self._unavailable_callbacks[address].remove(callback) if not self._unavailable_callbacks[address]: del self._unavailable_callbacks[address] return _async_remove_callback @hass_callback def async_register_callback( self, callback: BluetoothCallback, matcher: BluetoothCallbackMatcher | None = None, ) -> Callable[[], None]: """Register a callback.""" callback_entry = (callback, matcher) self._callbacks.append(callback_entry) @hass_callback def _async_remove_callback() -> None: self._callbacks.remove(callback_entry) # If we have history for the subscriber, we can trigger the callback # immediately with the last packet so the subscriber can see the # device. if ( matcher and (address := matcher.get(ADDRESS)) and models.HA_BLEAK_SCANNER and (device_adv_data := models.HA_BLEAK_SCANNER.history.get(address)) ): try: callback( BluetoothServiceInfoBleak.from_advertisement( *device_adv_data, SOURCE_LOCAL ), BluetoothChange.ADVERTISEMENT, ) except Exception: # pylint: disable=broad-except _LOGGER.exception("Error in bluetooth callback") return _async_remove_callback @hass_callback def async_ble_device_from_address(self, address: str) -> BLEDevice | None: """Return the BLEDevice if present.""" if models.HA_BLEAK_SCANNER and ( ble_adv := models.HA_BLEAK_SCANNER.history.get(address) ): return ble_adv[0] return None @hass_callback def async_address_present(self, address: str) -> bool: """Return if the address is present.""" return bool( models.HA_BLEAK_SCANNER and address in models.HA_BLEAK_SCANNER.history ) @hass_callback def async_discovered_service_info(self) -> list[BluetoothServiceInfoBleak]: """Return if the address is present.""" if models.HA_BLEAK_SCANNER: history = models.HA_BLEAK_SCANNER.history return [ BluetoothServiceInfoBleak.from_advertisement(*device_adv, SOURCE_LOCAL) for device_adv in history.values() ] return [] async def async_stop(self, event: Event) -> None: """Stop bluetooth discovery.""" if self._cancel_device_detected: self._cancel_device_detected() self._cancel_device_detected = None if self._cancel_unavailable_tracking: self._cancel_unavailable_tracking() self._cancel_unavailable_tracking = None if self.scanner: await self.scanner.stop() models.HA_BLEAK_SCANNER = None