diff --git a/homeassistant/components/zeroconf/__init__.py b/homeassistant/components/zeroconf/__init__.py index 9d7dd73be01..f12752dc5c3 100644 --- a/homeassistant/components/zeroconf/__init__.py +++ b/homeassistant/components/zeroconf/__init__.py @@ -55,6 +55,7 @@ HOMEKIT_TYPES = [ # Thread based devices "_hap._udp.local.", ] +_HOMEKIT_MODEL_SPLITS = (None, " ", "-") # Top level keys we support matching against in properties that are always matched in # lower case. ex: ZeroconfServiceInfo.name @@ -66,7 +67,8 @@ DEFAULT_DEFAULT_INTERFACE = True DEFAULT_IPV6 = True HOMEKIT_PAIRED_STATUS_FLAG = "sf" -HOMEKIT_MODEL = "md" +HOMEKIT_MODEL_LOWER = "md" +HOMEKIT_MODEL_UPPER = "MD" # Property key=value has a max length of 255 # so we use 230 to leave space for key= @@ -192,7 +194,17 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: zeroconf_types, homekit_models = await asyncio.gather( async_get_zeroconf(hass), async_get_homekit(hass) ) - discovery = ZeroconfDiscovery(hass, zeroconf, zeroconf_types, homekit_models, ipv6) + homekit_model_lookup, homekit_model_matchers = _build_homekit_model_lookups( + homekit_models + ) + discovery = ZeroconfDiscovery( + hass, + zeroconf, + zeroconf_types, + homekit_model_lookup, + homekit_model_matchers, + ipv6, + ) await discovery.async_setup() async def _async_zeroconf_hass_start(hass: HomeAssistant, comp: str) -> None: @@ -212,6 +224,25 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: return True +def _build_homekit_model_lookups( + homekit_models: dict[str, HomeKitDiscoveredIntegration] +) -> tuple[ + dict[str, HomeKitDiscoveredIntegration], + dict[re.Pattern, HomeKitDiscoveredIntegration], +]: + """Build lookups for homekit models.""" + homekit_model_lookup: dict[str, HomeKitDiscoveredIntegration] = {} + homekit_model_matchers: dict[re.Pattern, HomeKitDiscoveredIntegration] = {} + + for model, discovery in homekit_models.items(): + if "*" in model or "?" in model or "[" in model: + homekit_model_matchers[_compile_fnmatch(model)] = discovery + else: + homekit_model_lookup[model] = discovery + + return homekit_model_lookup, homekit_model_matchers + + def _get_announced_addresses( adapters: list[Adapter], first_ip: bytes | None = None, @@ -347,14 +378,17 @@ class ZeroconfDiscovery: hass: HomeAssistant, zeroconf: HaZeroconf, zeroconf_types: dict[str, list[dict[str, str | dict[str, str]]]], - homekit_models: dict[str, HomeKitDiscoveredIntegration], + homekit_model_lookups: dict[str, HomeKitDiscoveredIntegration], + homekit_model_matchers: dict[re.Pattern, HomeKitDiscoveredIntegration], ipv6: bool, ) -> None: """Init discovery.""" self.hass = hass self.zeroconf = zeroconf self.zeroconf_types = zeroconf_types - self.homekit_models = homekit_models + self.homekit_model_lookups = homekit_model_lookups + self.homekit_model_matchers = homekit_model_matchers + self.ipv6 = ipv6 self.async_service_browser: HaAsyncServiceBrowser | None = None @@ -450,14 +484,14 @@ class ZeroconfDiscovery: # If we can handle it as a HomeKit discovery, we do that here. if service_type in HOMEKIT_TYPES and ( - homekit_model := async_get_homekit_discovery_domain( - self.homekit_models, props + homekit_discovery := async_get_homekit_discovery( + self.homekit_model_lookups, self.homekit_model_matchers, props ) ): - domain = homekit_model.domain + domain = homekit_discovery.domain discovery_flow.async_create_flow( self.hass, - homekit_model.domain, + homekit_discovery.domain, {"source": config_entries.SOURCE_HOMEKIT}, info, ) @@ -468,7 +502,7 @@ class ZeroconfDiscovery: # We only send updates to homekit_controller # if the device is already paired in order to avoid # offering a second discovery for the same device - if not is_homekit_paired(props) and not homekit_model.always_discover: + if not is_homekit_paired(props) and not homekit_discovery.always_discover: # If the device is paired with HomeKit we must send on # the update to homekit_controller so it can see when # the 'c#' field is updated. This is used to detect @@ -513,31 +547,28 @@ class ZeroconfDiscovery: ) -def async_get_homekit_discovery_domain( - homekit_models: dict[str, HomeKitDiscoveredIntegration], props: dict[str, Any] +def async_get_homekit_discovery( + homekit_model_lookups: dict[str, HomeKitDiscoveredIntegration], + homekit_model_matchers: dict[re.Pattern, HomeKitDiscoveredIntegration], + props: dict[str, Any], ) -> HomeKitDiscoveredIntegration | None: """Handle a HomeKit discovery. Return the domain to forward the discovery data to """ - model = None - for key in props: - if key.lower() == HOMEKIT_MODEL: - model = props[key] - break - - if model is None: + if not (model := props.get(HOMEKIT_MODEL_LOWER) or props.get(HOMEKIT_MODEL_UPPER)): return None - for test_model in homekit_models: - if ( - model != test_model - and not model.startswith((f"{test_model} ", f"{test_model}-")) - and not _memorized_fnmatch(model, test_model) - ): - continue + assert isinstance(model, str) - return homekit_models[test_model] + for split_str in _HOMEKIT_MODEL_SPLITS: + key = (model.split(split_str))[0] if split_str else model + if discovery := homekit_model_lookups.get(key): + return discovery + + for pattern, discovery in homekit_model_matchers.items(): + if pattern.match(model): + return discovery return None