diff --git a/.strict-typing b/.strict-typing
index e1c0d905621..e11374312da 100644
--- a/.strict-typing
+++ b/.strict-typing
@@ -61,6 +61,7 @@ homeassistant.components.scene.*
homeassistant.components.sensor.*
homeassistant.components.slack.*
homeassistant.components.sonos.media_player
+homeassistant.components.ssdp.*
homeassistant.components.sun.*
homeassistant.components.switch.*
homeassistant.components.synology_dsm.*
diff --git a/homeassistant/components/ssdp/__init__.py b/homeassistant/components/ssdp/__init__.py
index d9f74e5e776..5fcd58e14f7 100644
--- a/homeassistant/components/ssdp/__init__.py
+++ b/homeassistant/components/ssdp/__init__.py
@@ -4,18 +4,23 @@ from __future__ import annotations
import asyncio
from collections.abc import Mapping
from datetime import timedelta
+from ipaddress import IPv4Address, IPv6Address
import logging
-from typing import Any
+from typing import Any, Callable
-import aiohttp
-from async_upnp_client.search import async_search
-from defusedxml import ElementTree
-from netdisco import ssdp, util
+from async_upnp_client.search import SSDPListener
+from async_upnp_client.utils import CaseInsensitiveDict
+from homeassistant import config_entries
+from homeassistant.components import network
from homeassistant.const import EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP
-from homeassistant.core import callback
+from homeassistant.core import CoreState, HomeAssistant, callback as core_callback
from homeassistant.helpers.event import async_track_time_interval
-from homeassistant.loader import async_get_ssdp
+from homeassistant.helpers.typing import ConfigType
+from homeassistant.loader import async_get_ssdp, bind_hass
+
+from .descriptions import DescriptionManager
+from .flow import FlowDispatcher, SSDPFlow
DOMAIN = "ssdp"
SCAN_INTERVAL = timedelta(seconds=60)
@@ -40,188 +45,321 @@ ATTR_UPNP_UDN = "UDN"
ATTR_UPNP_UPC = "UPC"
ATTR_UPNP_PRESENTATION_URL = "presentationURL"
+
+DISCOVERY_MAPPING = {
+ "usn": ATTR_SSDP_USN,
+ "ext": ATTR_SSDP_EXT,
+ "server": ATTR_SSDP_SERVER,
+ "st": ATTR_SSDP_ST,
+ "location": ATTR_SSDP_LOCATION,
+}
+
+
_LOGGER = logging.getLogger(__name__)
-async def async_setup(hass, config):
+@bind_hass
+def async_register_callback(
+ hass: HomeAssistant,
+ callback: Callable[[dict], None],
+ match_dict: None | dict[str, str] = None,
+) -> Callable[[], None]:
+ """Register to receive a callback on ssdp broadcast.
+
+ Returns a callback that can be used to cancel the registration.
+ """
+ scanner: Scanner = hass.data[DOMAIN]
+ return scanner.async_register_callback(callback, match_dict)
+
+
+@bind_hass
+def async_get_discovery_info_by_udn_st( # pylint: disable=invalid-name
+ hass: HomeAssistant, udn: str, st: str
+) -> dict[str, str] | None:
+ """Fetch the discovery info cache."""
+ scanner: Scanner = hass.data[DOMAIN]
+ return scanner.async_get_discovery_info_by_udn_st(udn, st)
+
+
+@bind_hass
+def async_get_discovery_info_by_st( # pylint: disable=invalid-name
+ hass: HomeAssistant, st: str
+) -> list[dict[str, str]]:
+ """Fetch all the entries matching the st."""
+ scanner: Scanner = hass.data[DOMAIN]
+ return scanner.async_get_discovery_info_by_st(st)
+
+
+@bind_hass
+def async_get_discovery_info_by_udn(
+ hass: HomeAssistant, udn: str
+) -> list[dict[str, str]]:
+ """Fetch all the entries matching the udn."""
+ scanner: Scanner = hass.data[DOMAIN]
+ return scanner.async_get_discovery_info_by_udn(udn)
+
+
+async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the SSDP integration."""
- async def _async_initialize(_):
- scanner = Scanner(hass, await async_get_ssdp(hass))
- await scanner.async_scan(None)
- cancel_scan = async_track_time_interval(hass, scanner.async_scan, SCAN_INTERVAL)
+ scanner = hass.data[DOMAIN] = Scanner(hass, await async_get_ssdp(hass))
- @callback
- def _async_stop_scans(event):
- cancel_scan()
-
- hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_stop_scans)
-
- hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, _async_initialize)
+ asyncio.create_task(scanner.async_start())
return True
+@core_callback
+def _async_use_default_interface(adapters: list[network.Adapter]) -> bool:
+ for adapter in adapters:
+ if adapter["enabled"] and not adapter["default"]:
+ return False
+ return True
+
+
+@core_callback
+def _async_process_callbacks(
+ callbacks: list[Callable[[dict], None]], discovery_info: dict[str, str]
+) -> None:
+ for callback in callbacks:
+ try:
+ callback(discovery_info)
+ except Exception: # pylint: disable=broad-except
+ _LOGGER.exception("Failed to callback info: %s", discovery_info)
+
+
class Scanner:
"""Class to manage SSDP scanning."""
- def __init__(self, hass, integration_matchers):
+ def __init__(
+ self, hass: HomeAssistant, integration_matchers: dict[str, list[dict[str, str]]]
+ ) -> None:
"""Initialize class."""
self.hass = hass
- self.seen = set()
- self._entries = []
+ self.seen: set[tuple[str, str]] = set()
+ self.cache: dict[tuple[str, str], Mapping[str, str]] = {}
self._integration_matchers = integration_matchers
- self._description_cache = {}
+ self._cancel_scan: Callable[[], None] | None = None
+ self._ssdp_listeners: list[SSDPListener] = []
+ self._callbacks: list[tuple[Callable[[dict], None], dict[str, str]]] = []
+ self.flow_dispatcher: FlowDispatcher | None = None
+ self.description_manager: DescriptionManager | None = None
- async def _on_ssdp_response(self, data: Mapping[str, Any]) -> None:
- """Process an ssdp response."""
- self.async_store_entry(
- ssdp.UPNPEntry({key.lower(): item for key, item in data.items()})
+ @core_callback
+ def async_register_callback(
+ self, callback: Callable[[dict], None], match_dict: None | dict[str, str] = None
+ ) -> Callable[[], None]:
+ """Register a callback."""
+ if match_dict is None:
+ match_dict = {}
+
+ # Make sure any entries that happened
+ # before the callback was registered are fired
+ if self.hass.state != CoreState.running:
+ for headers in self.cache.values():
+ self._async_callback_if_match(callback, headers, match_dict)
+
+ callback_entry = (callback, match_dict)
+ self._callbacks.append(callback_entry)
+
+ @core_callback
+ def _async_remove_callback() -> None:
+ self._callbacks.remove(callback_entry)
+
+ return _async_remove_callback
+
+ @core_callback
+ def _async_callback_if_match(
+ self,
+ callback: Callable[[dict], None],
+ headers: Mapping[str, str],
+ match_dict: dict[str, str],
+ ) -> None:
+ """Fire a callback if info matches the match dict."""
+ if not all(headers.get(k) == v for (k, v) in match_dict.items()):
+ return
+ _async_process_callbacks(
+ [callback], self._async_headers_to_discovery_info(headers)
)
- @callback
- def async_store_entry(self, entry):
- """Save an entry for later processing."""
- self._entries.append(entry)
+ @core_callback
+ def async_stop(self, *_: Any) -> None:
+ """Stop the scanner."""
+ assert self._cancel_scan is not None
+ self._cancel_scan()
+ for listener in self._ssdp_listeners:
+ listener.async_stop()
+ self._ssdp_listeners = []
- async def async_scan(self, _):
+ async def _async_build_source_set(self) -> set[IPv4Address | IPv6Address]:
+ """Build the list of ssdp sources."""
+ adapters = await network.async_get_adapters(self.hass)
+ sources: set[IPv4Address | IPv6Address] = set()
+ if _async_use_default_interface(adapters):
+ sources.add(IPv4Address("0.0.0.0"))
+ return sources
+
+ for adapter in adapters:
+ if not adapter["enabled"]:
+ continue
+ if adapter["ipv4"]:
+ ipv4 = adapter["ipv4"][0]
+ sources.add(IPv4Address(ipv4["address"]))
+ if adapter["ipv6"]:
+ ipv6 = adapter["ipv6"][0]
+ # With python 3.9 add scope_ids can be
+ # added by enumerating adapter["ipv6"]s
+ # IPv6Address(f"::%{ipv6['scope_id']}")
+ sources.add(IPv6Address(ipv6["address"]))
+
+ return sources
+
+ @core_callback
+ def async_scan(self, *_: Any) -> None:
"""Scan for new entries."""
+ for listener in self._ssdp_listeners:
+ listener.async_search()
- await async_search(async_callback=self._on_ssdp_response)
- await self._process_entries()
-
- # We clear the cache after each run. We track discovered entries
- # so will never need a description twice.
- self._description_cache.clear()
- self._entries.clear()
-
- async def _process_entries(self):
- """Process SSDP entries."""
- entries_to_process = []
- unseen_locations = set()
-
- for entry in self._entries:
- key = (entry.st, entry.location)
-
- if key in self.seen:
- continue
-
- self.seen.add(key)
-
- entries_to_process.append(entry)
-
- if (
- entry.location is not None
- and entry.location not in self._description_cache
- ):
- unseen_locations.add(entry.location)
-
- if not entries_to_process:
- return
-
- if unseen_locations:
- await self._fetch_descriptions(list(unseen_locations))
-
- tasks = []
-
- for entry in entries_to_process:
- info, domains = self._process_entry(entry)
- for domain in domains:
- _LOGGER.debug("Discovered %s at %s", domain, entry.location)
- tasks.append(
- self.hass.config_entries.flow.async_init(
- domain, context={"source": DOMAIN}, data=info
- )
+ async def async_start(self) -> None:
+ """Start the scanner."""
+ self.description_manager = DescriptionManager(self.hass)
+ self.flow_dispatcher = FlowDispatcher(self.hass)
+ for source_ip in await self._async_build_source_set():
+ self._ssdp_listeners.append(
+ SSDPListener(
+ async_callback=self._async_process_entry, source_ip=source_ip
)
-
- if tasks:
- await asyncio.gather(*tasks)
-
- async def _fetch_descriptions(self, locations):
- """Fetch descriptions from locations."""
-
- for idx, result in enumerate(
- await asyncio.gather(
- *[self._fetch_description(location) for location in locations],
- return_exceptions=True,
)
- ):
- location = locations[idx]
- if isinstance(result, Exception):
- _LOGGER.exception(
- "Failed to fetch ssdp data from: %s", location, exc_info=result
- )
- continue
+ self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.async_stop)
+ self.hass.bus.async_listen_once(
+ EVENT_HOMEASSISTANT_STARTED, self.flow_dispatcher.async_start
+ )
+ await asyncio.gather(
+ *[listener.async_start() for listener in self._ssdp_listeners]
+ )
+ self._cancel_scan = async_track_time_interval(
+ self.hass, self.async_scan, SCAN_INTERVAL
+ )
- self._description_cache[location] = result
-
- def _process_entry(self, entry):
- """Process a single entry."""
-
- info = {"st": entry.st}
- for key in "usn", "ext", "server":
- if key in entry.values:
- info[key] = entry.values[key]
-
- if entry.location:
- # Multiple entries usually share same location. Make sure
- # we fetch it only once.
- info_req = self._description_cache.get(entry.location)
- if info_req is None:
- return (None, [])
-
- info.update(info_req)
+ @core_callback
+ def _async_get_matching_callbacks(
+ self, headers: Mapping[str, str]
+ ) -> list[Callable[[dict], None]]:
+ """Return a list of callbacks that match."""
+ return [
+ callback
+ for callback, match_dict in self._callbacks
+ if all(headers.get(k) == v for (k, v) in match_dict.items())
+ ]
+ @core_callback
+ def _async_matching_domains(self, info_with_req: CaseInsensitiveDict) -> set[str]:
domains = set()
for domain, matchers in self._integration_matchers.items():
for matcher in matchers:
- if all(info.get(k) == v for (k, v) in matcher.items()):
+ if all(info_with_req.get(k) == v for (k, v) in matcher.items()):
domains.add(domain)
+ return domains
- if domains:
- return (info_from_entry(entry, info), domains)
+ async def _async_process_entry(self, headers: Mapping[str, str]) -> None:
+ """Process SSDP entries."""
+ _LOGGER.debug("_async_process_entry: %s", headers)
+ if "st" not in headers or "location" not in headers:
+ return
+ h_st = headers["st"]
+ h_location = headers["location"]
+ key = (h_st, h_location)
- return (None, [])
+ if udn := _udn_from_usn(headers.get("usn")):
+ self.cache[(udn, h_st)] = headers
- async def _fetch_description(self, xml_location):
- """Fetch an XML description."""
- session = self.hass.helpers.aiohttp_client.async_get_clientsession()
- try:
- for _ in range(2):
- resp = await session.get(xml_location, timeout=5)
- xml = await resp.text(errors="replace")
- # Samsung Smart TV sometimes returns an empty document the
- # first time. Retry once.
- if xml:
- break
- except (aiohttp.ClientError, asyncio.TimeoutError) as err:
- _LOGGER.debug("Error fetching %s: %s", xml_location, err)
- return {}
+ callbacks = self._async_get_matching_callbacks(headers)
+ if key in self.seen and not callbacks:
+ return
- try:
- tree = ElementTree.fromstring(xml)
- except ElementTree.ParseError as err:
- _LOGGER.debug("Error parsing %s: %s", xml_location, err)
- return {}
+ assert self.description_manager is not None
+ info_req = await self.description_manager.fetch_description(h_location) or {}
+ info_with_req = CaseInsensitiveDict(**headers, **info_req)
+ discovery_info = discovery_info_from_headers_and_request(info_with_req)
- return util.etree_to_dict(tree).get("root", {}).get("device", {})
+ _async_process_callbacks(callbacks, discovery_info)
+ if key in self.seen:
+ return
+ self.seen.add(key)
+
+ for domain in self._async_matching_domains(info_with_req):
+ _LOGGER.debug("Discovered %s at %s", domain, h_location)
+ flow: SSDPFlow = {
+ "domain": domain,
+ "context": {"source": config_entries.SOURCE_SSDP},
+ "data": discovery_info,
+ }
+ assert self.flow_dispatcher is not None
+ self.flow_dispatcher.create(flow)
+
+ @core_callback
+ def _async_headers_to_discovery_info(
+ self, headers: Mapping[str, str]
+ ) -> dict[str, str]:
+ """Combine the headers and description into discovery_info.
+
+ Building this is a bit expensive so we only do it on demand.
+ """
+ assert self.description_manager is not None
+ location = headers["location"]
+ info_req = self.description_manager.async_cached_description(location) or {}
+ return discovery_info_from_headers_and_request(
+ CaseInsensitiveDict(**headers, **info_req)
+ )
+
+ @core_callback
+ def async_get_discovery_info_by_udn_st( # pylint: disable=invalid-name
+ self, udn: str, st: str
+ ) -> dict[str, str] | None:
+ """Return discovery_info for a udn and st."""
+ if headers := self.cache.get((udn, st)):
+ return self._async_headers_to_discovery_info(headers)
+ return None
+
+ @core_callback
+ def async_get_discovery_info_by_st( # pylint: disable=invalid-name
+ self, st: str
+ ) -> list[dict[str, str]]:
+ """Return matching discovery_infos for a st."""
+ return [
+ self._async_headers_to_discovery_info(headers)
+ for udn_st, headers in self.cache.items()
+ if udn_st[1] == st
+ ]
+
+ @core_callback
+ def async_get_discovery_info_by_udn(self, udn: str) -> list[dict[str, str]]:
+ """Return matching discovery_infos for a udn."""
+ return [
+ self._async_headers_to_discovery_info(headers)
+ for udn_st, headers in self.cache.items()
+ if udn_st[0] == udn
+ ]
-def info_from_entry(entry, device_info):
- """Get info from an entry."""
- info = {
- ATTR_SSDP_LOCATION: entry.location,
- ATTR_SSDP_ST: entry.st,
- }
- if device_info:
- info.update(device_info)
- info.pop("st", None)
- if "usn" in info:
- info[ATTR_SSDP_USN] = info.pop("usn")
- if "ext" in info:
- info[ATTR_SSDP_EXT] = info.pop("ext")
- if "server" in info:
- info[ATTR_SSDP_SERVER] = info.pop("server")
+def discovery_info_from_headers_and_request(
+ info_with_req: CaseInsensitiveDict,
+) -> dict[str, str]:
+ """Convert headers and description to discovery_info."""
+ info = {DISCOVERY_MAPPING.get(k.lower(), k): v for k, v in info_with_req.items()}
+
+ if ATTR_UPNP_UDN not in info and ATTR_SSDP_USN in info:
+ if udn := _udn_from_usn(info[ATTR_SSDP_USN]):
+ info[ATTR_UPNP_UDN] = udn
return info
+
+
+def _udn_from_usn(usn: str | None) -> str | None:
+ """Get the UDN from the USN."""
+ if usn is None:
+ return None
+ if usn.startswith("uuid:"):
+ return usn.split("::")[0]
+ return None
diff --git a/homeassistant/components/ssdp/descriptions.py b/homeassistant/components/ssdp/descriptions.py
new file mode 100644
index 00000000000..a6fda3685f2
--- /dev/null
+++ b/homeassistant/components/ssdp/descriptions.py
@@ -0,0 +1,70 @@
+"""The SSDP integration."""
+from __future__ import annotations
+
+import asyncio
+import logging
+
+import aiohttp
+from defusedxml import ElementTree
+from netdisco import util
+
+from homeassistant.core import HomeAssistant, callback
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class DescriptionManager:
+ """Class to cache and manage fetching descriptions."""
+
+ def __init__(self, hass: HomeAssistant) -> None:
+ """Init the manager."""
+ self.hass = hass
+ self._description_cache: dict[str, None | dict[str, str]] = {}
+
+ async def fetch_description(
+ self, xml_location: str | None
+ ) -> None | dict[str, str]:
+ """Fetch the location or get it from the cache."""
+ if xml_location is None:
+ return None
+ if xml_location not in self._description_cache:
+ try:
+ self._description_cache[xml_location] = await self._fetch_description(
+ xml_location
+ )
+ except Exception: # pylint: disable=broad-except
+ # If it fails, cache the failure so we do not keep trying over and over
+ self._description_cache[xml_location] = None
+ _LOGGER.exception("Failed to fetch ssdp data from: %s", xml_location)
+
+ return self._description_cache[xml_location]
+
+ @callback
+ def async_cached_description(self, xml_location: str) -> None | dict[str, str]:
+ """Fetch the description from the cache."""
+ return self._description_cache[xml_location]
+
+ async def _fetch_description(self, xml_location: str) -> None | dict[str, str]:
+ """Fetch an XML description."""
+ session = self.hass.helpers.aiohttp_client.async_get_clientsession()
+ try:
+ for _ in range(2):
+ resp = await session.get(xml_location, timeout=5)
+ # Samsung Smart TV sometimes returns an empty document the
+ # first time. Retry once.
+ if xml := await resp.text(errors="replace"):
+ break
+ except (aiohttp.ClientError, asyncio.TimeoutError) as err:
+ _LOGGER.debug("Error fetching %s: %s", xml_location, err)
+ return None
+
+ try:
+ tree = ElementTree.fromstring(xml)
+ except ElementTree.ParseError as err:
+ _LOGGER.debug("Error parsing %s: %s", xml_location, err)
+ return None
+
+ parsed: dict[str, str] = (
+ util.etree_to_dict(tree).get("root", {}).get("device", {})
+ )
+ return parsed
diff --git a/homeassistant/components/ssdp/flow.py b/homeassistant/components/ssdp/flow.py
new file mode 100644
index 00000000000..77f4cb107b8
--- /dev/null
+++ b/homeassistant/components/ssdp/flow.py
@@ -0,0 +1,50 @@
+"""The SSDP integration."""
+from __future__ import annotations
+
+from collections.abc import Coroutine
+from typing import Any, TypedDict
+
+from homeassistant.core import HomeAssistant, callback
+from homeassistant.data_entry_flow import FlowResult
+
+
+class SSDPFlow(TypedDict):
+ """A queued ssdp discovery flow."""
+
+ domain: str
+ context: dict[str, Any]
+ data: dict
+
+
+class FlowDispatcher:
+ """Dispatch discovery flows."""
+
+ def __init__(self, hass: HomeAssistant) -> None:
+ """Init the discovery dispatcher."""
+ self.hass = hass
+ self.pending_flows: list[SSDPFlow] = []
+ self.started = False
+
+ @callback
+ def async_start(self, *_: Any) -> None:
+ """Start processing pending flows."""
+ self.started = True
+ self.hass.loop.call_soon(self._async_process_pending_flows)
+
+ def _async_process_pending_flows(self) -> None:
+ for flow in self.pending_flows:
+ self.hass.async_create_task(self._init_flow(flow))
+ self.pending_flows = []
+
+ def create(self, flow: SSDPFlow) -> None:
+ """Create and add or queue a flow."""
+ if self.started:
+ self.hass.async_create_task(self._init_flow(flow))
+ else:
+ self.pending_flows.append(flow)
+
+ def _init_flow(self, flow: SSDPFlow) -> Coroutine[None, None, FlowResult]:
+ """Create a flow."""
+ return self.hass.config_entries.flow.async_init(
+ flow["domain"], context=flow["context"], data=flow["data"]
+ )
diff --git a/homeassistant/components/ssdp/manifest.json b/homeassistant/components/ssdp/manifest.json
index d3dbc0c920e..59c992cd34d 100644
--- a/homeassistant/components/ssdp/manifest.json
+++ b/homeassistant/components/ssdp/manifest.json
@@ -4,9 +4,9 @@
"documentation": "https://www.home-assistant.io/integrations/ssdp",
"requirements": [
"defusedxml==0.7.1",
- "netdisco==2.8.3",
"async-upnp-client==0.18.0"
],
+ "dependencies": ["network"],
"after_dependencies": ["zeroconf"],
"codeowners": [],
"quality_scale": "internal",
diff --git a/homeassistant/package_constraints.txt b/homeassistant/package_constraints.txt
index 7e6be2f0016..cfc0585fc84 100644
--- a/homeassistant/package_constraints.txt
+++ b/homeassistant/package_constraints.txt
@@ -21,7 +21,6 @@ home-assistant-frontend==20210528.0
httpx==0.18.0
ifaddr==0.1.7
jinja2>=3.0.1
-netdisco==2.8.3
paho-mqtt==1.5.1
pillow==8.1.2
pip>=8.0.3,<20.3
diff --git a/mypy.ini b/mypy.ini
index 8de49a59259..43468d5b173 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -682,6 +682,17 @@ no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
+[mypy-homeassistant.components.ssdp.*]
+check_untyped_defs = true
+disallow_incomplete_defs = true
+disallow_subclassing_any = true
+disallow_untyped_calls = true
+disallow_untyped_decorators = true
+disallow_untyped_defs = true
+no_implicit_optional = true
+warn_return_any = true
+warn_unreachable = true
+
[mypy-homeassistant.components.sun.*]
check_untyped_defs = true
disallow_incomplete_defs = true
diff --git a/requirements_all.txt b/requirements_all.txt
index 7e207b9499f..4142f41a306 100644
--- a/requirements_all.txt
+++ b/requirements_all.txt
@@ -993,7 +993,6 @@ nessclient==0.9.15
netdata==0.2.0
# homeassistant.components.discovery
-# homeassistant.components.ssdp
netdisco==2.8.3
# homeassistant.components.nam
diff --git a/requirements_test_all.txt b/requirements_test_all.txt
index 8f4e63fa0fd..ceb880c0ccd 100644
--- a/requirements_test_all.txt
+++ b/requirements_test_all.txt
@@ -547,7 +547,6 @@ ndms2_client==0.1.1
nessclient==0.9.15
# homeassistant.components.discovery
-# homeassistant.components.ssdp
netdisco==2.8.3
# homeassistant.components.nam
diff --git a/tests/components/ssdp/test_init.py b/tests/components/ssdp/test_init.py
index 78b0f9e05b6..6dfdc02f6e7 100644
--- a/tests/components/ssdp/test_init.py
+++ b/tests/components/ssdp/test_init.py
@@ -1,42 +1,70 @@
"""Test the SSDP integration."""
import asyncio
from datetime import timedelta
+from ipaddress import IPv4Address, IPv6Address
from unittest.mock import patch
import aiohttp
+from async_upnp_client.search import SSDPListener
+from async_upnp_client.utils import CaseInsensitiveDict
import pytest
from homeassistant import config_entries
from homeassistant.components import ssdp
from homeassistant.const import EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP
+from homeassistant.core import CoreState, callback
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from tests.common import async_fire_time_changed, mock_coro
-async def test_scan_match_st(hass, caplog):
- """Test matching based on ST."""
- scanner = ssdp.Scanner(hass, {"mock-domain": [{"st": "mock-st"}]})
+def _patched_ssdp_listener(info, *args, **kwargs):
+ listener = SSDPListener(*args, **kwargs)
- async def _mock_async_scan(*args, async_callback=None, **kwargs):
- await async_callback(
- {
- "st": "mock-st",
- "location": None,
- "usn": "mock-usn",
- "server": "mock-server",
- "ext": "",
- }
+ async def _async_callback(*_):
+ await listener.async_callback(info)
+
+ listener.async_start = _async_callback
+ return listener
+
+
+async def _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp):
+ def _generate_fake_ssdp_listener(*args, **kwargs):
+ return _patched_ssdp_listener(
+ mock_ssdp_response,
+ *args,
+ **kwargs,
)
with patch(
- "homeassistant.components.ssdp.async_search",
- side_effect=_mock_async_scan,
+ "homeassistant.components.ssdp.async_get_ssdp",
+ return_value=mock_get_ssdp,
+ ), patch(
+ "homeassistant.components.ssdp.SSDPListener",
+ new=_generate_fake_ssdp_listener,
), patch.object(
hass.config_entries.flow, "async_init", return_value=mock_coro()
) as mock_init:
- await scanner.async_scan(None)
+ assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
+ hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
+ await hass.async_block_till_done()
+ await hass.async_block_till_done()
+
+ return mock_init
+
+
+async def test_scan_match_st(hass, caplog):
+ """Test matching based on ST."""
+ mock_ssdp_response = {
+ "st": "mock-st",
+ "location": None,
+ "usn": "mock-usn",
+ "server": "mock-server",
+ "ext": "",
+ }
+ mock_get_ssdp = {"mock-domain": [{"st": "mock-st"}]}
+ mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
assert len(mock_init.mock_calls) == 1
assert mock_init.mock_calls[0][1][0] == "mock-domain"
@@ -53,6 +81,19 @@ async def test_scan_match_st(hass, caplog):
assert "Failed to fetch ssdp data" not in caplog.text
+async def test_partial_response(hass, caplog):
+ """Test location and st missing."""
+ mock_ssdp_response = {
+ "usn": "mock-usn",
+ "server": "mock-server",
+ "ext": "",
+ }
+ mock_get_ssdp = {"mock-domain": [{"st": "mock-st"}]}
+ mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
+
+ assert len(mock_init.mock_calls) == 0
+
+
@pytest.mark.parametrize(
"key", (ssdp.ATTR_UPNP_MANUFACTURER, ssdp.ATTR_UPNP_DEVICE_TYPE)
)
@@ -68,25 +109,12 @@ async def test_scan_match_upnp_devicedesc(hass, aioclient_mock, key):
""",
)
- scanner = ssdp.Scanner(hass, {"mock-domain": [{key: "Paulus"}]})
-
- async def _mock_async_scan(*args, async_callback=None, **kwargs):
- for _ in range(5):
- await async_callback(
- {
- "st": "mock-st",
- "location": "http://1.1.1.1",
- }
- )
-
- with patch(
- "homeassistant.components.ssdp.async_search",
- side_effect=_mock_async_scan,
- ), patch.object(
- hass.config_entries.flow, "async_init", return_value=mock_coro()
- ) as mock_init:
- await scanner.async_scan(None)
-
+ mock_get_ssdp = {"mock-domain": [{key: "Paulus"}]}
+ mock_ssdp_response = {
+ "st": "mock-st",
+ "location": "http://1.1.1.1",
+ }
+ mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
# If we get duplicate respones, ensure we only look it up once
assert len(aioclient_mock.mock_calls) == 1
assert len(mock_init.mock_calls) == 1
@@ -108,33 +136,19 @@ async def test_scan_not_all_present(hass, aioclient_mock):
""",
)
- scanner = ssdp.Scanner(
- hass,
- {
- "mock-domain": [
- {
- ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
- ssdp.ATTR_UPNP_MANUFACTURER: "Paulus",
- }
- ]
- },
- )
-
- async def _mock_async_scan(*args, async_callback=None, **kwargs):
- await async_callback(
+ mock_ssdp_response = {
+ "st": "mock-st",
+ "location": "http://1.1.1.1",
+ }
+ mock_get_ssdp = {
+ "mock-domain": [
{
- "st": "mock-st",
- "location": "http://1.1.1.1",
+ ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
+ ssdp.ATTR_UPNP_MANUFACTURER: "Paulus",
}
- )
-
- with patch(
- "homeassistant.components.ssdp.async_search",
- side_effect=_mock_async_scan,
- ), patch.object(
- hass.config_entries.flow, "async_init", return_value=mock_coro()
- ) as mock_init:
- await scanner.async_scan(None)
+ ]
+ }
+ mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
assert not mock_init.mock_calls
@@ -152,33 +166,19 @@ async def test_scan_not_all_match(hass, aioclient_mock):
""",
)
- scanner = ssdp.Scanner(
- hass,
- {
- "mock-domain": [
- {
- ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
- ssdp.ATTR_UPNP_MANUFACTURER: "Not-Paulus",
- }
- ]
- },
- )
-
- async def _mock_async_scan(*args, async_callback=None, **kwargs):
- await async_callback(
+ mock_ssdp_response = {
+ "st": "mock-st",
+ "location": "http://1.1.1.1",
+ }
+ mock_get_ssdp = {
+ "mock-domain": [
{
- "st": "mock-st",
- "location": "http://1.1.1.1",
+ ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
+ ssdp.ATTR_UPNP_MANUFACTURER: "Not-Paulus",
}
- )
-
- with patch(
- "homeassistant.components.ssdp.async_search",
- side_effect=_mock_async_scan,
- ), patch.object(
- hass.config_entries.flow, "async_init", return_value=mock_coro()
- ) as mock_init:
- await scanner.async_scan(None)
+ ]
+ }
+ mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
assert not mock_init.mock_calls
@@ -187,21 +187,21 @@ async def test_scan_not_all_match(hass, aioclient_mock):
async def test_scan_description_fetch_fail(hass, aioclient_mock, exc):
"""Test failing to fetch description."""
aioclient_mock.get("http://1.1.1.1", exc=exc)
- scanner = ssdp.Scanner(hass, {})
-
- async def _mock_async_scan(*args, async_callback=None, **kwargs):
- await async_callback(
+ mock_ssdp_response = {
+ "st": "mock-st",
+ "location": "http://1.1.1.1",
+ }
+ mock_get_ssdp = {
+ "mock-domain": [
{
- "st": "mock-st",
- "location": "http://1.1.1.1",
+ ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
+ ssdp.ATTR_UPNP_MANUFACTURER: "Paulus",
}
- )
+ ]
+ }
+ mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
- with patch(
- "homeassistant.components.ssdp.async_search",
- side_effect=_mock_async_scan,
- ):
- await scanner.async_scan(None)
+ assert not mock_init.mock_calls
async def test_scan_description_parse_fail(hass, aioclient_mock):
@@ -212,21 +212,22 @@ async def test_scan_description_parse_fail(hass, aioclient_mock):
INVALIDXML
""",
)
- scanner = ssdp.Scanner(hass, {})
- async def _mock_async_scan(*args, async_callback=None, **kwargs):
- await async_callback(
+ mock_ssdp_response = {
+ "st": "mock-st",
+ "location": "http://1.1.1.1",
+ }
+ mock_get_ssdp = {
+ "mock-domain": [
{
- "st": "mock-st",
- "location": "http://1.1.1.1",
+ ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
+ ssdp.ATTR_UPNP_MANUFACTURER: "Paulus",
}
- )
+ ]
+ }
+ mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
- with patch(
- "homeassistant.components.ssdp.async_search",
- side_effect=_mock_async_scan,
- ):
- await scanner.async_scan(None)
+ assert not mock_init.mock_calls
async def test_invalid_characters(hass, aioclient_mock):
@@ -242,32 +243,20 @@ async def test_invalid_characters(hass, aioclient_mock):
""",
)
- scanner = ssdp.Scanner(
- hass,
- {
- "mock-domain": [
- {
- ssdp.ATTR_UPNP_DEVICE_TYPE: "ABC",
- }
- ]
- },
- )
- async def _mock_async_scan(*args, async_callback=None, **kwargs):
- await async_callback(
+ mock_ssdp_response = {
+ "st": "mock-st",
+ "location": "http://1.1.1.1",
+ }
+ mock_get_ssdp = {
+ "mock-domain": [
{
- "st": "mock-st",
- "location": "http://1.1.1.1",
+ ssdp.ATTR_UPNP_DEVICE_TYPE: "ABC",
}
- )
+ ]
+ }
- with patch(
- "homeassistant.components.ssdp.async_search",
- side_effect=_mock_async_scan,
- ), patch.object(
- hass.config_entries.flow, "async_init", return_value=mock_coro()
- ) as mock_init:
- await scanner.async_scan(None)
+ mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
assert len(mock_init.mock_calls) == 1
assert mock_init.mock_calls[0][1][0] == "mock-domain"
@@ -282,8 +271,9 @@ async def test_invalid_characters(hass, aioclient_mock):
}
-@patch("homeassistant.components.ssdp.async_search")
-async def test_start_stop_scanner(async_search_mock, hass):
+@patch("homeassistant.components.ssdp.SSDPListener.async_start")
+@patch("homeassistant.components.ssdp.SSDPListener.async_search")
+async def test_start_stop_scanner(async_start_mock, async_search_mock, hass):
"""Test we start and stop the scanner."""
assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
@@ -291,13 +281,15 @@ async def test_start_stop_scanner(async_search_mock, hass):
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
await hass.async_block_till_done()
- assert async_search_mock.call_count == 2
+ assert async_start_mock.call_count == 1
+ assert async_search_mock.call_count == 1
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
await hass.async_block_till_done()
- assert async_search_mock.call_count == 2
+ assert async_start_mock.call_count == 1
+ assert async_search_mock.call_count == 1
async def test_unexpected_exception_while_fetching(hass, aioclient_mock, caplog):
@@ -313,34 +305,357 @@ async def test_unexpected_exception_while_fetching(hass, aioclient_mock, caplog)
""",
)
- scanner = ssdp.Scanner(
- hass,
- {
- "mock-domain": [
- {
- ssdp.ATTR_UPNP_DEVICE_TYPE: "ABC",
- }
- ]
- },
- )
-
- async def _mock_async_scan(*args, async_callback=None, **kwargs):
- await async_callback(
+ mock_ssdp_response = {
+ "st": "mock-st",
+ "location": "http://1.1.1.1",
+ }
+ mock_get_ssdp = {
+ "mock-domain": [
{
- "st": "mock-st",
- "location": "http://1.1.1.1",
+ ssdp.ATTR_UPNP_DEVICE_TYPE: "ABC",
}
- )
+ ]
+ }
with patch(
- "homeassistant.components.ssdp.ElementTree.fromstring", side_effect=ValueError
- ), patch(
- "homeassistant.components.ssdp.async_search",
- side_effect=_mock_async_scan,
- ), patch.object(
- hass.config_entries.flow, "async_init", return_value=mock_coro()
- ) as mock_init:
- await scanner.async_scan(None)
+ "homeassistant.components.ssdp.descriptions.ElementTree.fromstring",
+ side_effect=ValueError,
+ ):
+ mock_init = await _async_run_mocked_scan(
+ hass, mock_ssdp_response, mock_get_ssdp
+ )
assert len(mock_init.mock_calls) == 0
assert "Failed to fetch ssdp data from: http://1.1.1.1" in caplog.text
+
+
+async def test_scan_with_registered_callback(hass, aioclient_mock, caplog):
+ """Test matching based on callback."""
+ aioclient_mock.get(
+ "http://1.1.1.1",
+ text="""
+
+
+ Paulus
+
+
+ """,
+ )
+ mock_ssdp_response = {
+ "st": "mock-st",
+ "location": "http://1.1.1.1",
+ "usn": "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
+ "server": "mock-server",
+ "ext": "",
+ }
+ not_matching_intergration_callbacks = []
+ intergration_callbacks = []
+ intergration_callbacks_from_cache = []
+ match_any_callbacks = []
+
+ @callback
+ def _async_exception_callbacks(info):
+ raise ValueError
+
+ @callback
+ def _async_intergration_callbacks(info):
+ intergration_callbacks.append(info)
+
+ @callback
+ def _async_intergration_callbacks_from_cache(info):
+ intergration_callbacks_from_cache.append(info)
+
+ @callback
+ def _async_not_matching_intergration_callbacks(info):
+ not_matching_intergration_callbacks.append(info)
+
+ @callback
+ def _async_match_any_callbacks(info):
+ match_any_callbacks.append(info)
+
+ def _generate_fake_ssdp_listener(*args, **kwargs):
+ listener = SSDPListener(*args, **kwargs)
+
+ async def _async_callback(*_):
+ await listener.async_callback(mock_ssdp_response)
+
+ @callback
+ def _callback(*_):
+ hass.async_create_task(listener.async_callback(mock_ssdp_response))
+
+ listener.async_start = _async_callback
+ listener.async_search = _callback
+ return listener
+
+ with patch(
+ "homeassistant.components.ssdp.SSDPListener",
+ new=_generate_fake_ssdp_listener,
+ ):
+ hass.state = CoreState.stopped
+ assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
+ await hass.async_block_till_done()
+ ssdp.async_register_callback(hass, _async_exception_callbacks, {})
+ ssdp.async_register_callback(
+ hass,
+ _async_intergration_callbacks,
+ {"st": "mock-st"},
+ )
+ ssdp.async_register_callback(
+ hass,
+ _async_not_matching_intergration_callbacks,
+ {"st": "not-match-mock-st"},
+ )
+ ssdp.async_register_callback(
+ hass,
+ _async_match_any_callbacks,
+ )
+ await hass.async_block_till_done()
+ async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
+ ssdp.async_register_callback(
+ hass,
+ _async_intergration_callbacks_from_cache,
+ {"st": "mock-st"},
+ )
+ await hass.async_block_till_done()
+ hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
+ hass.state = CoreState.running
+ await hass.async_block_till_done()
+ async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
+ await hass.async_block_till_done()
+ assert hass.state == CoreState.running
+
+ assert len(intergration_callbacks) == 3
+ assert len(intergration_callbacks_from_cache) == 3
+ assert len(match_any_callbacks) == 3
+ assert len(not_matching_intergration_callbacks) == 0
+ assert intergration_callbacks[0] == {
+ ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
+ ssdp.ATTR_SSDP_EXT: "",
+ ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1",
+ ssdp.ATTR_SSDP_SERVER: "mock-server",
+ ssdp.ATTR_SSDP_ST: "mock-st",
+ ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
+ ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
+ }
+ assert "Failed to callback info" in caplog.text
+
+
+async def test_scan_second_hit(hass, aioclient_mock, caplog):
+ """Test matching on second scan."""
+ aioclient_mock.get(
+ "http://1.1.1.1",
+ text="""
+
+
+ Paulus
+
+
+ """,
+ )
+
+ mock_ssdp_response = CaseInsensitiveDict(
+ **{
+ "ST": "mock-st",
+ "LOCATION": "http://1.1.1.1",
+ "USN": "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
+ "SERVER": "mock-server",
+ "EXT": "",
+ }
+ )
+ mock_get_ssdp = {"mock-domain": [{"st": "mock-st"}]}
+ intergration_callbacks = []
+
+ @callback
+ def _async_intergration_callbacks(info):
+ intergration_callbacks.append(info)
+
+ def _generate_fake_ssdp_listener(*args, **kwargs):
+ listener = SSDPListener(*args, **kwargs)
+
+ async def _async_callback(*_):
+ pass
+
+ @callback
+ def _callback(*_):
+ hass.async_create_task(listener.async_callback(mock_ssdp_response))
+
+ listener.async_start = _async_callback
+ listener.async_search = _callback
+ return listener
+
+ with patch(
+ "homeassistant.components.ssdp.async_get_ssdp",
+ return_value=mock_get_ssdp,
+ ), patch(
+ "homeassistant.components.ssdp.SSDPListener",
+ new=_generate_fake_ssdp_listener,
+ ), patch.object(
+ hass.config_entries.flow, "async_init", return_value=mock_coro()
+ ) as mock_init:
+ assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
+ await hass.async_block_till_done()
+ remove = ssdp.async_register_callback(
+ hass,
+ _async_intergration_callbacks,
+ {"st": "mock-st"},
+ )
+ hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
+ await hass.async_block_till_done()
+ async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
+ await hass.async_block_till_done()
+ async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
+ await hass.async_block_till_done()
+ remove()
+ async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
+ await hass.async_block_till_done()
+
+ assert len(intergration_callbacks) == 2
+ assert intergration_callbacks[0] == {
+ ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
+ ssdp.ATTR_SSDP_EXT: "",
+ ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1",
+ ssdp.ATTR_SSDP_SERVER: "mock-server",
+ ssdp.ATTR_SSDP_ST: "mock-st",
+ ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
+ ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
+ }
+ assert len(mock_init.mock_calls) == 1
+ assert mock_init.mock_calls[0][1][0] == "mock-domain"
+ assert mock_init.mock_calls[0][2]["context"] == {
+ "source": config_entries.SOURCE_SSDP
+ }
+ assert mock_init.mock_calls[0][2]["data"] == {
+ ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
+ ssdp.ATTR_SSDP_ST: "mock-st",
+ ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1",
+ ssdp.ATTR_SSDP_SERVER: "mock-server",
+ ssdp.ATTR_SSDP_EXT: "",
+ ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
+ ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
+ }
+ assert "Failed to fetch ssdp data" not in caplog.text
+ udn_discovery_info = ssdp.async_get_discovery_info_by_st(hass, "mock-st")
+ discovery_info = udn_discovery_info[0]
+ assert discovery_info[ssdp.ATTR_SSDP_LOCATION] == "http://1.1.1.1"
+ assert discovery_info[ssdp.ATTR_SSDP_ST] == "mock-st"
+ assert (
+ discovery_info[ssdp.ATTR_UPNP_UDN]
+ == "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL"
+ )
+ assert (
+ discovery_info[ssdp.ATTR_SSDP_USN]
+ == "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3"
+ )
+
+ st_discovery_info = ssdp.async_get_discovery_info_by_udn(
+ hass, "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL"
+ )
+ discovery_info = st_discovery_info[0]
+ assert discovery_info[ssdp.ATTR_SSDP_LOCATION] == "http://1.1.1.1"
+ assert discovery_info[ssdp.ATTR_SSDP_ST] == "mock-st"
+ assert (
+ discovery_info[ssdp.ATTR_UPNP_UDN]
+ == "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL"
+ )
+ assert (
+ discovery_info[ssdp.ATTR_SSDP_USN]
+ == "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3"
+ )
+
+ discovery_info = ssdp.async_get_discovery_info_by_udn_st(
+ hass, "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL", "mock-st"
+ )
+ assert discovery_info[ssdp.ATTR_SSDP_LOCATION] == "http://1.1.1.1"
+ assert discovery_info[ssdp.ATTR_SSDP_ST] == "mock-st"
+ assert (
+ discovery_info[ssdp.ATTR_UPNP_UDN]
+ == "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL"
+ )
+ assert (
+ discovery_info[ssdp.ATTR_SSDP_USN]
+ == "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3"
+ )
+
+ assert ssdp.async_get_discovery_info_by_udn_st(hass, "wrong", "mock-st") is None
+
+
+_ADAPTERS_WITH_MANUAL_CONFIG = [
+ {
+ "auto": True,
+ "default": False,
+ "enabled": True,
+ "ipv4": [],
+ "ipv6": [
+ {
+ "address": "2001:db8::",
+ "network_prefix": 8,
+ "flowinfo": 1,
+ "scope_id": 1,
+ }
+ ],
+ "name": "eth0",
+ },
+ {
+ "auto": True,
+ "default": False,
+ "enabled": True,
+ "ipv4": [{"address": "192.168.1.5", "network_prefix": 23}],
+ "ipv6": [],
+ "name": "eth1",
+ },
+ {
+ "auto": False,
+ "default": False,
+ "enabled": False,
+ "ipv4": [{"address": "169.254.3.2", "network_prefix": 16}],
+ "ipv6": [],
+ "name": "vtun0",
+ },
+]
+
+
+async def test_async_detect_interfaces_setting_empty_route(hass):
+ """Test without default interface config and the route returns nothing."""
+ mock_get_ssdp = {
+ "mock-domain": [
+ {
+ ssdp.ATTR_UPNP_DEVICE_TYPE: "ABC",
+ }
+ ]
+ }
+ create_args = []
+
+ def _generate_fake_ssdp_listener(*args, **kwargs):
+ create_args.append([args, kwargs])
+ listener = SSDPListener(*args, **kwargs)
+
+ async def _async_callback(*_):
+ pass
+
+ @callback
+ def _callback(*_):
+ pass
+
+ listener.async_start = _async_callback
+ listener.async_search = _callback
+ return listener
+
+ with patch(
+ "homeassistant.components.ssdp.async_get_ssdp",
+ return_value=mock_get_ssdp,
+ ), patch(
+ "homeassistant.components.ssdp.SSDPListener",
+ new=_generate_fake_ssdp_listener,
+ ), patch(
+ "homeassistant.components.ssdp.network.async_get_adapters",
+ return_value=_ADAPTERS_WITH_MANUAL_CONFIG,
+ ):
+ assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
+ await hass.async_block_till_done()
+ hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
+ await hass.async_block_till_done()
+
+ assert {create_args[0][1]["source_ip"], create_args[1][1]["source_ip"]} == {
+ IPv4Address("192.168.1.5"),
+ IPv6Address("2001:db8::"),
+ }
diff --git a/tests/test_requirements.py b/tests/test_requirements.py
index f68601e889e..26f3603910d 100644
--- a/tests/test_requirements.py
+++ b/tests/test_requirements.py
@@ -364,7 +364,11 @@ async def test_discovery_requirements_ssdp(hass):
assert len(mock_process.mock_calls) == 4
assert mock_process.mock_calls[0][1][2] == ssdp.requirements
# Ensure zeroconf is a dep for ssdp
- assert mock_process.mock_calls[1][1][1] == "zeroconf"
+ assert {
+ mock_process.mock_calls[1][1][1],
+ mock_process.mock_calls[2][1][1],
+ mock_process.mock_calls[3][1][1],
+ } == {"network", "zeroconf", "http"}
@pytest.mark.parametrize(