Refactoring upnp component (#43646)

This commit is contained in:
Steven Looman 2021-01-29 10:23:34 +01:00 committed by GitHub
parent f080af698d
commit 25c5c6aec9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 284 additions and 207 deletions

View file

@ -1,7 +1,6 @@
"""Open ports in your router for Home Assistant and provide statistics."""
import asyncio
from ipaddress import ip_address
from operator import itemgetter
import voluptuous as vol
@ -19,7 +18,6 @@ from .const import (
DISCOVERY_LOCATION,
DISCOVERY_ST,
DISCOVERY_UDN,
DISCOVERY_USN,
DOMAIN,
DOMAIN_CONFIG,
DOMAIN_COORDINATORS,
@ -38,46 +36,27 @@ CONFIG_SCHEMA = vol.Schema(
)
async def async_discover_and_construct(
hass: HomeAssistantType, udn: str = None, st: str = None
) -> Device:
async def async_construct_device(hass: HomeAssistantType, udn: str, st: str) -> Device:
"""Discovery devices and construct a Device for one."""
# pylint: disable=invalid-name
_LOGGER.debug("Constructing device: %s::%s", udn, st)
discovery_infos = await Device.async_discover(hass)
_LOGGER.debug("Discovered devices: %s", discovery_infos)
if not discovery_infos:
_LOGGER.info("No UPnP/IGD devices discovered")
discoveries = [
discovery
for discovery in await Device.async_discover(hass)
if discovery[DISCOVERY_UDN] == udn and discovery[DISCOVERY_ST] == st
]
if not discoveries:
_LOGGER.info("Device not discovered")
return None
if udn:
# Get the discovery info with specified UDN/ST.
filtered = [di for di in discovery_infos if di[DISCOVERY_UDN] == udn]
if st:
filtered = [di for di in filtered if di[DISCOVERY_ST] == st]
if not filtered:
_LOGGER.warning(
'Wanted UPnP/IGD device with UDN/ST "%s"/"%s" not found, aborting',
udn,
st,
)
return None
# Some additional clues for remote debugging.
if len(discoveries) > 1:
_LOGGER.info("Multiple devices discovered: %s", discoveries)
# Ensure we're always taking the latest, if we filtered only on UDN.
filtered = sorted(filtered, key=itemgetter(DISCOVERY_ST), reverse=True)
discovery_info = filtered[0]
else:
# Get the first/any.
discovery_info = discovery_infos[0]
if len(discovery_infos) > 1:
device_name = discovery_info.get(
DISCOVERY_USN, discovery_info.get(DISCOVERY_LOCATION, "")
)
_LOGGER.info("Detected multiple UPnP/IGD devices, using: %s", device_name)
_LOGGER.debug("Constructing from discovery_info: %s", discovery_info)
location = discovery_info[DISCOVERY_LOCATION]
discovery = discoveries[0]
_LOGGER.debug("Constructing from discovery: %s", discovery)
location = discovery[DISCOVERY_LOCATION]
return await Device.async_create_device(hass, location)
@ -110,10 +89,10 @@ async def async_setup_entry(hass: HomeAssistantType, config_entry: ConfigEntry)
_LOGGER.debug("Setting up config entry: %s", config_entry.unique_id)
# Discover and construct.
udn = config_entry.data.get(CONFIG_ENTRY_UDN)
st = config_entry.data.get(CONFIG_ENTRY_ST) # pylint: disable=invalid-name
udn = config_entry.data[CONFIG_ENTRY_UDN]
st = config_entry.data[CONFIG_ENTRY_ST] # pylint: disable=invalid-name
try:
device = await async_discover_and_construct(hass, udn, st)
device = await async_construct_device(hass, udn, st)
except asyncio.TimeoutError as err:
raise ConfigEntryNotReady from err

View file

@ -1,6 +1,6 @@
"""Config flow for UPNP."""
from datetime import timedelta
from typing import Mapping, Optional
from typing import Any, Mapping, Optional
import voluptuous as vol
@ -9,7 +9,7 @@ from homeassistant.components import ssdp
from homeassistant.const import CONF_SCAN_INTERVAL
from homeassistant.core import callback
from .const import ( # pylint: disable=unused-import
from .const import (
CONFIG_ENTRY_SCAN_INTERVAL,
CONFIG_ENTRY_ST,
CONFIG_ENTRY_UDN,
@ -18,6 +18,7 @@ from .const import ( # pylint: disable=unused-import
DISCOVERY_NAME,
DISCOVERY_ST,
DISCOVERY_UDN,
DISCOVERY_UNIQUE_ID,
DISCOVERY_USN,
DOMAIN,
DOMAIN_COORDINATORS,
@ -26,6 +27,16 @@ from .const import ( # pylint: disable=unused-import
from .device import Device
def discovery_info_to_discovery(discovery_info: Mapping) -> Mapping:
"""Convert a SSDP-discovery to 'our' discovery."""
return {
DISCOVERY_UDN: discovery_info[ssdp.ATTR_UPNP_UDN],
DISCOVERY_ST: discovery_info[ssdp.ATTR_SSDP_ST],
DISCOVERY_LOCATION: discovery_info[ssdp.ATTR_SSDP_LOCATION],
DISCOVERY_USN: discovery_info[ssdp.ATTR_SSDP_USN],
}
class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a UPnP/IGD config flow."""
@ -37,43 +48,46 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
# - user(None): scan --> user({...}) --> create_entry()
# - import(None) --> create_entry()
def __init__(self):
def __init__(self) -> None:
"""Initialize the UPnP/IGD config flow."""
self._discoveries: Mapping = None
async def async_step_user(self, user_input: Optional[Mapping] = None):
async def async_step_user(
self, user_input: Optional[Mapping] = None
) -> Mapping[str, Any]:
"""Handle a flow start."""
_LOGGER.debug("async_step_user: user_input: %s", user_input)
# This uses DISCOVERY_USN as the identifier for the device.
if user_input is not None:
# Ensure wanted device was discovered.
matching_discoveries = [
discovery
for discovery in self._discoveries
if discovery[DISCOVERY_USN] == user_input["usn"]
if discovery[DISCOVERY_UNIQUE_ID] == user_input["unique_id"]
]
if not matching_discoveries:
return self.async_abort(reason="no_devices_found")
discovery = matching_discoveries[0]
await self.async_set_unique_id(
discovery[DISCOVERY_USN], raise_on_progress=False
discovery[DISCOVERY_UNIQUE_ID], raise_on_progress=False
)
return await self._async_create_entry_from_discovery(discovery)
# Discover devices.
discoveries = await Device.async_discover(self.hass)
discoveries = [
await Device.async_supplement_discovery(self.hass, discovery)
for discovery in await Device.async_discover(self.hass)
]
# Store discoveries which have not been configured, add name for each discovery.
current_usns = {entry.unique_id for entry in self._async_current_entries()}
self._discoveries = [
{
**discovery,
DISCOVERY_NAME: await self._async_get_name_for_discovery(discovery),
# Store discoveries which have not been configured.
current_unique_ids = {
entry.unique_id for entry in self._async_current_entries()
}
self._discoveries = [
discovery
for discovery in discoveries
if discovery[DISCOVERY_USN] not in current_usns
if discovery[DISCOVERY_UNIQUE_ID] not in current_unique_ids
]
# Ensure anything to add.
@ -82,9 +96,9 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
data_schema = vol.Schema(
{
vol.Required("usn"): vol.In(
vol.Required("unique_id"): vol.In(
{
discovery[DISCOVERY_USN]: discovery[DISCOVERY_NAME]
discovery[DISCOVERY_UNIQUE_ID]: discovery[DISCOVERY_NAME]
for discovery in self._discoveries
}
),
@ -95,7 +109,9 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
data_schema=data_schema,
)
async def async_step_import(self, import_info: Optional[Mapping]):
async def async_step_import(
self, import_info: Optional[Mapping]
) -> Mapping[str, Any]:
"""Import a new UPnP/IGD device as a config entry.
This flow is triggered by `async_setup`. If no device has been
@ -119,18 +135,24 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
return self.async_abort(reason="no_devices_found")
# Ensure complete discovery.
discovery_info = self._discoveries[0]
if DISCOVERY_USN not in discovery_info:
discovery = self._discoveries[0]
if (
DISCOVERY_UDN not in discovery
or DISCOVERY_ST not in discovery
or DISCOVERY_LOCATION not in discovery
or DISCOVERY_USN not in discovery
):
_LOGGER.debug("Incomplete discovery, ignoring")
return self.async_abort(reason="incomplete_discovery")
# Ensure not already configuring/configured.
usn = discovery_info[DISCOVERY_USN]
await self.async_set_unique_id(usn)
discovery = await Device.async_supplement_discovery(self.hass, discovery)
unique_id = discovery[DISCOVERY_UNIQUE_ID]
await self.async_set_unique_id(unique_id)
return await self._async_create_entry_from_discovery(discovery_info)
return await self._async_create_entry_from_discovery(discovery)
async def async_step_ssdp(self, discovery_info: Mapping):
async def async_step_ssdp(self, discovery_info: Mapping) -> Mapping[str, Any]:
"""Handle a discovered UPnP/IGD device.
This flow is triggered by the SSDP component. It will check if the
@ -142,36 +164,35 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
if (
ssdp.ATTR_UPNP_UDN not in discovery_info
or ssdp.ATTR_SSDP_ST not in discovery_info
or ssdp.ATTR_SSDP_LOCATION not in discovery_info
or ssdp.ATTR_SSDP_USN not in discovery_info
):
_LOGGER.debug("Incomplete discovery, ignoring")
return self.async_abort(reason="incomplete_discovery")
# Convert to something we understand/speak.
discovery = discovery_info_to_discovery(discovery_info)
# Ensure not already configuring/configured.
udn = discovery_info[ssdp.ATTR_UPNP_UDN]
st = discovery_info[ssdp.ATTR_SSDP_ST] # pylint: disable=invalid-name
usn = f"{udn}::{st}"
await self.async_set_unique_id(usn)
discovery = await Device.async_supplement_discovery(self.hass, discovery)
unique_id = discovery[DISCOVERY_UNIQUE_ID]
await self.async_set_unique_id(unique_id)
self._abort_if_unique_id_configured()
# Store discovery.
_LOGGER.debug("New discovery, continuing")
name = discovery_info.get("friendlyName", "")
discovery = {
DISCOVERY_UDN: udn,
DISCOVERY_ST: st,
DISCOVERY_NAME: name,
}
self._discoveries = [discovery]
# Ensure user recognizable.
# pylint: disable=no-member # https://github.com/PyCQA/pylint/issues/3167
self.context["title_placeholders"] = {
"name": name,
"name": discovery[DISCOVERY_NAME],
}
return await self.async_step_ssdp_confirm()
async def async_step_ssdp_confirm(self, user_input: Optional[Mapping] = None):
async def async_step_ssdp_confirm(
self, user_input: Optional[Mapping] = None
) -> Mapping[str, Any]:
"""Confirm integration via SSDP."""
_LOGGER.debug("async_step_ssdp_confirm: user_input: %s", user_input)
if user_input is None:
@ -182,24 +203,21 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
@staticmethod
@callback
def async_get_options_flow(config_entry):
def async_get_options_flow(
config_entry: config_entries.ConfigEntry,
) -> config_entries.OptionsFlow:
"""Define the config flow to handle options."""
return UpnpOptionsFlowHandler(config_entry)
async def _async_create_entry_from_discovery(
self,
discovery: Mapping,
):
) -> Mapping[str, Any]:
"""Create an entry from discovery."""
_LOGGER.debug(
"_async_create_entry_from_discovery: discovery: %s",
discovery,
)
# Get name from device, if not found already.
if DISCOVERY_NAME not in discovery and DISCOVERY_LOCATION in discovery:
discovery[DISCOVERY_NAME] = await self._async_get_name_for_discovery(
discovery
)
title = discovery.get(DISCOVERY_NAME, "")
data = {
@ -208,26 +226,18 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
}
return self.async_create_entry(title=title, data=data)
async def _async_get_name_for_discovery(self, discovery: Mapping):
"""Get the name of the device from a discovery."""
_LOGGER.debug("_async_get_name_for_discovery: discovery: %s", discovery)
device = await Device.async_create_device(
self.hass, discovery[DISCOVERY_LOCATION]
)
return device.name
class UpnpOptionsFlowHandler(config_entries.OptionsFlow):
"""Handle a UPnP options flow."""
def __init__(self, config_entry):
def __init__(self, config_entry: config_entries.ConfigEntry) -> None:
"""Initialize."""
self.config_entry = config_entry
async def async_step_init(self, user_input=None):
async def async_step_init(self, user_input: Mapping = None) -> None:
"""Manage the options."""
if user_input is not None:
udn = self.config_entry.data.get(CONFIG_ENTRY_UDN)
udn = self.config_entry.data[CONFIG_ENTRY_UDN]
coordinator = self.hass.data[DOMAIN][DOMAIN_COORDINATORS][udn]
update_interval_sec = user_input.get(
CONFIG_ENTRY_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL

View file

@ -8,10 +8,10 @@ LOGGER = logging.getLogger(__package__)
CONF_LOCAL_IP = "local_ip"
DOMAIN = "upnp"
DOMAIN_CONFIG = "config"
DOMAIN_COORDINATORS = "coordinators"
DOMAIN_DEVICES = "devices"
DOMAIN_LOCAL_IP = "local_ip"
DOMAIN_CONFIG = "config"
BYTES_RECEIVED = "bytes_received"
BYTES_SENT = "bytes_sent"
PACKETS_RECEIVED = "packets_received"
@ -21,12 +21,13 @@ DATA_PACKETS = "packets"
DATA_RATE_PACKETS_PER_SECOND = f"{DATA_PACKETS}/{TIME_SECONDS}"
KIBIBYTE = 1024
UPDATE_INTERVAL = timedelta(seconds=30)
DISCOVERY_NAME = "name"
DISCOVERY_LOCATION = "location"
DISCOVERY_NAME = "name"
DISCOVERY_ST = "st"
DISCOVERY_UDN = "udn"
DISCOVERY_UNIQUE_ID = "unique_id"
DISCOVERY_USN = "usn"
CONFIG_ENTRY_UDN = "udn"
CONFIG_ENTRY_ST = "st"
CONFIG_ENTRY_SCAN_INTERVAL = "scan_interval"
CONFIG_ENTRY_ST = "st"
CONFIG_ENTRY_UDN = "udn"
DEFAULT_SCAN_INTERVAL = timedelta(seconds=30).seconds

View file

@ -1,4 +1,6 @@
"""Home Assistant representation of an UPnP/IGD."""
from __future__ import annotations
import asyncio
from ipaddress import IPv4Address
from typing import List, Mapping
@ -16,8 +18,10 @@ from .const import (
BYTES_SENT,
CONF_LOCAL_IP,
DISCOVERY_LOCATION,
DISCOVERY_NAME,
DISCOVERY_ST,
DISCOVERY_UDN,
DISCOVERY_UNIQUE_ID,
DISCOVERY_USN,
DOMAIN,
DOMAIN_CONFIG,
@ -29,12 +33,11 @@ from .const import (
class Device:
"""Home Assistant representation of an UPnP/IGD."""
"""Home Assistant representation of a UPnP/IGD device."""
def __init__(self, igd_device):
"""Initialize UPnP/IGD device."""
self._igd_device: IgdDevice = igd_device
self._mapped_ports = []
@classmethod
async def async_discover(cls, hass: HomeAssistantType) -> List[Mapping]:
@ -46,24 +49,35 @@ class Device:
if local_ip:
local_ip = IPv4Address(local_ip)
discovery_infos = await IgdDevice.async_search(source_ip=local_ip, timeout=10)
discoveries = await IgdDevice.async_search(source_ip=local_ip, timeout=10)
# add extra info and store devices
devices = []
for discovery_info in discovery_infos:
discovery_info[DISCOVERY_UDN] = discovery_info["_udn"]
discovery_info[DISCOVERY_ST] = discovery_info["st"]
discovery_info[DISCOVERY_LOCATION] = discovery_info["location"]
usn = f"{discovery_info[DISCOVERY_UDN]}::{discovery_info[DISCOVERY_ST]}"
discovery_info[DISCOVERY_USN] = usn
_LOGGER.debug("Discovered device: %s", discovery_info)
# Supplement/standardize discovery.
for discovery in discoveries:
discovery[DISCOVERY_UDN] = discovery["_udn"]
discovery[DISCOVERY_ST] = discovery["st"]
discovery[DISCOVERY_LOCATION] = discovery["location"]
discovery[DISCOVERY_USN] = discovery["usn"]
_LOGGER.debug("Discovered device: %s", discovery)
devices.append(discovery_info)
return devices
return discoveries
@classmethod
async def async_create_device(cls, hass: HomeAssistantType, ssdp_location: str):
async def async_supplement_discovery(
cls, hass: HomeAssistantType, discovery: Mapping
) -> Mapping:
"""Get additional data from device and supplement discovery."""
device = await Device.async_create_device(hass, discovery[DISCOVERY_LOCATION])
discovery[DISCOVERY_NAME] = device.name
# Set unique_id.
discovery[DISCOVERY_UNIQUE_ID] = discovery[DISCOVERY_USN]
return discovery
@classmethod
async def async_create_device(
cls, hass: HomeAssistantType, ssdp_location: str
) -> Device:
"""Create UPnP/IGD device."""
# build async_upnp_client requester
session = async_get_clientsession(hass)
@ -102,10 +116,15 @@ class Device:
"""Get the device type."""
return self._igd_device.device_type
@property
def usn(self) -> str:
"""Get the USN."""
return f"{self.udn}::{self.device_type}"
@property
def unique_id(self) -> str:
"""Get the unique id."""
return f"{self.udn}::{self.device_type}"
return self.usn
def __str__(self) -> str:
"""Get string representation."""

View file

@ -83,13 +83,7 @@ async def async_setup_entry(
hass, config_entry: ConfigEntry, async_add_entities
) -> None:
"""Set up the UPnP/IGD sensors."""
data = config_entry.data
if CONFIG_ENTRY_UDN in data:
udn = data[CONFIG_ENTRY_UDN]
else:
# any device will do
udn = list(hass.data[DOMAIN][DOMAIN_DEVICES])[0]
udn = config_entry.data[CONFIG_ENTRY_UDN]
device: Device = hass.data[DOMAIN][DOMAIN_DEVICES][udn]
update_interval_sec = config_entry.options.get(

View file

@ -21,8 +21,6 @@ class MockDevice(Device):
igd_device = object()
super().__init__(igd_device)
self._udn = udn
self.added_port_mappings = []
self.removed_port_mappings = []
@classmethod
async def async_create_device(cls, hass, ssdp_location):
@ -54,18 +52,6 @@ class MockDevice(Device):
"""Get the device type."""
return "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
async def _async_add_port_mapping(
self, external_port: int, local_ip: str, internal_port: int
) -> None:
"""Add a port mapping."""
entry = [external_port, local_ip, internal_port]
self.added_port_mappings.append(entry)
async def _async_delete_port_mapping(self, external_port: int) -> None:
"""Remove a port mapping."""
entry = external_port
self.removed_port_mappings.append(entry)
async def async_get_traffic_data(self) -> Mapping[str, any]:
"""Get traffic data."""
return {

View file

@ -11,10 +11,13 @@ from homeassistant.components.upnp.const import (
CONFIG_ENTRY_UDN,
DEFAULT_SCAN_INTERVAL,
DISCOVERY_LOCATION,
DISCOVERY_NAME,
DISCOVERY_ST,
DISCOVERY_UDN,
DISCOVERY_UNIQUE_ID,
DISCOVERY_USN,
DOMAIN,
DOMAIN_COORDINATORS,
)
from homeassistant.components.upnp.device import Device
from homeassistant.helpers.typing import HomeAssistantType
@ -28,25 +31,34 @@ from tests.common import MockConfigEntry
async def test_flow_ssdp_discovery(hass: HomeAssistantType):
"""Test config flow: discovered + configured through ssdp."""
udn = "uuid:device_1"
location = "dummy"
mock_device = MockDevice(udn)
discovery_infos = [
discoveries = [
{
DISCOVERY_LOCATION: location,
DISCOVERY_NAME: mock_device.name,
DISCOVERY_ST: mock_device.device_type,
DISCOVERY_UDN: mock_device.udn,
DISCOVERY_LOCATION: "dummy",
DISCOVERY_UNIQUE_ID: mock_device.unique_id,
DISCOVERY_USN: mock_device.usn,
}
]
with patch.object(
Device, "async_create_device", AsyncMock(return_value=mock_device)
), patch.object(Device, "async_discover", AsyncMock(return_value=discovery_infos)):
), patch.object(
Device, "async_discover", AsyncMock(return_value=discoveries)
), patch.object(
Device, "async_supplement_discovery", AsyncMock(return_value=discoveries[0])
):
# Discovered via step ssdp.
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data={
ssdp.ATTR_SSDP_LOCATION: location,
ssdp.ATTR_SSDP_ST: mock_device.device_type,
ssdp.ATTR_SSDP_USN: mock_device.usn,
ssdp.ATTR_UPNP_UDN: mock_device.udn,
"friendlyName": mock_device.name,
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
@ -69,17 +81,9 @@ async def test_flow_ssdp_discovery(hass: HomeAssistantType):
async def test_flow_ssdp_discovery_incomplete(hass: HomeAssistantType):
"""Test config flow: incomplete discovery through ssdp."""
udn = "uuid:device_1"
location = "dummy"
mock_device = MockDevice(udn)
discovery_infos = [
{
DISCOVERY_ST: mock_device.device_type,
DISCOVERY_UDN: mock_device.udn,
DISCOVERY_LOCATION: "dummy",
}
]
with patch.object(
Device, "async_create_device", AsyncMock(return_value=mock_device)
), patch.object(Device, "async_discover", AsyncMock(return_value=discovery_infos)):
# Discovered via step ssdp.
result = await hass.config_entries.flow.async_init(
DOMAIN,
@ -87,7 +91,7 @@ async def test_flow_ssdp_discovery_incomplete(hass: HomeAssistantType):
data={
ssdp.ATTR_SSDP_ST: mock_device.device_type,
# ssdp.ATTR_UPNP_UDN: mock_device.udn, # Not provided.
"friendlyName": mock_device.name,
ssdp.ATTR_SSDP_LOCATION: location,
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
@ -97,19 +101,26 @@ async def test_flow_ssdp_discovery_incomplete(hass: HomeAssistantType):
async def test_flow_user(hass: HomeAssistantType):
"""Test config flow: discovered + configured through user."""
udn = "uuid:device_1"
location = "dummy"
mock_device = MockDevice(udn)
discovery_infos = [
discoveries = [
{
DISCOVERY_USN: mock_device.unique_id,
DISCOVERY_LOCATION: location,
DISCOVERY_NAME: mock_device.name,
DISCOVERY_ST: mock_device.device_type,
DISCOVERY_UDN: mock_device.udn,
DISCOVERY_LOCATION: "dummy",
DISCOVERY_UNIQUE_ID: mock_device.unique_id,
DISCOVERY_USN: mock_device.usn,
}
]
with patch.object(
Device, "async_create_device", AsyncMock(return_value=mock_device)
), patch.object(Device, "async_discover", AsyncMock(return_value=discovery_infos)):
), patch.object(
Device, "async_discover", AsyncMock(return_value=discoveries)
), patch.object(
Device, "async_supplement_discovery", AsyncMock(return_value=discoveries[0])
):
# Discovered via step user.
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
@ -120,7 +131,7 @@ async def test_flow_user(hass: HomeAssistantType):
# Confirmed via step user.
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={"usn": mock_device.unique_id},
user_input={"unique_id": mock_device.unique_id},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
@ -135,18 +146,25 @@ async def test_flow_import(hass: HomeAssistantType):
"""Test config flow: discovered + configured through configuration.yaml."""
udn = "uuid:device_1"
mock_device = MockDevice(udn)
discovery_infos = [
location = "dummy"
discoveries = [
{
DISCOVERY_USN: mock_device.unique_id,
DISCOVERY_LOCATION: location,
DISCOVERY_NAME: mock_device.name,
DISCOVERY_ST: mock_device.device_type,
DISCOVERY_UDN: mock_device.udn,
DISCOVERY_LOCATION: "dummy",
DISCOVERY_UNIQUE_ID: mock_device.unique_id,
DISCOVERY_USN: mock_device.usn,
}
]
with patch.object(
Device, "async_create_device", AsyncMock(return_value=mock_device)
), patch.object(Device, "async_discover", AsyncMock(return_value=discovery_infos)):
), patch.object(
Device, "async_discover", AsyncMock(return_value=discoveries)
), patch.object(
Device, "async_supplement_discovery", AsyncMock(return_value=discoveries[0])
):
# Discovered via step import.
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_IMPORT}
@ -160,18 +178,10 @@ async def test_flow_import(hass: HomeAssistantType):
}
async def test_flow_import_duplicate(hass: HomeAssistantType):
async def test_flow_import_already_configured(hass: HomeAssistantType):
"""Test config flow: discovered, but already configured."""
udn = "uuid:device_1"
mock_device = MockDevice(udn)
discovery_infos = [
{
DISCOVERY_USN: mock_device.unique_id,
DISCOVERY_ST: mock_device.device_type,
DISCOVERY_UDN: mock_device.udn,
DISCOVERY_LOCATION: "dummy",
}
]
# Existing entry.
config_entry = MockConfigEntry(
@ -184,9 +194,6 @@ async def test_flow_import_duplicate(hass: HomeAssistantType):
)
config_entry.add_to_hass(hass)
with patch.object(
Device, "async_create_device", AsyncMock(return_value=mock_device)
), patch.object(Device, "async_discover", AsyncMock(return_value=discovery_infos)):
# Discovered via step import.
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_IMPORT}
@ -200,17 +207,19 @@ async def test_flow_import_incomplete(hass: HomeAssistantType):
"""Test config flow: incomplete discovery, configured through configuration.yaml."""
udn = "uuid:device_1"
mock_device = MockDevice(udn)
discovery_infos = [
location = "dummy"
discoveries = [
{
DISCOVERY_ST: mock_device.device_type,
DISCOVERY_LOCATION: location,
DISCOVERY_NAME: mock_device.name,
# DISCOVERY_ST: mock_device.device_type,
DISCOVERY_UDN: mock_device.udn,
DISCOVERY_LOCATION: "dummy",
DISCOVERY_UNIQUE_ID: mock_device.unique_id,
DISCOVERY_USN: mock_device.usn,
}
]
with patch.object(
Device, "async_create_device", AsyncMock(return_value=mock_device)
), patch.object(Device, "async_discover", AsyncMock(return_value=discovery_infos)):
with patch.object(Device, "async_discover", AsyncMock(return_value=discoveries)):
# Discovered via step import.
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_IMPORT}
@ -224,12 +233,16 @@ async def test_options_flow(hass: HomeAssistantType):
"""Test options flow."""
# Set up config entry.
udn = "uuid:device_1"
location = "http://192.168.1.1/desc.xml"
mock_device = MockDevice(udn)
discovery_infos = [
discoveries = [
{
DISCOVERY_UDN: mock_device.udn,
DISCOVERY_LOCATION: location,
DISCOVERY_NAME: mock_device.name,
DISCOVERY_ST: mock_device.device_type,
DISCOVERY_LOCATION: "http://192.168.1.1/desc.xml",
DISCOVERY_UDN: mock_device.udn,
DISCOVERY_UNIQUE_ID: mock_device.unique_id,
DISCOVERY_USN: mock_device.usn,
}
]
config_entry = MockConfigEntry(
@ -245,16 +258,15 @@ async def test_options_flow(hass: HomeAssistantType):
config = {
# no upnp, ensures no import-flow is started.
}
async_discover = AsyncMock(return_value=discovery_infos)
with patch.object(
Device, "async_create_device", AsyncMock(return_value=mock_device)
), patch.object(Device, "async_discover", async_discover):
), patch.object(Device, "async_discover", AsyncMock(return_value=discoveries)):
# Initialisation of component.
await async_setup_component(hass, "upnp", config)
await hass.async_block_till_done()
# DataUpdateCoordinator gets a default of 30 seconds for updates.
coordinator = hass.data[DOMAIN]["coordinators"][mock_device.udn]
coordinator = hass.data[DOMAIN][DOMAIN_COORDINATORS][mock_device.udn]
assert coordinator.update_interval == timedelta(seconds=DEFAULT_SCAN_INTERVAL)
# Options flow with no input results in form.

View file

@ -4,9 +4,16 @@ from unittest.mock import AsyncMock, patch
from homeassistant.components import upnp
from homeassistant.components.upnp.const import (
CONFIG_ENTRY_ST,
CONFIG_ENTRY_UDN,
DISCOVERY_LOCATION,
DISCOVERY_NAME,
DISCOVERY_ST,
DISCOVERY_UDN,
DISCOVERY_UNIQUE_ID,
DISCOVERY_USN,
DOMAIN,
DOMAIN_DEVICES,
)
from homeassistant.components.upnp.device import Device
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
@ -20,35 +27,104 @@ from tests.common import MockConfigEntry
async def test_async_setup_entry_default(hass):
"""Test async_setup_entry."""
udn = "uuid:device_1"
location = "http://192.168.1.1/desc.xml"
mock_device = MockDevice(udn)
discovery_infos = [
discoveries = [
{
DISCOVERY_UDN: mock_device.udn,
DISCOVERY_LOCATION: location,
DISCOVERY_NAME: mock_device.name,
DISCOVERY_ST: mock_device.device_type,
DISCOVERY_LOCATION: "http://192.168.1.1/desc.xml",
DISCOVERY_UDN: mock_device.udn,
DISCOVERY_UNIQUE_ID: mock_device.unique_id,
DISCOVERY_USN: mock_device.usn,
}
]
entry = MockConfigEntry(
domain=upnp.DOMAIN, data={"udn": mock_device.udn, "st": mock_device.device_type}
domain=upnp.DOMAIN,
data={
CONFIG_ENTRY_UDN: mock_device.udn,
CONFIG_ENTRY_ST: mock_device.device_type,
},
)
config = {
# no upnp
}
async_discover = AsyncMock(return_value=[])
async_discover = AsyncMock()
with patch.object(
Device, "async_create_device", AsyncMock(return_value=mock_device)
), patch.object(Device, "async_discover", async_discover):
# initialisation of component, no device discovered
async_discover.return_value = []
await async_setup_component(hass, "upnp", config)
await hass.async_block_till_done()
# loading of config_entry, device discovered
async_discover.return_value = discovery_infos
async_discover.return_value = discoveries
assert await upnp.async_setup_entry(hass, entry) is True
# ensure device is stored/used
assert hass.data[upnp.DOMAIN]["devices"][udn] == mock_device
assert hass.data[DOMAIN][DOMAIN_DEVICES][udn] == mock_device
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
await hass.async_block_till_done()
async def test_sync_setup_entry_multiple_discoveries(hass):
"""Test async_setup_entry."""
udn_0 = "uuid:device_1"
location_0 = "http://192.168.1.1/desc.xml"
mock_device_0 = MockDevice(udn_0)
udn_1 = "uuid:device_2"
location_1 = "http://192.168.1.2/desc.xml"
mock_device_1 = MockDevice(udn_1)
discoveries = [
{
DISCOVERY_LOCATION: location_0,
DISCOVERY_NAME: mock_device_0.name,
DISCOVERY_ST: mock_device_0.device_type,
DISCOVERY_UDN: mock_device_0.udn,
DISCOVERY_UNIQUE_ID: mock_device_0.unique_id,
DISCOVERY_USN: mock_device_0.usn,
},
{
DISCOVERY_LOCATION: location_1,
DISCOVERY_NAME: mock_device_1.name,
DISCOVERY_ST: mock_device_1.device_type,
DISCOVERY_UDN: mock_device_1.udn,
DISCOVERY_UNIQUE_ID: mock_device_1.unique_id,
DISCOVERY_USN: mock_device_1.usn,
},
]
entry = MockConfigEntry(
domain=upnp.DOMAIN,
data={
CONFIG_ENTRY_UDN: mock_device_1.udn,
CONFIG_ENTRY_ST: mock_device_1.device_type,
},
)
config = {
# no upnp
}
async_create_device = AsyncMock(return_value=mock_device_1)
async_discover = AsyncMock()
with patch.object(Device, "async_create_device", async_create_device), patch.object(
Device, "async_discover", async_discover
):
# initialisation of component, no device discovered
async_discover.return_value = []
await async_setup_component(hass, "upnp", config)
await hass.async_block_till_done()
# loading of config_entry, device discovered
async_discover.return_value = discoveries
assert await upnp.async_setup_entry(hass, entry) is True
# ensure device is stored/used
async_create_device.assert_called_with(hass, discoveries[1][DISCOVERY_LOCATION])
assert udn_0 not in hass.data[DOMAIN][DOMAIN_DEVICES]
assert hass.data[DOMAIN][DOMAIN_DEVICES][udn_1] == mock_device_1
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
await hass.async_block_till_done()