Use ssdp callbacks in upnp (#53840)

This commit is contained in:
Steven Looman 2021-08-13 18:13:25 +02:00 committed by GitHub
parent 3454102dc8
commit 2c1728022d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 531 additions and 444 deletions

View file

@ -2,7 +2,7 @@
"domain": "dlna_dmr",
"name": "DLNA Digital Media Renderer",
"documentation": "https://www.home-assistant.io/integrations/dlna_dmr",
"requirements": ["async-upnp-client==0.19.1"],
"requirements": ["async-upnp-client==0.19.2"],
"dependencies": ["network"],
"codeowners": [],
"iot_class": "local_push"

View file

@ -9,6 +9,7 @@ import logging
from typing import Any, Callable
from async_upnp_client.search import SSDPListener
from async_upnp_client.ssdp import SSDP_PORT
from async_upnp_client.utils import CaseInsensitiveDict
from homeassistant import config_entries
@ -228,6 +229,21 @@ class Scanner:
for listener in self._ssdp_listeners:
listener.async_search()
self.async_scan_broadcast()
@core_callback
def async_scan_broadcast(self, *_: Any) -> None:
"""Scan for new entries using broadcast target."""
# Some sonos devices only seem to respond if we send to the broadcast
# address. This matches pysonos' behavior
# https://github.com/amelchio/pysonos/blob/d4329b4abb657d106394ae69357805269708c996/pysonos/discovery.py#L120
for listener in self._ssdp_listeners:
try:
IPv4Address(listener.source_ip)
except ValueError:
continue
listener.async_search((str(IPV4_BROADCAST), SSDP_PORT))
async def async_start(self) -> None:
"""Start the scanner."""
self.description_manager = DescriptionManager(self.hass)
@ -238,20 +254,6 @@ class Scanner:
async_callback=self._async_process_entry, source_ip=source_ip
)
)
try:
IPv4Address(source_ip)
except ValueError:
continue
# Some sonos devices only seem to respond if we send to the broadcast
# address. This matches pysonos' behavior
# https://github.com/amelchio/pysonos/blob/d4329b4abb657d106394ae69357805269708c996/pysonos/discovery.py#L120
self._ssdp_listeners.append(
SSDPListener(
async_callback=self._async_process_entry,
source_ip=source_ip,
target_ip=IPV4_BROADCAST,
)
)
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.async_stop)
self.hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STARTED, self.flow_dispatcher.async_start
@ -275,6 +277,10 @@ class Scanner:
self.hass, self.async_scan, SCAN_INTERVAL
)
# Trigger a broadcast-scan. Regular scan is implicitly triggered
# by SSDPListener.
self.async_scan_broadcast()
@core_callback
def _async_get_matching_callbacks(
self, headers: Mapping[str, str]

View file

@ -4,7 +4,7 @@
"documentation": "https://www.home-assistant.io/integrations/ssdp",
"requirements": [
"defusedxml==0.7.1",
"async-upnp-client==0.19.1"
"async-upnp-client==0.19.2"
],
"dependencies": ["network"],
"after_dependencies": ["zeroconf"],

View file

@ -1,6 +1,10 @@
"""Open ports in your router for Home Assistant and provide statistics."""
from __future__ import annotations
import asyncio
from collections.abc import Mapping
from ipaddress import ip_address
from typing import Any
import voluptuous as vol
@ -9,7 +13,7 @@ from homeassistant.components import ssdp
from homeassistant.components.network import async_get_source_ip
from homeassistant.components.network.const import PUBLIC_TARGET_IP
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import config_validation as cv, device_registry as dr
from homeassistant.helpers.typing import ConfigType
@ -44,21 +48,6 @@ CONFIG_SCHEMA = vol.Schema(
)
async def async_construct_device(hass: HomeAssistant, udn: str, st: str) -> Device:
"""Discovery devices and construct a Device for one."""
# pylint: disable=invalid-name
_LOGGER.debug("Constructing device: %s::%s", udn, st)
discovery_info = ssdp.async_get_discovery_info_by_udn_st(hass, udn, st)
if not discovery_info:
_LOGGER.info("Device not discovered")
return None
return await Device.async_create_device(
hass, discovery_info[ssdp.ATTR_SSDP_LOCATION]
)
async def async_setup(hass: HomeAssistant, config: ConfigType):
"""Set up UPnP component."""
_LOGGER.debug("async_setup, config: %s", config)
@ -86,20 +75,47 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up UPnP/IGD device from a config entry."""
_LOGGER.debug("Setting up config entry: %s", entry.unique_id)
# Discover and construct.
udn = entry.data[CONFIG_ENTRY_UDN]
st = entry.data[CONFIG_ENTRY_ST] # pylint: disable=invalid-name
try:
device = await async_construct_device(hass, udn, st)
except asyncio.TimeoutError as err:
raise ConfigEntryNotReady from err
usn = f"{udn}::{st}"
if not device:
_LOGGER.info("Unable to create UPnP/IGD, aborting")
raise ConfigEntryNotReady
# Register device discovered-callback.
device_discovered_event = asyncio.Event()
discovery_info: Mapping[str, Any] | None = None
@callback
def device_discovered(info: Mapping[str, Any]) -> None:
nonlocal discovery_info
_LOGGER.debug(
"Device discovered: %s, at: %s", usn, info[ssdp.ATTR_SSDP_LOCATION]
)
discovery_info = info
device_discovered_event.set()
cancel_discovered_callback = ssdp.async_register_callback(
hass,
device_discovered,
{
"usn": usn,
},
)
try:
await asyncio.wait_for(device_discovered_event.wait(), timeout=10)
except asyncio.TimeoutError as err:
_LOGGER.debug("Device not discovered: %s", usn)
raise ConfigEntryNotReady from err
finally:
cancel_discovered_callback()
# Create device.
location = discovery_info[ # pylint: disable=unsubscriptable-object
ssdp.ATTR_SSDP_LOCATION
]
device = await Device.async_create_device(hass, location)
# Save device.
hass.data[DOMAIN][DOMAIN_DEVICES][device.udn] = device
hass.data[DOMAIN][DOMAIN_DEVICES][udn] = device
# Ensure entry has a unique_id.
if not entry.unique_id:

View file

@ -1,6 +1,7 @@
"""Config flow for UPNP."""
from __future__ import annotations
import asyncio
from collections.abc import Mapping
from datetime import timedelta
from typing import Any
@ -10,7 +11,7 @@ import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components import ssdp
from homeassistant.const import CONF_SCAN_INTERVAL
from homeassistant.core import callback
from homeassistant.core import HomeAssistant, callback
from .const import (
CONFIG_ENTRY_HOSTNAME,
@ -18,18 +19,70 @@ from .const import (
CONFIG_ENTRY_ST,
CONFIG_ENTRY_UDN,
DEFAULT_SCAN_INTERVAL,
DISCOVERY_HOSTNAME,
DISCOVERY_LOCATION,
DISCOVERY_NAME,
DISCOVERY_ST,
DISCOVERY_UDN,
DISCOVERY_UNIQUE_ID,
DISCOVERY_USN,
DOMAIN,
DOMAIN_DEVICES,
LOGGER as _LOGGER,
SSDP_SEARCH_TIMEOUT,
ST_IGD_V1,
ST_IGD_V2,
)
from .device import Device, discovery_info_to_discovery
def _friendly_name_from_discovery(discovery_info: Mapping[str, Any]) -> str:
"""Extract user-friendly name from discovery."""
return (
discovery_info.get("friendlyName")
or discovery_info.get("modeName")
or discovery_info.get("_host", "")
)
async def _async_wait_for_discoveries(hass: HomeAssistant) -> bool:
"""Wait for a device to be discovered."""
device_discovered_event = asyncio.Event()
@callback
def device_discovered(info: Mapping[str, Any]) -> None:
_LOGGER.info(
"Device discovered: %s, at: %s",
info[ssdp.ATTR_SSDP_USN],
info[ssdp.ATTR_SSDP_LOCATION],
)
device_discovered_event.set()
cancel_discovered_callback_1 = ssdp.async_register_callback(
hass,
device_discovered,
{
ssdp.ATTR_SSDP_ST: ST_IGD_V1,
},
)
cancel_discovered_callback_2 = ssdp.async_register_callback(
hass,
device_discovered,
{
ssdp.ATTR_SSDP_ST: ST_IGD_V2,
},
)
try:
await asyncio.wait_for(
device_discovered_event.wait(), timeout=SSDP_SEARCH_TIMEOUT
)
except asyncio.TimeoutError:
return False
finally:
cancel_discovered_callback_1()
cancel_discovered_callback_2()
return True
def _discovery_igd_devices(hass: HomeAssistant) -> list[Mapping[str, Any]]:
"""Discovery IGD devices."""
return ssdp.async_get_discovery_info_by_st(
hass, ST_IGD_V1
) + ssdp.async_get_discovery_info_by_st(hass, ST_IGD_V2)
class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
@ -57,22 +110,19 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
matching_discoveries = [
discovery
for discovery in self._discoveries
if discovery[DISCOVERY_UNIQUE_ID] == user_input["unique_id"]
if discovery[ssdp.ATTR_SSDP_USN] == user_input["unique_id"]
]
if not matching_discoveries:
return self.async_abort(reason="no_devices_found")
discovery = matching_discoveries[0]
await self.async_set_unique_id(
discovery[DISCOVERY_UNIQUE_ID], raise_on_progress=False
discovery[ssdp.ATTR_SSDP_USN], raise_on_progress=False
)
return await self._async_create_entry_from_discovery(discovery)
# Discover devices.
discoveries = [
await Device.async_supplement_discovery(self.hass, discovery)
for discovery in await Device.async_discover(self.hass)
]
discoveries = _discovery_igd_devices(self.hass)
# Store discoveries which have not been configured.
current_unique_ids = {
@ -81,7 +131,7 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
self._discoveries = [
discovery
for discovery in discoveries
if discovery[DISCOVERY_UNIQUE_ID] not in current_unique_ids
if discovery[ssdp.ATTR_SSDP_USN] not in current_unique_ids
]
# Ensure anything to add.
@ -92,7 +142,9 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
{
vol.Required("unique_id"): vol.In(
{
discovery[DISCOVERY_UNIQUE_ID]: discovery[DISCOVERY_NAME]
discovery[ssdp.ATTR_SSDP_USN]: _friendly_name_from_discovery(
discovery
)
for discovery in self._discoveries
}
),
@ -119,27 +171,27 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
return self.async_abort(reason="already_configured")
# Discover devices.
self._discoveries = await Device.async_discover(self.hass)
await _async_wait_for_discoveries(self.hass)
discoveries = _discovery_igd_devices(self.hass)
# Ensure anything to add. If not, silently abort.
if not self._discoveries:
if not discoveries:
_LOGGER.info("No UPnP devices discovered, aborting")
return self.async_abort(reason="no_devices_found")
# Ensure complete discovery.
discovery = self._discoveries[0]
discovery = discoveries[0]
if (
DISCOVERY_UDN not in discovery
or DISCOVERY_ST not in discovery
or DISCOVERY_LOCATION not in discovery
or DISCOVERY_USN not in discovery
ssdp.ATTR_UPNP_UDN not in discovery
or ssdp.ATTR_SSDP_ST not in discovery
or ssdp.ATTR_SSDP_LOCATION not in discovery
or ssdp.ATTR_SSDP_USN not in discovery
):
_LOGGER.debug("Incomplete discovery, ignoring")
return self.async_abort(reason="incomplete_discovery")
# Ensure not already configuring/configured.
discovery = await Device.async_supplement_discovery(self.hass, discovery)
unique_id = discovery[DISCOVERY_UNIQUE_ID]
unique_id = discovery[ssdp.ATTR_SSDP_USN]
await self.async_set_unique_id(unique_id)
return await self._async_create_entry_from_discovery(discovery)
@ -162,35 +214,28 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
_LOGGER.debug("Incomplete discovery, ignoring")
return self.async_abort(reason="incomplete_discovery")
# Convert to something we understand/speak.
discovery = discovery_info_to_discovery(discovery_info)
# Ensure not already configuring/configured.
unique_id = discovery[DISCOVERY_USN]
unique_id = discovery_info[ssdp.ATTR_SSDP_USN]
await self.async_set_unique_id(unique_id)
self._abort_if_unique_id_configured(
updates={CONFIG_ENTRY_HOSTNAME: discovery[DISCOVERY_HOSTNAME]}
)
hostname = discovery_info["_host"]
self._abort_if_unique_id_configured(updates={CONFIG_ENTRY_HOSTNAME: hostname})
# Handle devices changing their UDN, only allow a single
# Handle devices changing their UDN, only allow a single host.
existing_entries = self._async_current_entries()
for config_entry in existing_entries:
entry_hostname = config_entry.data.get(CONFIG_ENTRY_HOSTNAME)
if entry_hostname == discovery[DISCOVERY_HOSTNAME]:
if entry_hostname == hostname:
_LOGGER.debug(
"Found existing config_entry with same hostname, discovery ignored"
)
return self.async_abort(reason="discovery_ignored")
# Get more data about the device.
discovery = await Device.async_supplement_discovery(self.hass, discovery)
# Store discovery.
self._discoveries = [discovery]
self._discoveries = [discovery_info]
# Ensure user recognizable.
self.context["title_placeholders"] = {
"name": discovery[DISCOVERY_NAME],
"name": _friendly_name_from_discovery(discovery_info),
}
return await self.async_step_ssdp_confirm()
@ -224,11 +269,11 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
discovery,
)
title = discovery.get(DISCOVERY_NAME, "")
title = _friendly_name_from_discovery(discovery)
data = {
CONFIG_ENTRY_UDN: discovery[DISCOVERY_UDN],
CONFIG_ENTRY_ST: discovery[DISCOVERY_ST],
CONFIG_ENTRY_HOSTNAME: discovery[DISCOVERY_HOSTNAME],
CONFIG_ENTRY_UDN: discovery["_udn"],
CONFIG_ENTRY_ST: discovery[ssdp.ATTR_SSDP_ST],
CONFIG_ENTRY_HOSTNAME: discovery["_host"],
}
return self.async_create_entry(title=title, data=data)

View file

@ -20,15 +20,11 @@ DATA_PACKETS = "packets"
DATA_RATE_PACKETS_PER_SECOND = f"{DATA_PACKETS}/{TIME_SECONDS}"
KIBIBYTE = 1024
UPDATE_INTERVAL = timedelta(seconds=30)
DISCOVERY_HOSTNAME = "hostname"
DISCOVERY_LOCATION = "location"
DISCOVERY_NAME = "name"
DISCOVERY_ST = "st"
DISCOVERY_UDN = "udn"
DISCOVERY_UNIQUE_ID = "unique_id"
DISCOVERY_USN = "usn"
CONFIG_ENTRY_SCAN_INTERVAL = "scan_interval"
CONFIG_ENTRY_ST = "st"
CONFIG_ENTRY_UDN = "udn"
CONFIG_ENTRY_HOSTNAME = "hostname"
DEFAULT_SCAN_INTERVAL = timedelta(seconds=30).total_seconds()
ST_IGD_V1 = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
ST_IGD_V2 = "urn:schemas-upnp-org:device:InternetGatewayDevice:2"
SSDP_SEARCH_TIMEOUT = 4

View file

@ -12,7 +12,6 @@ from async_upnp_client.aiohttp import AiohttpSessionRequester
from async_upnp_client.device_updater import DeviceUpdater
from async_upnp_client.profiles.igd import IgdDevice
from homeassistant.components import ssdp
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
@ -22,13 +21,6 @@ from .const import (
BYTES_RECEIVED,
BYTES_SENT,
CONF_LOCAL_IP,
DISCOVERY_HOSTNAME,
DISCOVERY_LOCATION,
DISCOVERY_NAME,
DISCOVERY_ST,
DISCOVERY_UDN,
DISCOVERY_UNIQUE_ID,
DISCOVERY_USN,
DOMAIN,
DOMAIN_CONFIG,
LOGGER as _LOGGER,
@ -38,20 +30,6 @@ from .const import (
)
def discovery_info_to_discovery(discovery_info: Mapping) -> Mapping:
"""Convert a SSDP-discovery to 'our' discovery."""
location = discovery_info[ssdp.ATTR_SSDP_LOCATION]
parsed = urlparse(location)
hostname = parsed.hostname
return {
DISCOVERY_UDN: discovery_info[ssdp.ATTR_UPNP_UDN],
DISCOVERY_ST: discovery_info[ssdp.ATTR_SSDP_ST],
DISCOVERY_LOCATION: discovery_info[ssdp.ATTR_SSDP_LOCATION],
DISCOVERY_USN: discovery_info[ssdp.ATTR_SSDP_USN],
DISCOVERY_HOSTNAME: hostname,
}
def _get_local_ip(hass: HomeAssistant) -> IPv4Address | None:
"""Get the configured local ip."""
if DOMAIN in hass.data and DOMAIN_CONFIG in hass.data[DOMAIN]:
@ -70,29 +48,6 @@ class Device:
self._device_updater = device_updater
self.coordinator: DataUpdateCoordinator = None
@classmethod
async def async_discover(cls, hass: HomeAssistant) -> list[Mapping]:
"""Discover UPnP/IGD devices."""
_LOGGER.debug("Discovering UPnP/IGD devices")
discoveries = []
for ssdp_st in IgdDevice.DEVICE_TYPES:
for discovery_info in ssdp.async_get_discovery_info_by_st(hass, ssdp_st):
discoveries.append(discovery_info_to_discovery(discovery_info))
return discoveries
@classmethod
async def async_supplement_discovery(
cls, hass: HomeAssistant, discovery: Mapping
) -> Mapping:
"""Get additional data from device and supplement discovery."""
location = discovery[DISCOVERY_LOCATION]
device = await Device.async_create_device(hass, location)
discovery[DISCOVERY_NAME] = device.name
discovery[DISCOVERY_HOSTNAME] = device.hostname
discovery[DISCOVERY_UNIQUE_ID] = discovery[DISCOVERY_USN]
return discovery
@classmethod
async def async_create_device(
cls, hass: HomeAssistant, ssdp_location: str

View file

@ -3,7 +3,7 @@
"name": "UPnP/IGD",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/upnp",
"requirements": ["async-upnp-client==0.19.1"],
"requirements": ["async-upnp-client==0.19.2"],
"dependencies": ["network", "ssdp"],
"codeowners": ["@StevenLooman"],
"ssdp": [

View file

@ -4,7 +4,7 @@ aiodiscover==1.4.2
aiohttp==3.7.4.post0
aiohttp_cors==0.7.0
astral==2.2
async-upnp-client==0.19.1
async-upnp-client==0.19.2
async_timeout==3.0.1
attrs==21.2.0
awesomeversion==21.4.0

View file

@ -311,7 +311,7 @@ asterisk_mbox==0.5.0
# homeassistant.components.dlna_dmr
# homeassistant.components.ssdp
# homeassistant.components.upnp
async-upnp-client==0.19.1
async-upnp-client==0.19.2
# homeassistant.components.supla
asyncpysupla==0.0.5

View file

@ -202,7 +202,7 @@ arcam-fmj==0.7.0
# homeassistant.components.dlna_dmr
# homeassistant.components.ssdp
# homeassistant.components.upnp
async-upnp-client==0.19.1
async-upnp-client==0.19.2
# homeassistant.components.aurora
auroranoaa==0.0.2

View file

@ -29,7 +29,13 @@ def _patched_ssdp_listener(info, *args, **kwargs):
async def _async_callback(*_):
await listener.async_callback(info)
@callback
def _async_search(*_):
# Prevent an actual scan.
pass
listener.async_start = _async_callback
listener.async_search = _async_search
return listener
@ -287,7 +293,10 @@ async def test_invalid_characters(hass, aioclient_mock):
@patch("homeassistant.components.ssdp.SSDPListener.async_start")
@patch("homeassistant.components.ssdp.SSDPListener.async_search")
async def test_start_stop_scanner(async_start_mock, async_search_mock, hass):
@patch("homeassistant.components.ssdp.SSDPListener.async_stop")
async def test_start_stop_scanner(
async_stop_mock, async_search_mock, async_start_mock, hass
):
"""Test we start and stop the scanner."""
assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
@ -295,15 +304,18 @@ async def test_start_stop_scanner(async_start_mock, async_search_mock, hass):
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
await hass.async_block_till_done()
assert async_start_mock.call_count == 2
assert async_search_mock.call_count == 2
assert async_start_mock.call_count == 1
# Next is 3, as async_upnp_client triggers 1 SSDPListener._async_on_connect
assert async_search_mock.call_count == 3
assert async_stop_mock.call_count == 0
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
await hass.async_block_till_done()
assert async_start_mock.call_count == 2
assert async_search_mock.call_count == 2
assert async_start_mock.call_count == 1
assert async_search_mock.call_count == 3
assert async_stop_mock.call_count == 1
async def test_unexpected_exception_while_fetching(hass, aioclient_mock, caplog):
@ -787,7 +799,6 @@ async def test_async_detect_interfaces_setting_empty_route(hass):
assert argset == {
(IPv6Address("2001:db8::"), None),
(IPv4Address("192.168.1.5"), IPv4Address("255.255.255.255")),
(IPv4Address("192.168.1.5"), None),
}
@ -802,12 +813,12 @@ async def test_bind_failure_skips_adapter(hass, caplog):
]
}
create_args = []
did_search = 0
search_args = []
@callback
def _callback(*_):
nonlocal did_search
did_search += 1
def _callback(*args):
nonlocal search_args
search_args.append(args)
pass
def _generate_failing_ssdp_listener(*args, **kwargs):
@ -844,11 +855,74 @@ async def test_bind_failure_skips_adapter(hass, caplog):
assert argset == {
(IPv6Address("2001:db8::"), None),
(IPv4Address("192.168.1.5"), IPv4Address("255.255.255.255")),
(IPv4Address("192.168.1.5"), None),
}
assert "Failed to setup listener for" in caplog.text
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
await hass.async_block_till_done()
assert did_search == 2
assert set(search_args) == {
(),
(
(
"255.255.255.255",
1900,
),
),
}
async def test_ipv4_does_additional_search_for_sonos(hass, caplog):
"""Test that only ipv4 does an additional search for Sonos."""
mock_get_ssdp = {
"mock-domain": [
{
ssdp.ATTR_UPNP_DEVICE_TYPE: "ABC",
}
]
}
search_args = []
def _generate_fake_ssdp_listener(*args, **kwargs):
listener = SSDPListener(*args, **kwargs)
async def _async_callback(*_):
pass
@callback
def _callback(*args):
nonlocal search_args
search_args.append(args)
pass
listener.async_start = _async_callback
listener.async_search = _callback
return listener
with patch(
"homeassistant.components.ssdp.async_get_ssdp",
return_value=mock_get_ssdp,
), patch(
"homeassistant.components.ssdp.SSDPListener",
new=_generate_fake_ssdp_listener,
), patch(
"homeassistant.components.ssdp.network.async_get_adapters",
return_value=_ADAPTERS_WITH_MANUAL_CONFIG,
):
assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
await hass.async_block_till_done()
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
await hass.async_block_till_done()
assert set(search_args) == {
(),
(
(
"255.255.255.255",
1900,
),
),
}

View file

@ -0,0 +1,23 @@
"""Common for upnp."""
from urllib.parse import urlparse
from homeassistant.components import ssdp
TEST_UDN = "uuid:device"
TEST_ST = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
TEST_USN = f"{TEST_UDN}::{TEST_ST}"
TEST_LOCATION = "http://192.168.1.1/desc.xml"
TEST_HOSTNAME = urlparse(TEST_LOCATION).hostname
TEST_FRIENDLY_NAME = "friendly name"
TEST_DISCOVERY = {
ssdp.ATTR_SSDP_LOCATION: TEST_LOCATION,
ssdp.ATTR_SSDP_ST: TEST_ST,
ssdp.ATTR_SSDP_USN: TEST_USN,
ssdp.ATTR_UPNP_UDN: TEST_UDN,
"usn": TEST_USN,
"location": TEST_LOCATION,
"_host": TEST_HOSTNAME,
"_udn": TEST_UDN,
"friendlyName": TEST_FRIENDLY_NAME,
}

View file

@ -0,0 +1,49 @@
"""Mock ssdp.Scanner."""
from __future__ import annotations
from typing import Any
from unittest.mock import patch
import pytest
from homeassistant.components import ssdp
from homeassistant.core import callback
class MockSsdpDescriptionManager(ssdp.DescriptionManager):
"""Mocked ssdp DescriptionManager."""
async def fetch_description(
self, xml_location: str | None
) -> None | dict[str, str]:
"""Fetch the location or get it from the cache."""
if xml_location is None:
return None
return {}
class MockSsdpScanner(ssdp.Scanner):
"""Mocked ssdp Scanner."""
@callback
def async_stop(self, *_: Any) -> None:
"""Stop the scanner."""
# Do nothing.
async def async_start(self) -> None:
"""Start the scanner."""
self.description_manager = MockSsdpDescriptionManager(self.hass)
@callback
def async_scan(self, *_: Any) -> None:
"""Scan for new entries."""
# Do nothing.
@pytest.fixture
def mock_ssdp_scanner():
"""Mock ssdp Scanner."""
with patch(
"homeassistant.components.ssdp.Scanner", new=MockSsdpScanner
) as mock_ssdp_scanner:
yield mock_ssdp_scanner

View file

@ -1,7 +1,9 @@
"""Mock device for testing purposes."""
from typing import Any, Mapping
from unittest.mock import AsyncMock
from unittest.mock import AsyncMock, patch
import pytest
from homeassistant.components.upnp.const import (
BYTES_RECEIVED,
@ -13,6 +15,8 @@ from homeassistant.components.upnp.const import (
from homeassistant.components.upnp.device import Device
from homeassistant.util import dt
from .common import TEST_UDN
class MockDevice(Device):
"""Mock device for Device."""
@ -28,7 +32,7 @@ class MockDevice(Device):
@classmethod
async def async_create_device(cls, hass, ssdp_location) -> "MockDevice":
"""Return self."""
return cls("UDN")
return cls(TEST_UDN)
@property
def udn(self) -> str:
@ -70,3 +74,18 @@ class MockDevice(Device):
PACKETS_RECEIVED: 0,
PACKETS_SENT: 0,
}
async def async_start(self) -> None:
"""Start the device updater."""
async def async_stop(self) -> None:
"""Stop the device updater."""
@pytest.fixture
def mock_upnp_device():
"""Mock upnp Device.async_create_device."""
with patch(
"homeassistant.components.upnp.Device", new=MockDevice
) as mock_async_create_device:
yield mock_async_create_device

View file

@ -1,8 +1,9 @@
"""Test UPnP/IGD config flow."""
from datetime import timedelta
from unittest.mock import AsyncMock, Mock, patch
from urllib.parse import urlparse
from unittest.mock import patch
import pytest
from homeassistant import config_entries, data_entry_flow
from homeassistant.components import ssdp
@ -12,119 +13,92 @@ from homeassistant.components.upnp.const import (
CONFIG_ENTRY_ST,
CONFIG_ENTRY_UDN,
DEFAULT_SCAN_INTERVAL,
DISCOVERY_HOSTNAME,
DISCOVERY_LOCATION,
DISCOVERY_NAME,
DISCOVERY_ST,
DISCOVERY_UDN,
DISCOVERY_UNIQUE_ID,
DISCOVERY_USN,
DOMAIN,
DOMAIN_DEVICES,
)
from homeassistant.components.upnp.device import Device
from homeassistant.core import HomeAssistant
from homeassistant.core import CoreState, HomeAssistant
from homeassistant.setup import async_setup_component
from homeassistant.util import dt
from .mock_device import MockDevice
from .common import (
TEST_DISCOVERY,
TEST_FRIENDLY_NAME,
TEST_HOSTNAME,
TEST_LOCATION,
TEST_ST,
TEST_UDN,
TEST_USN,
)
from .mock_ssdp_scanner import mock_ssdp_scanner # noqa: F401
from .mock_upnp_device import mock_upnp_device # noqa: F401
from tests.common import MockConfigEntry, async_fire_time_changed
async def test_flow_ssdp_discovery(hass: HomeAssistant):
@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_upnp_device")
async def test_flow_ssdp_discovery(
hass: HomeAssistant,
):
"""Test config flow: discovered + configured through ssdp."""
udn = "uuid:device_1"
location = "http://dummy"
mock_device = MockDevice(udn)
ssdp_discoveries = [
{
ssdp.ATTR_SSDP_LOCATION: location,
ssdp.ATTR_SSDP_ST: mock_device.device_type,
ssdp.ATTR_UPNP_UDN: mock_device.udn,
ssdp.ATTR_SSDP_USN: mock_device.usn,
}
]
discoveries = [
{
DISCOVERY_LOCATION: location,
DISCOVERY_NAME: mock_device.name,
DISCOVERY_ST: mock_device.device_type,
DISCOVERY_UDN: mock_device.udn,
DISCOVERY_UNIQUE_ID: mock_device.unique_id,
DISCOVERY_USN: mock_device.usn,
DISCOVERY_HOSTNAME: mock_device.hostname,
}
]
with patch.object(
Device, "async_create_device", AsyncMock(return_value=mock_device)
), patch.object(
ssdp, "async_get_discovery_info_by_st", Mock(return_value=ssdp_discoveries)
), patch.object(
Device, "async_supplement_discovery", AsyncMock(return_value=discoveries[0])
):
# Discovered via step ssdp.
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data={
ssdp.ATTR_SSDP_LOCATION: location,
ssdp.ATTR_SSDP_ST: mock_device.device_type,
ssdp.ATTR_SSDP_USN: mock_device.usn,
ssdp.ATTR_UPNP_UDN: mock_device.udn,
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "ssdp_confirm"
# Confirm via step ssdp_confirm.
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == mock_device.name
assert result["data"] == {
CONFIG_ENTRY_ST: mock_device.device_type,
CONFIG_ENTRY_UDN: mock_device.udn,
CONFIG_ENTRY_HOSTNAME: mock_device.hostname,
}
async def test_flow_ssdp_incomplete_discovery(hass: HomeAssistant):
"""Test config flow: incomplete discovery through ssdp."""
udn = "uuid:device_1"
location = "http://dummy"
mock_device = MockDevice(udn)
# Ensure we have a ssdp Scanner.
await async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
ssdp_scanner: ssdp.Scanner = hass.data[ssdp.DOMAIN]
ssdp_scanner.cache[(TEST_UDN, TEST_ST)] = TEST_DISCOVERY
# Speed up callback in ssdp.async_register_callback.
hass.state = CoreState.not_running
# Discovered via step ssdp.
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data=TEST_DISCOVERY,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "ssdp_confirm"
# Confirm via step ssdp_confirm.
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == TEST_FRIENDLY_NAME
assert result["data"] == {
CONFIG_ENTRY_ST: TEST_ST,
CONFIG_ENTRY_UDN: TEST_UDN,
CONFIG_ENTRY_HOSTNAME: TEST_HOSTNAME,
}
@pytest.mark.usefixtures("mock_ssdp_scanner")
async def test_flow_ssdp_incomplete_discovery(hass: HomeAssistant):
"""Test config flow: incomplete discovery through ssdp."""
# Discovered via step ssdp.
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data={
ssdp.ATTR_SSDP_LOCATION: location,
ssdp.ATTR_SSDP_ST: mock_device.device_type,
ssdp.ATTR_SSDP_USN: mock_device.usn,
# ssdp.ATTR_UPNP_UDN: mock_device.udn, # Not provided.
ssdp.ATTR_SSDP_LOCATION: TEST_LOCATION,
ssdp.ATTR_SSDP_ST: TEST_ST,
ssdp.ATTR_SSDP_USN: TEST_USN,
# ssdp.ATTR_UPNP_UDN: TEST_UDN, # Not provided.
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "incomplete_discovery"
@pytest.mark.usefixtures("mock_ssdp_scanner")
async def test_flow_ssdp_discovery_ignored(hass: HomeAssistant):
"""Test config flow: discovery through ssdp, but ignored, as hostname is used by existing config entry."""
udn = "uuid:device_random_1"
location = "http://dummy"
mock_device = MockDevice(udn)
# Existing entry.
config_entry = MockConfigEntry(
domain=DOMAIN,
data={
CONFIG_ENTRY_UDN: "uuid:device_random_2",
CONFIG_ENTRY_ST: mock_device.device_type,
CONFIG_ENTRY_HOSTNAME: urlparse(location).hostname,
CONFIG_ENTRY_UDN: TEST_UDN + "2",
CONFIG_ENTRY_ST: TEST_ST,
CONFIG_ENTRY_HOSTNAME: TEST_HOSTNAME,
},
options={CONFIG_ENTRY_SCAN_INTERVAL: DEFAULT_SCAN_INTERVAL},
)
@ -134,129 +108,78 @@ async def test_flow_ssdp_discovery_ignored(hass: HomeAssistant):
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data={
ssdp.ATTR_SSDP_LOCATION: location,
ssdp.ATTR_SSDP_ST: mock_device.device_type,
ssdp.ATTR_SSDP_USN: mock_device.usn,
ssdp.ATTR_UPNP_UDN: mock_device.udn,
},
data=TEST_DISCOVERY,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "discovery_ignored"
@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_upnp_device")
async def test_flow_user(hass: HomeAssistant):
"""Test config flow: discovered + configured through user."""
udn = "uuid:device_1"
location = "http://dummy"
mock_device = MockDevice(udn)
ssdp_discoveries = [
{
ssdp.ATTR_SSDP_LOCATION: location,
ssdp.ATTR_SSDP_ST: mock_device.device_type,
ssdp.ATTR_UPNP_UDN: mock_device.udn,
ssdp.ATTR_SSDP_USN: mock_device.usn,
}
]
discoveries = [
{
DISCOVERY_LOCATION: location,
DISCOVERY_NAME: mock_device.name,
DISCOVERY_ST: mock_device.device_type,
DISCOVERY_UDN: mock_device.udn,
DISCOVERY_UNIQUE_ID: mock_device.unique_id,
DISCOVERY_USN: mock_device.usn,
DISCOVERY_HOSTNAME: mock_device.hostname,
}
]
# Ensure we have a ssdp Scanner.
await async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
ssdp_scanner: ssdp.Scanner = hass.data[ssdp.DOMAIN]
ssdp_scanner.cache[(TEST_UDN, TEST_ST)] = TEST_DISCOVERY
# Speed up callback in ssdp.async_register_callback.
hass.state = CoreState.not_running
with patch.object(
Device, "async_create_device", AsyncMock(return_value=mock_device)
), patch.object(
ssdp, "async_get_discovery_info_by_st", Mock(return_value=ssdp_discoveries)
), patch.object(
Device, "async_supplement_discovery", AsyncMock(return_value=discoveries[0])
):
# Discovered via step user.
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "user"
# Discovered via step user.
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "user"
# Confirmed via step user.
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={"unique_id": mock_device.unique_id},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == mock_device.name
assert result["data"] == {
CONFIG_ENTRY_ST: mock_device.device_type,
CONFIG_ENTRY_UDN: mock_device.udn,
CONFIG_ENTRY_HOSTNAME: mock_device.hostname,
}
# Confirmed via step user.
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={"unique_id": TEST_USN},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == TEST_FRIENDLY_NAME
assert result["data"] == {
CONFIG_ENTRY_ST: TEST_ST,
CONFIG_ENTRY_UDN: TEST_UDN,
CONFIG_ENTRY_HOSTNAME: TEST_HOSTNAME,
}
@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_upnp_device")
async def test_flow_import(hass: HomeAssistant):
"""Test config flow: discovered + configured through configuration.yaml."""
udn = "uuid:device_1"
mock_device = MockDevice(udn)
location = "http://dummy"
ssdp_discoveries = [
{
ssdp.ATTR_SSDP_LOCATION: location,
ssdp.ATTR_SSDP_ST: mock_device.device_type,
ssdp.ATTR_UPNP_UDN: mock_device.udn,
ssdp.ATTR_SSDP_USN: mock_device.usn,
}
]
discoveries = [
{
DISCOVERY_LOCATION: location,
DISCOVERY_NAME: mock_device.name,
DISCOVERY_ST: mock_device.device_type,
DISCOVERY_UDN: mock_device.udn,
DISCOVERY_UNIQUE_ID: mock_device.unique_id,
DISCOVERY_USN: mock_device.usn,
DISCOVERY_HOSTNAME: mock_device.hostname,
}
]
"""Test config flow: configured through configuration.yaml."""
# Ensure we have a ssdp Scanner.
await async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
ssdp_scanner: ssdp.Scanner = hass.data[ssdp.DOMAIN]
ssdp_scanner.cache[(TEST_UDN, TEST_ST)] = TEST_DISCOVERY
# Speed up callback in ssdp.async_register_callback.
hass.state = CoreState.not_running
with patch.object(
Device, "async_create_device", AsyncMock(return_value=mock_device)
), patch.object(
ssdp, "async_get_discovery_info_by_st", Mock(return_value=ssdp_discoveries)
), patch.object(
Device, "async_supplement_discovery", AsyncMock(return_value=discoveries[0])
):
# Discovered via step import.
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_IMPORT}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == mock_device.name
assert result["data"] == {
CONFIG_ENTRY_ST: mock_device.device_type,
CONFIG_ENTRY_UDN: mock_device.udn,
CONFIG_ENTRY_HOSTNAME: mock_device.hostname,
}
# Discovered via step import.
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_IMPORT}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == TEST_FRIENDLY_NAME
assert result["data"] == {
CONFIG_ENTRY_ST: TEST_ST,
CONFIG_ENTRY_UDN: TEST_UDN,
CONFIG_ENTRY_HOSTNAME: TEST_HOSTNAME,
}
@pytest.mark.usefixtures("mock_ssdp_scanner")
async def test_flow_import_already_configured(hass: HomeAssistant):
"""Test config flow: discovered, but already configured."""
udn = "uuid:device_1"
mock_device = MockDevice(udn)
"""Test config flow: configured through configuration.yaml, but existing config entry."""
# Existing entry.
config_entry = MockConfigEntry(
domain=DOMAIN,
data={
CONFIG_ENTRY_UDN: mock_device.udn,
CONFIG_ENTRY_ST: mock_device.device_type,
CONFIG_ENTRY_HOSTNAME: mock_device.hostname,
CONFIG_ENTRY_UDN: TEST_UDN,
CONFIG_ENTRY_ST: TEST_ST,
CONFIG_ENTRY_HOSTNAME: TEST_HOSTNAME,
},
options={CONFIG_ENTRY_SCAN_INTERVAL: DEFAULT_SCAN_INTERVAL},
)
@ -271,94 +194,88 @@ async def test_flow_import_already_configured(hass: HomeAssistant):
assert result["reason"] == "already_configured"
@pytest.mark.usefixtures("mock_ssdp_scanner")
async def test_flow_import_no_devices_found(hass: HomeAssistant):
"""Test config flow: no devices found, configured through configuration.yaml."""
ssdp_discoveries = []
with patch.object(
ssdp, "async_get_discovery_info_by_st", Mock(return_value=ssdp_discoveries)
# Ensure we have a ssdp Scanner.
await async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
ssdp_scanner: ssdp.Scanner = hass.data[ssdp.DOMAIN]
ssdp_scanner.cache.clear()
# Discovered via step import.
with patch(
"homeassistant.components.upnp.config_flow.SSDP_SEARCH_TIMEOUT", new=0.0
):
# Discovered via step import.
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_IMPORT}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "no_devices_found"
@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_upnp_device")
async def test_options_flow(hass: HomeAssistant):
"""Test options flow."""
# Ensure we have a ssdp Scanner.
await async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
ssdp_scanner: ssdp.Scanner = hass.data[ssdp.DOMAIN]
ssdp_scanner.cache[(TEST_UDN, TEST_ST)] = TEST_DISCOVERY
# Speed up callback in ssdp.async_register_callback.
hass.state = CoreState.not_running
# Set up config entry.
udn = "uuid:device_1"
location = "http://192.168.1.1/desc.xml"
mock_device = MockDevice(udn)
ssdp_discoveries = [
{
ssdp.ATTR_SSDP_LOCATION: location,
ssdp.ATTR_SSDP_ST: mock_device.device_type,
ssdp.ATTR_UPNP_UDN: mock_device.udn,
ssdp.ATTR_SSDP_USN: mock_device.usn,
}
]
config_entry = MockConfigEntry(
domain=DOMAIN,
data={
CONFIG_ENTRY_UDN: mock_device.udn,
CONFIG_ENTRY_ST: mock_device.device_type,
CONFIG_ENTRY_HOSTNAME: mock_device.hostname,
CONFIG_ENTRY_UDN: TEST_UDN,
CONFIG_ENTRY_ST: TEST_ST,
CONFIG_ENTRY_HOSTNAME: TEST_HOSTNAME,
},
options={CONFIG_ENTRY_SCAN_INTERVAL: DEFAULT_SCAN_INTERVAL},
)
config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id) is True
await hass.async_block_till_done()
mock_device = hass.data[DOMAIN][DOMAIN_DEVICES][TEST_UDN]
config = {
# no upnp, ensures no import-flow is started.
# Reset.
mock_device.times_polled = 0
# Forward time, ensure single poll after 30 (default) seconds.
async_fire_time_changed(hass, dt.utcnow() + timedelta(seconds=31))
await hass.async_block_till_done()
assert mock_device.times_polled == 1
# Options flow with no input results in form.
result = await hass.config_entries.options.async_init(
config_entry.entry_id,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
# Options flow with input results in update to entry.
result2 = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={CONFIG_ENTRY_SCAN_INTERVAL: 60},
)
assert result2["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert config_entry.options == {
CONFIG_ENTRY_SCAN_INTERVAL: 60,
}
with patch.object(
Device, "async_create_device", AsyncMock(return_value=mock_device)
), patch.object(
ssdp,
"async_get_discovery_info_by_udn_st",
Mock(return_value=ssdp_discoveries[0]),
):
# Initialisation of component.
await async_setup_component(hass, "upnp", config)
await hass.async_block_till_done()
mock_device.times_polled = 0 # Reset.
# Forward time, ensure single poll after 30 (default) seconds.
async_fire_time_changed(hass, dt.utcnow() + timedelta(seconds=31))
await hass.async_block_till_done()
assert mock_device.times_polled == 1
# Forward time, ensure single poll after 60 seconds, still from original setting.
async_fire_time_changed(hass, dt.utcnow() + timedelta(seconds=61))
await hass.async_block_till_done()
assert mock_device.times_polled == 2
# Options flow with no input results in form.
result = await hass.config_entries.options.async_init(
config_entry.entry_id,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
# Now the updated interval takes effect.
# Forward time, ensure single poll after 120 seconds.
async_fire_time_changed(hass, dt.utcnow() + timedelta(seconds=121))
await hass.async_block_till_done()
assert mock_device.times_polled == 3
# Options flow with input results in update to entry.
result2 = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={CONFIG_ENTRY_SCAN_INTERVAL: 60},
)
assert result2["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert config_entry.options == {
CONFIG_ENTRY_SCAN_INTERVAL: 60,
}
# Forward time, ensure single poll after 60 seconds, still from original setting.
async_fire_time_changed(hass, dt.utcnow() + timedelta(seconds=61))
await hass.async_block_till_done()
assert mock_device.times_polled == 2
# Now the updated interval takes effect.
# Forward time, ensure single poll after 120 seconds.
async_fire_time_changed(hass, dt.utcnow() + timedelta(seconds=121))
await hass.async_block_till_done()
assert mock_device.times_polled == 3
# Forward time, ensure single poll after 180 seconds.
async_fire_time_changed(hass, dt.utcnow() + timedelta(seconds=181))
await hass.async_block_till_done()
assert mock_device.times_polled == 4
# Forward time, ensure single poll after 180 seconds.
async_fire_time_changed(hass, dt.utcnow() + timedelta(seconds=181))
await hass.async_block_till_done()
assert mock_device.times_polled == 4

View file

@ -1,6 +1,7 @@
"""Test UPnP/IGD setup process."""
from __future__ import annotations
from unittest.mock import AsyncMock, Mock, patch
import pytest
from homeassistant.components import ssdp
from homeassistant.components.upnp.const import (
@ -8,51 +9,37 @@ from homeassistant.components.upnp.const import (
CONFIG_ENTRY_UDN,
DOMAIN,
)
from homeassistant.components.upnp.device import Device
from homeassistant.core import HomeAssistant
from homeassistant.core import CoreState, HomeAssistant
from homeassistant.setup import async_setup_component
from .mock_device import MockDevice
from .common import TEST_DISCOVERY, TEST_ST, TEST_UDN
from .mock_ssdp_scanner import mock_ssdp_scanner # noqa: F401
from .mock_upnp_device import mock_upnp_device # noqa: F401
from tests.common import MockConfigEntry
@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_upnp_device")
async def test_async_setup_entry_default(hass: HomeAssistant):
"""Test async_setup_entry."""
udn = "uuid:device_1"
location = "http://192.168.1.1/desc.xml"
mock_device = MockDevice(udn)
discovery = {
ssdp.ATTR_SSDP_LOCATION: location,
ssdp.ATTR_SSDP_ST: mock_device.device_type,
ssdp.ATTR_UPNP_UDN: mock_device.udn,
ssdp.ATTR_SSDP_USN: mock_device.usn,
}
entry = MockConfigEntry(
domain=DOMAIN,
data={
CONFIG_ENTRY_UDN: mock_device.udn,
CONFIG_ENTRY_ST: mock_device.device_type,
CONFIG_ENTRY_UDN: TEST_UDN,
CONFIG_ENTRY_ST: TEST_ST,
},
)
config = {
# no upnp
}
async_create_device = AsyncMock(return_value=mock_device)
mock_get_discovery = Mock()
with patch.object(Device, "async_create_device", async_create_device), patch.object(
ssdp, "async_get_discovery_info_by_udn_st", mock_get_discovery
):
# initialisation of component, no device discovered
mock_get_discovery.return_value = None
await async_setup_component(hass, "upnp", config)
await hass.async_block_till_done()
# Initialisation of component, no device discovered.
await async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
# loading of config_entry, device discovered
mock_get_discovery.return_value = discovery
entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(entry.entry_id) is True
# Device is discovered.
ssdp_scanner: ssdp.Scanner = hass.data[ssdp.DOMAIN]
ssdp_scanner.cache[(TEST_UDN, TEST_ST)] = TEST_DISCOVERY
# Speed up callback in ssdp.async_register_callback.
hass.state = CoreState.not_running
# ensure device is stored/used
async_create_device.assert_called_with(hass, discovery[ssdp.ATTR_SSDP_LOCATION])
# Load config_entry.
entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(entry.entry_id) is True