Fix bluetooth integration matching with service_data_uuids and service_uuids (#75687)
* Fix bluetooth integration with service_data and service_uuids We would only dispatch a new flow when the address was seen for the first time or the manufacturer_data appeared in a followup advertisement. Its also possible for the service_data and service_uuids to appear in a followup advertisement so we need to track these as well * improve logging to avoid overly large messages * improve logging to avoid overly large messages * adjust * adjsut * split * coverage * coverage * coverage * coverage * fix matcher * more coverage * more coverage * more coverage * revert switchbot changes and move to seperate PR
This commit is contained in:
parent
d890598da7
commit
bbb9443b00
5 changed files with 398 additions and 103 deletions
|
@ -5,15 +5,13 @@ from collections.abc import Callable
|
|||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta
|
||||
from enum import Enum
|
||||
import fnmatch
|
||||
import logging
|
||||
import platform
|
||||
from typing import Final, TypedDict, Union
|
||||
from typing import Final, Union
|
||||
|
||||
from bleak import BleakError
|
||||
from bleak.backends.device import BLEDevice
|
||||
from bleak.backends.scanner import AdvertisementData
|
||||
from lru import LRU # pylint: disable=no-name-in-module
|
||||
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
||||
|
@ -28,20 +26,21 @@ from homeassistant.helpers import discovery_flow
|
|||
from homeassistant.helpers.event import async_track_time_interval
|
||||
from homeassistant.helpers.service_info.bluetooth import BluetoothServiceInfo
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
from homeassistant.loader import (
|
||||
BluetoothMatcher,
|
||||
BluetoothMatcherOptional,
|
||||
async_get_bluetooth,
|
||||
)
|
||||
from homeassistant.loader import async_get_bluetooth
|
||||
|
||||
from . import models
|
||||
from .const import DOMAIN
|
||||
from .match import (
|
||||
ADDRESS,
|
||||
BluetoothCallbackMatcher,
|
||||
IntegrationMatcher,
|
||||
ble_device_matches,
|
||||
)
|
||||
from .models import HaBleakScanner, HaBleakScannerWrapper
|
||||
from .usage import install_multiple_bleak_catcher, uninstall_multiple_bleak_catcher
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
MAX_REMEMBER_ADDRESSES: Final = 2048
|
||||
|
||||
UNAVAILABLE_TRACK_SECONDS: Final = 60 * 5
|
||||
|
||||
|
@ -79,19 +78,6 @@ class BluetoothServiceInfoBleak(BluetoothServiceInfo):
|
|||
)
|
||||
|
||||
|
||||
class BluetoothCallbackMatcherOptional(TypedDict, total=False):
|
||||
"""Matcher for the bluetooth integration for callback optional fields."""
|
||||
|
||||
address: str
|
||||
|
||||
|
||||
class BluetoothCallbackMatcher(
|
||||
BluetoothMatcherOptional,
|
||||
BluetoothCallbackMatcherOptional,
|
||||
):
|
||||
"""Callback matcher for the bluetooth integration."""
|
||||
|
||||
|
||||
class BluetoothScanningMode(Enum):
|
||||
"""The mode of scanning for bluetooth devices."""
|
||||
|
||||
|
@ -104,12 +90,6 @@ SCANNING_MODE_TO_BLEAK = {
|
|||
BluetoothScanningMode.PASSIVE: "passive",
|
||||
}
|
||||
|
||||
ADDRESS: Final = "address"
|
||||
LOCAL_NAME: Final = "local_name"
|
||||
SERVICE_UUID: Final = "service_uuid"
|
||||
MANUFACTURER_ID: Final = "manufacturer_id"
|
||||
MANUFACTURER_DATA_START: Final = "manufacturer_data_start"
|
||||
|
||||
|
||||
BluetoothChange = Enum("BluetoothChange", "ADVERTISEMENT")
|
||||
BluetoothCallback = Callable[
|
||||
|
@ -208,8 +188,8 @@ async def _async_has_bluetooth_adapter() -> bool:
|
|||
|
||||
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
"""Set up the bluetooth integration."""
|
||||
integration_matchers = await async_get_bluetooth(hass)
|
||||
manager = BluetoothManager(hass, integration_matchers)
|
||||
integration_matcher = IntegrationMatcher(await async_get_bluetooth(hass))
|
||||
manager = BluetoothManager(hass, integration_matcher)
|
||||
manager.async_setup()
|
||||
hass.data[DOMAIN] = manager
|
||||
# The config entry is responsible for starting the manager
|
||||
|
@ -252,62 +232,17 @@ async def async_unload_entry(
|
|||
return True
|
||||
|
||||
|
||||
def _ble_device_matches(
|
||||
matcher: BluetoothCallbackMatcher | BluetoothMatcher,
|
||||
device: BLEDevice,
|
||||
advertisement_data: AdvertisementData,
|
||||
) -> bool:
|
||||
"""Check if a ble device and advertisement_data matches the matcher."""
|
||||
if (
|
||||
matcher_address := matcher.get(ADDRESS)
|
||||
) is not None and device.address != matcher_address:
|
||||
return False
|
||||
|
||||
if (
|
||||
matcher_local_name := matcher.get(LOCAL_NAME)
|
||||
) is not None and not fnmatch.fnmatch(
|
||||
advertisement_data.local_name or device.name or device.address,
|
||||
matcher_local_name,
|
||||
):
|
||||
return False
|
||||
|
||||
if (
|
||||
matcher_service_uuid := matcher.get(SERVICE_UUID)
|
||||
) is not None and matcher_service_uuid not in advertisement_data.service_uuids:
|
||||
return False
|
||||
|
||||
if (
|
||||
(matcher_manfacturer_id := matcher.get(MANUFACTURER_ID)) is not None
|
||||
and matcher_manfacturer_id not in advertisement_data.manufacturer_data
|
||||
):
|
||||
return False
|
||||
|
||||
if (
|
||||
matcher_manufacturer_data_start := matcher.get(MANUFACTURER_DATA_START)
|
||||
) is not None:
|
||||
matcher_manufacturer_data_start_bytes = bytearray(
|
||||
matcher_manufacturer_data_start
|
||||
)
|
||||
if not any(
|
||||
manufacturer_data.startswith(matcher_manufacturer_data_start_bytes)
|
||||
for manufacturer_data in advertisement_data.manufacturer_data.values()
|
||||
):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
class BluetoothManager:
|
||||
"""Manage Bluetooth."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
integration_matchers: list[BluetoothMatcher],
|
||||
integration_matcher: IntegrationMatcher,
|
||||
) -> None:
|
||||
"""Init bluetooth discovery."""
|
||||
self.hass = hass
|
||||
self._integration_matchers = integration_matchers
|
||||
self._integration_matcher = integration_matcher
|
||||
self.scanner: HaBleakScanner | None = None
|
||||
self._cancel_device_detected: CALLBACK_TYPE | None = None
|
||||
self._cancel_unavailable_tracking: CALLBACK_TYPE | None = None
|
||||
|
@ -315,9 +250,6 @@ class BluetoothManager:
|
|||
self._callbacks: list[
|
||||
tuple[BluetoothCallback, BluetoothCallbackMatcher | None]
|
||||
] = []
|
||||
# Some devices use a random address so we need to use
|
||||
# an LRU to avoid memory issues.
|
||||
self._matched: LRU = LRU(MAX_REMEMBER_ADDRESSES)
|
||||
|
||||
@hass_callback
|
||||
def async_setup(self) -> None:
|
||||
|
@ -387,27 +319,12 @@ class BluetoothManager:
|
|||
self, device: BLEDevice, advertisement_data: AdvertisementData
|
||||
) -> None:
|
||||
"""Handle a detected device."""
|
||||
matched_domains: set[str] | None = None
|
||||
match_key = (device.address, bool(advertisement_data.manufacturer_data))
|
||||
match_key_has_mfr_data = (device.address, True)
|
||||
|
||||
# If we matched without manufacturer_data, we need to do it again
|
||||
# since we may think the device is unsupported otherwise
|
||||
if (
|
||||
match_key_has_mfr_data not in self._matched
|
||||
and match_key not in self._matched
|
||||
):
|
||||
matched_domains = {
|
||||
matcher["domain"]
|
||||
for matcher in self._integration_matchers
|
||||
if _ble_device_matches(matcher, device, advertisement_data)
|
||||
}
|
||||
if matched_domains:
|
||||
self._matched[match_key] = True
|
||||
|
||||
matched_domains = self._integration_matcher.match_domains(
|
||||
device, advertisement_data
|
||||
)
|
||||
_LOGGER.debug(
|
||||
"Device detected: %s with advertisement_data: %s matched domains: %s",
|
||||
device,
|
||||
device.address,
|
||||
advertisement_data,
|
||||
matched_domains,
|
||||
)
|
||||
|
@ -417,7 +334,7 @@ class BluetoothManager:
|
|||
|
||||
service_info: BluetoothServiceInfoBleak | None = None
|
||||
for callback, matcher in self._callbacks:
|
||||
if matcher is None or _ble_device_matches(
|
||||
if matcher is None or ble_device_matches(
|
||||
matcher, device, advertisement_data
|
||||
):
|
||||
if service_info is None:
|
||||
|
|
139
homeassistant/components/bluetooth/match.py
Normal file
139
homeassistant/components/bluetooth/match.py
Normal file
|
@ -0,0 +1,139 @@
|
|||
"""The bluetooth integration matchers."""
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Mapping
|
||||
from dataclasses import dataclass
|
||||
import fnmatch
|
||||
from typing import Final, TypedDict
|
||||
|
||||
from bleak.backends.device import BLEDevice
|
||||
from bleak.backends.scanner import AdvertisementData
|
||||
from lru import LRU # pylint: disable=no-name-in-module
|
||||
|
||||
from homeassistant.loader import BluetoothMatcher, BluetoothMatcherOptional
|
||||
|
||||
MAX_REMEMBER_ADDRESSES: Final = 2048
|
||||
|
||||
|
||||
ADDRESS: Final = "address"
|
||||
LOCAL_NAME: Final = "local_name"
|
||||
SERVICE_UUID: Final = "service_uuid"
|
||||
SERVICE_DATA_UUID: Final = "service_data_uuid"
|
||||
MANUFACTURER_ID: Final = "manufacturer_id"
|
||||
MANUFACTURER_DATA_START: Final = "manufacturer_data_start"
|
||||
|
||||
|
||||
class BluetoothCallbackMatcherOptional(TypedDict, total=False):
|
||||
"""Matcher for the bluetooth integration for callback optional fields."""
|
||||
|
||||
address: str
|
||||
|
||||
|
||||
class BluetoothCallbackMatcher(
|
||||
BluetoothMatcherOptional,
|
||||
BluetoothCallbackMatcherOptional,
|
||||
):
|
||||
"""Callback matcher for the bluetooth integration."""
|
||||
|
||||
|
||||
@dataclass(frozen=False)
|
||||
class IntegrationMatchHistory:
|
||||
"""Track which fields have been seen."""
|
||||
|
||||
manufacturer_data: bool
|
||||
service_data: bool
|
||||
service_uuids: bool
|
||||
|
||||
|
||||
def seen_all_fields(
|
||||
previous_match: IntegrationMatchHistory, adv_data: AdvertisementData
|
||||
) -> bool:
|
||||
"""Return if we have seen all fields."""
|
||||
if not previous_match.manufacturer_data and adv_data.manufacturer_data:
|
||||
return False
|
||||
if not previous_match.service_data and adv_data.service_data:
|
||||
return False
|
||||
if not previous_match.service_uuids and adv_data.service_uuids:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class IntegrationMatcher:
|
||||
"""Integration matcher for the bluetooth integration."""
|
||||
|
||||
def __init__(self, integration_matchers: list[BluetoothMatcher]) -> None:
|
||||
"""Initialize the matcher."""
|
||||
self._integration_matchers = integration_matchers
|
||||
# Some devices use a random address so we need to use
|
||||
# an LRU to avoid memory issues.
|
||||
self._matched: Mapping[str, IntegrationMatchHistory] = LRU(
|
||||
MAX_REMEMBER_ADDRESSES
|
||||
)
|
||||
|
||||
def match_domains(self, device: BLEDevice, adv_data: AdvertisementData) -> set[str]:
|
||||
"""Return the domains that are matched."""
|
||||
matched_domains: set[str] = set()
|
||||
if (previous_match := self._matched.get(device.address)) and seen_all_fields(
|
||||
previous_match, adv_data
|
||||
):
|
||||
# We have seen all fields so we can skip the rest of the matchers
|
||||
return matched_domains
|
||||
matched_domains = {
|
||||
matcher["domain"]
|
||||
for matcher in self._integration_matchers
|
||||
if ble_device_matches(matcher, device, adv_data)
|
||||
}
|
||||
if not matched_domains:
|
||||
return matched_domains
|
||||
if previous_match:
|
||||
previous_match.manufacturer_data |= bool(adv_data.manufacturer_data)
|
||||
previous_match.service_data |= bool(adv_data.service_data)
|
||||
previous_match.service_uuids |= bool(adv_data.service_uuids)
|
||||
else:
|
||||
self._matched[device.address] = IntegrationMatchHistory( # type: ignore[index]
|
||||
manufacturer_data=bool(adv_data.manufacturer_data),
|
||||
service_data=bool(adv_data.service_data),
|
||||
service_uuids=bool(adv_data.service_uuids),
|
||||
)
|
||||
return matched_domains
|
||||
|
||||
|
||||
def ble_device_matches(
|
||||
matcher: BluetoothCallbackMatcher | BluetoothMatcher,
|
||||
device: BLEDevice,
|
||||
adv_data: AdvertisementData,
|
||||
) -> bool:
|
||||
"""Check if a ble device and advertisement_data matches the matcher."""
|
||||
if (address := matcher.get(ADDRESS)) is not None and device.address != address:
|
||||
return False
|
||||
|
||||
if (local_name := matcher.get(LOCAL_NAME)) is not None and not fnmatch.fnmatch(
|
||||
adv_data.local_name or device.name or device.address,
|
||||
local_name,
|
||||
):
|
||||
return False
|
||||
|
||||
if (
|
||||
service_uuid := matcher.get(SERVICE_UUID)
|
||||
) is not None and service_uuid not in adv_data.service_uuids:
|
||||
return False
|
||||
|
||||
if (
|
||||
service_data_uuid := matcher.get(SERVICE_DATA_UUID)
|
||||
) is not None and service_data_uuid not in adv_data.service_data:
|
||||
return False
|
||||
|
||||
if (
|
||||
manfacturer_id := matcher.get(MANUFACTURER_ID)
|
||||
) is not None and manfacturer_id not in adv_data.manufacturer_data:
|
||||
return False
|
||||
|
||||
if (manufacturer_data_start := matcher.get(MANUFACTURER_DATA_START)) is not None:
|
||||
manufacturer_data_start_bytes = bytearray(manufacturer_data_start)
|
||||
if not any(
|
||||
manufacturer_data.startswith(manufacturer_data_start_bytes)
|
||||
for manufacturer_data in adv_data.manufacturer_data.values()
|
||||
):
|
||||
return False
|
||||
|
||||
return True
|
|
@ -88,6 +88,7 @@ class BluetoothMatcherOptional(TypedDict, total=False):
|
|||
|
||||
local_name: str
|
||||
service_uuid: str
|
||||
service_data_uuid: str
|
||||
manufacturer_id: int
|
||||
manufacturer_data_start: list[int]
|
||||
|
||||
|
|
|
@ -194,6 +194,7 @@ MANIFEST_SCHEMA = vol.Schema(
|
|||
vol.Schema(
|
||||
{
|
||||
vol.Optional("service_uuid"): vol.All(str, verify_lowercase),
|
||||
vol.Optional("service_data_uuid"): vol.All(str, verify_lowercase),
|
||||
vol.Optional("local_name"): vol.All(str),
|
||||
vol.Optional("manufacturer_id"): int,
|
||||
vol.Optional("manufacturer_data_start"): [int],
|
||||
|
|
|
@ -222,7 +222,7 @@ async def test_discovery_match_by_local_name(hass, mock_bleak_scanner_start):
|
|||
assert mock_config_flow.mock_calls[0][1][0] == "switchbot"
|
||||
|
||||
|
||||
async def test_discovery_match_by_manufacturer_id_and_first_byte(
|
||||
async def test_discovery_match_by_manufacturer_id_and_manufacturer_data_start(
|
||||
hass, mock_bleak_scanner_start
|
||||
):
|
||||
"""Test bluetooth discovery match by manufacturer_id and manufacturer_data_start."""
|
||||
|
@ -248,20 +248,33 @@ async def test_discovery_match_by_manufacturer_id_and_first_byte(
|
|||
assert len(mock_bleak_scanner_start.mock_calls) == 1
|
||||
|
||||
hkc_device = BLEDevice("44:44:33:11:23:45", "lock")
|
||||
hkc_adv_no_mfr_data = AdvertisementData(
|
||||
local_name="lock",
|
||||
service_uuids=[],
|
||||
manufacturer_data={},
|
||||
)
|
||||
hkc_adv = AdvertisementData(
|
||||
local_name="lock",
|
||||
service_uuids=[],
|
||||
manufacturer_data={76: b"\x06\x02\x03\x99"},
|
||||
)
|
||||
|
||||
# 1st discovery with no manufacturer data
|
||||
# should not trigger config flow
|
||||
_get_underlying_scanner()._callback(hkc_device, hkc_adv_no_mfr_data)
|
||||
await hass.async_block_till_done()
|
||||
assert len(mock_config_flow.mock_calls) == 0
|
||||
mock_config_flow.reset_mock()
|
||||
|
||||
# 2nd discovery with manufacturer data
|
||||
# should trigger a config flow
|
||||
_get_underlying_scanner()._callback(hkc_device, hkc_adv)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(mock_config_flow.mock_calls) == 1
|
||||
assert mock_config_flow.mock_calls[0][1][0] == "homekit_controller"
|
||||
mock_config_flow.reset_mock()
|
||||
|
||||
# 2nd discovery should not generate another flow
|
||||
# 3rd discovery should not generate another flow
|
||||
_get_underlying_scanner()._callback(hkc_device, hkc_adv)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
|
@ -288,6 +301,230 @@ async def test_discovery_match_by_manufacturer_id_and_first_byte(
|
|||
assert len(mock_config_flow.mock_calls) == 0
|
||||
|
||||
|
||||
async def test_discovery_match_by_service_data_uuid_then_others(
|
||||
hass, mock_bleak_scanner_start
|
||||
):
|
||||
"""Test bluetooth discovery match by service_data_uuid and then other fields."""
|
||||
mock_bt = [
|
||||
{
|
||||
"domain": "my_domain",
|
||||
"service_data_uuid": "0000fd3d-0000-1000-8000-00805f9b34fb",
|
||||
},
|
||||
{
|
||||
"domain": "my_domain",
|
||||
"service_uuid": "0000fd3d-0000-1000-8000-00805f9b34fc",
|
||||
},
|
||||
{
|
||||
"domain": "other_domain",
|
||||
"manufacturer_id": 323,
|
||||
},
|
||||
]
|
||||
with patch(
|
||||
"homeassistant.components.bluetooth.async_get_bluetooth", return_value=mock_bt
|
||||
):
|
||||
assert await async_setup_component(
|
||||
hass, bluetooth.DOMAIN, {bluetooth.DOMAIN: {}}
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
with patch.object(hass.config_entries.flow, "async_init") as mock_config_flow:
|
||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(mock_bleak_scanner_start.mock_calls) == 1
|
||||
|
||||
device = BLEDevice("44:44:33:11:23:45", "lock")
|
||||
adv_without_service_data_uuid = AdvertisementData(
|
||||
local_name="lock",
|
||||
service_uuids=[],
|
||||
manufacturer_data={},
|
||||
)
|
||||
adv_with_mfr_data = AdvertisementData(
|
||||
local_name="lock",
|
||||
service_uuids=[],
|
||||
manufacturer_data={323: b"\x01\x02\x03"},
|
||||
service_data={},
|
||||
)
|
||||
adv_with_service_data_uuid = AdvertisementData(
|
||||
local_name="lock",
|
||||
service_uuids=[],
|
||||
manufacturer_data={},
|
||||
service_data={"0000fd3d-0000-1000-8000-00805f9b34fb": b"\x01\x02\x03"},
|
||||
)
|
||||
adv_with_service_data_uuid_and_mfr_data = AdvertisementData(
|
||||
local_name="lock",
|
||||
service_uuids=[],
|
||||
manufacturer_data={323: b"\x01\x02\x03"},
|
||||
service_data={"0000fd3d-0000-1000-8000-00805f9b34fb": b"\x01\x02\x03"},
|
||||
)
|
||||
adv_with_service_data_uuid_and_mfr_data_and_service_uuid = AdvertisementData(
|
||||
local_name="lock",
|
||||
manufacturer_data={323: b"\x01\x02\x03"},
|
||||
service_data={"0000fd3d-0000-1000-8000-00805f9b34fb": b"\x01\x02\x03"},
|
||||
service_uuids=["0000fd3d-0000-1000-8000-00805f9b34fd"],
|
||||
)
|
||||
adv_with_service_uuid = AdvertisementData(
|
||||
local_name="lock",
|
||||
manufacturer_data={},
|
||||
service_data={},
|
||||
service_uuids=["0000fd3d-0000-1000-8000-00805f9b34fd"],
|
||||
)
|
||||
# 1st discovery should not generate a flow because the
|
||||
# service_data_uuid is not in the advertisement
|
||||
_get_underlying_scanner()._callback(device, adv_without_service_data_uuid)
|
||||
await hass.async_block_till_done()
|
||||
assert len(mock_config_flow.mock_calls) == 0
|
||||
mock_config_flow.reset_mock()
|
||||
|
||||
# 2nd discovery should not generate a flow because the
|
||||
# service_data_uuid is not in the advertisement
|
||||
_get_underlying_scanner()._callback(device, adv_without_service_data_uuid)
|
||||
await hass.async_block_till_done()
|
||||
assert len(mock_config_flow.mock_calls) == 0
|
||||
mock_config_flow.reset_mock()
|
||||
|
||||
# 3rd discovery should generate a flow because the
|
||||
# manufacturer_data is in the advertisement
|
||||
_get_underlying_scanner()._callback(device, adv_with_mfr_data)
|
||||
await hass.async_block_till_done()
|
||||
assert len(mock_config_flow.mock_calls) == 1
|
||||
assert mock_config_flow.mock_calls[0][1][0] == "other_domain"
|
||||
mock_config_flow.reset_mock()
|
||||
|
||||
# 4th discovery should generate a flow because the
|
||||
# service_data_uuid is in the advertisement and
|
||||
# we never saw a service_data_uuid before
|
||||
_get_underlying_scanner()._callback(device, adv_with_service_data_uuid)
|
||||
await hass.async_block_till_done()
|
||||
assert len(mock_config_flow.mock_calls) == 1
|
||||
assert mock_config_flow.mock_calls[0][1][0] == "my_domain"
|
||||
mock_config_flow.reset_mock()
|
||||
|
||||
# 5th discovery should not generate a flow because the
|
||||
# we already saw an advertisement with the service_data_uuid
|
||||
_get_underlying_scanner()._callback(device, adv_with_service_data_uuid)
|
||||
await hass.async_block_till_done()
|
||||
assert len(mock_config_flow.mock_calls) == 0
|
||||
|
||||
# 6th discovery should not generate a flow because the
|
||||
# manufacturer_data is in the advertisement
|
||||
# and we saw manufacturer_data before
|
||||
_get_underlying_scanner()._callback(
|
||||
device, adv_with_service_data_uuid_and_mfr_data
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
assert len(mock_config_flow.mock_calls) == 0
|
||||
mock_config_flow.reset_mock()
|
||||
|
||||
# 7th discovery should generate a flow because the
|
||||
# service_uuids is in the advertisement
|
||||
# and we never saw service_uuids before
|
||||
_get_underlying_scanner()._callback(
|
||||
device, adv_with_service_data_uuid_and_mfr_data_and_service_uuid
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
assert len(mock_config_flow.mock_calls) == 2
|
||||
assert {
|
||||
mock_config_flow.mock_calls[0][1][0],
|
||||
mock_config_flow.mock_calls[1][1][0],
|
||||
} == {"my_domain", "other_domain"}
|
||||
mock_config_flow.reset_mock()
|
||||
|
||||
# 8th discovery should not generate a flow
|
||||
# since all fields have been seen at this point
|
||||
_get_underlying_scanner()._callback(
|
||||
device, adv_with_service_data_uuid_and_mfr_data_and_service_uuid
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
assert len(mock_config_flow.mock_calls) == 0
|
||||
mock_config_flow.reset_mock()
|
||||
|
||||
# 9th discovery should not generate a flow
|
||||
# since all fields have been seen at this point
|
||||
_get_underlying_scanner()._callback(device, adv_with_service_uuid)
|
||||
await hass.async_block_till_done()
|
||||
assert len(mock_config_flow.mock_calls) == 0
|
||||
|
||||
# 10th discovery should not generate a flow
|
||||
# since all fields have been seen at this point
|
||||
_get_underlying_scanner()._callback(device, adv_with_service_data_uuid)
|
||||
await hass.async_block_till_done()
|
||||
assert len(mock_config_flow.mock_calls) == 0
|
||||
|
||||
# 11th discovery should not generate a flow
|
||||
# since all fields have been seen at this point
|
||||
_get_underlying_scanner()._callback(device, adv_without_service_data_uuid)
|
||||
await hass.async_block_till_done()
|
||||
assert len(mock_config_flow.mock_calls) == 0
|
||||
|
||||
|
||||
async def test_discovery_match_first_by_service_uuid_and_then_manufacturer_id(
|
||||
hass, mock_bleak_scanner_start
|
||||
):
|
||||
"""Test bluetooth discovery matches twice for service_uuid and then manufacturer_id."""
|
||||
mock_bt = [
|
||||
{
|
||||
"domain": "my_domain",
|
||||
"manufacturer_id": 76,
|
||||
},
|
||||
{
|
||||
"domain": "my_domain",
|
||||
"service_uuid": "0000fd3d-0000-1000-8000-00805f9b34fc",
|
||||
},
|
||||
]
|
||||
with patch(
|
||||
"homeassistant.components.bluetooth.async_get_bluetooth", return_value=mock_bt
|
||||
):
|
||||
assert await async_setup_component(
|
||||
hass, bluetooth.DOMAIN, {bluetooth.DOMAIN: {}}
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
with patch.object(hass.config_entries.flow, "async_init") as mock_config_flow:
|
||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(mock_bleak_scanner_start.mock_calls) == 1
|
||||
|
||||
device = BLEDevice("44:44:33:11:23:45", "lock")
|
||||
adv_service_uuids = AdvertisementData(
|
||||
local_name="lock",
|
||||
service_uuids=["0000fd3d-0000-1000-8000-00805f9b34fc"],
|
||||
manufacturer_data={},
|
||||
)
|
||||
adv_manufacturer_data = AdvertisementData(
|
||||
local_name="lock",
|
||||
service_uuids=[],
|
||||
manufacturer_data={76: b"\x06\x02\x03\x99"},
|
||||
)
|
||||
|
||||
# 1st discovery with matches service_uuid
|
||||
# should trigger config flow
|
||||
_get_underlying_scanner()._callback(device, adv_service_uuids)
|
||||
await hass.async_block_till_done()
|
||||
assert len(mock_config_flow.mock_calls) == 1
|
||||
assert mock_config_flow.mock_calls[0][1][0] == "my_domain"
|
||||
mock_config_flow.reset_mock()
|
||||
|
||||
# 2nd discovery with manufacturer data
|
||||
# should trigger a config flow
|
||||
_get_underlying_scanner()._callback(device, adv_manufacturer_data)
|
||||
await hass.async_block_till_done()
|
||||
assert len(mock_config_flow.mock_calls) == 1
|
||||
assert mock_config_flow.mock_calls[0][1][0] == "my_domain"
|
||||
mock_config_flow.reset_mock()
|
||||
|
||||
# 3rd discovery should not generate another flow
|
||||
_get_underlying_scanner()._callback(device, adv_service_uuids)
|
||||
await hass.async_block_till_done()
|
||||
assert len(mock_config_flow.mock_calls) == 0
|
||||
|
||||
# 4th discovery should not generate another flow
|
||||
_get_underlying_scanner()._callback(device, adv_manufacturer_data)
|
||||
await hass.async_block_till_done()
|
||||
assert len(mock_config_flow.mock_calls) == 0
|
||||
|
||||
|
||||
async def test_async_discovered_device_api(hass, mock_bleak_scanner_start):
|
||||
"""Test the async_discovered_device API."""
|
||||
mock_bt = []
|
||||
|
|
Loading…
Add table
Reference in a new issue