Better handle devices changing UDN and/or location in upnp component (#70008)
This commit is contained in:
parent
bfc82b030f
commit
bddfbe01f3
12 changed files with 583 additions and 184 deletions
|
@ -8,7 +8,7 @@ from datetime import timedelta
|
||||||
from ipaddress import ip_address
|
from ipaddress import ip_address
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from async_upnp_client.exceptions import UpnpConnectionError
|
from async_upnp_client.exceptions import UpnpCommunicationError, UpnpConnectionError
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
|
|
||||||
from homeassistant import config_entries
|
from homeassistant import config_entries
|
||||||
|
@ -26,19 +26,20 @@ from homeassistant.helpers.typing import ConfigType
|
||||||
from homeassistant.helpers.update_coordinator import (
|
from homeassistant.helpers.update_coordinator import (
|
||||||
CoordinatorEntity,
|
CoordinatorEntity,
|
||||||
DataUpdateCoordinator,
|
DataUpdateCoordinator,
|
||||||
|
UpdateFailed,
|
||||||
)
|
)
|
||||||
|
|
||||||
from .const import (
|
from .const import (
|
||||||
CONF_LOCAL_IP,
|
CONF_LOCAL_IP,
|
||||||
CONFIG_ENTRY_HOSTNAME,
|
CONFIG_ENTRY_MAC_ADDRESS,
|
||||||
|
CONFIG_ENTRY_ORIGINAL_UDN,
|
||||||
CONFIG_ENTRY_ST,
|
CONFIG_ENTRY_ST,
|
||||||
CONFIG_ENTRY_UDN,
|
CONFIG_ENTRY_UDN,
|
||||||
DEFAULT_SCAN_INTERVAL,
|
DEFAULT_SCAN_INTERVAL,
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
DOMAIN_DEVICES,
|
|
||||||
LOGGER,
|
LOGGER,
|
||||||
)
|
)
|
||||||
from .device import Device
|
from .device import Device, async_get_mac_address_from_host
|
||||||
|
|
||||||
NOTIFICATION_ID = "upnp_notification"
|
NOTIFICATION_ID = "upnp_notification"
|
||||||
NOTIFICATION_TITLE = "UPnP/IGD Setup"
|
NOTIFICATION_TITLE = "UPnP/IGD Setup"
|
||||||
|
@ -65,9 +66,7 @@ CONFIG_SCHEMA = vol.Schema(
|
||||||
|
|
||||||
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||||
"""Set up UPnP component."""
|
"""Set up UPnP component."""
|
||||||
hass.data[DOMAIN] = {
|
hass.data[DOMAIN] = {}
|
||||||
DOMAIN_DEVICES: {},
|
|
||||||
}
|
|
||||||
|
|
||||||
# Only start if set up via configuration.yaml.
|
# Only start if set up via configuration.yaml.
|
||||||
if DOMAIN in config:
|
if DOMAIN in config:
|
||||||
|
@ -82,7 +81,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||||
|
|
||||||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||||
"""Set up UPnP/IGD device from a config entry."""
|
"""Set up UPnP/IGD device from a config entry."""
|
||||||
LOGGER.debug("Setting up config entry: %s", entry.unique_id)
|
LOGGER.debug("Setting up config entry: %s", entry.entry_id)
|
||||||
|
|
||||||
udn = entry.data[CONFIG_ENTRY_UDN]
|
udn = entry.data[CONFIG_ENTRY_UDN]
|
||||||
st = entry.data[CONFIG_ENTRY_ST] # pylint: disable=invalid-name
|
st = entry.data[CONFIG_ENTRY_ST] # pylint: disable=invalid-name
|
||||||
|
@ -126,67 +125,99 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||||
try:
|
try:
|
||||||
device = await Device.async_create_device(hass, location)
|
device = await Device.async_create_device(hass, location)
|
||||||
except UpnpConnectionError as err:
|
except UpnpConnectionError as err:
|
||||||
LOGGER.debug("Error connecting to device %s", location)
|
LOGGER.debug(
|
||||||
|
"Error connecting to device at location: %s, err: %s", location, err
|
||||||
|
)
|
||||||
raise ConfigEntryNotReady from err
|
raise ConfigEntryNotReady from err
|
||||||
|
|
||||||
# Ensure entry has a unique_id.
|
# Track the original UDN such that existing sensors do not change their unique_id.
|
||||||
if not entry.unique_id:
|
if CONFIG_ENTRY_ORIGINAL_UDN not in entry.data:
|
||||||
LOGGER.debug(
|
|
||||||
"Setting unique_id: %s, for config_entry: %s",
|
|
||||||
device.unique_id,
|
|
||||||
entry,
|
|
||||||
)
|
|
||||||
hass.config_entries.async_update_entry(
|
hass.config_entries.async_update_entry(
|
||||||
entry=entry,
|
entry=entry,
|
||||||
unique_id=device.unique_id,
|
data={
|
||||||
|
**entry.data,
|
||||||
|
CONFIG_ENTRY_ORIGINAL_UDN: device.udn,
|
||||||
|
},
|
||||||
)
|
)
|
||||||
|
device.original_udn = entry.data[CONFIG_ENTRY_ORIGINAL_UDN]
|
||||||
|
|
||||||
# Ensure entry has a hostname, for older entries.
|
# Store mac address for changed UDN matching.
|
||||||
if (
|
if device.host:
|
||||||
CONFIG_ENTRY_HOSTNAME not in entry.data
|
device.mac_address = await async_get_mac_address_from_host(hass, device.host)
|
||||||
or entry.data[CONFIG_ENTRY_HOSTNAME] != device.hostname
|
if device.mac_address and not entry.data.get("CONFIG_ENTRY_MAC_ADDRESS"):
|
||||||
):
|
|
||||||
hass.config_entries.async_update_entry(
|
hass.config_entries.async_update_entry(
|
||||||
entry=entry,
|
entry=entry,
|
||||||
data={CONFIG_ENTRY_HOSTNAME: device.hostname, **entry.data},
|
data={
|
||||||
|
**entry.data,
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS: device.mac_address,
|
||||||
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create device registry entry.
|
connections = {(dr.CONNECTION_UPNP, device.udn)}
|
||||||
|
if device.mac_address:
|
||||||
|
connections.add((dr.CONNECTION_NETWORK_MAC, device.mac_address))
|
||||||
|
|
||||||
device_registry = dr.async_get(hass)
|
device_registry = dr.async_get(hass)
|
||||||
device_registry.async_get_or_create(
|
device_entry = device_registry.async_get_device(
|
||||||
config_entry_id=entry.entry_id,
|
identifiers=set(), connections=connections
|
||||||
connections={(dr.CONNECTION_UPNP, device.udn)},
|
|
||||||
identifiers={(DOMAIN, device.udn)},
|
|
||||||
name=device.name,
|
|
||||||
manufacturer=device.manufacturer,
|
|
||||||
model=device.model_name,
|
|
||||||
)
|
)
|
||||||
|
if device_entry:
|
||||||
|
LOGGER.debug(
|
||||||
|
"Found device using connections: %s, device_entry: %s",
|
||||||
|
connections,
|
||||||
|
device_entry,
|
||||||
|
)
|
||||||
|
if not device_entry:
|
||||||
|
# No device found, create new device entry.
|
||||||
|
device_entry = device_registry.async_get_or_create(
|
||||||
|
config_entry_id=entry.entry_id,
|
||||||
|
connections=connections,
|
||||||
|
identifiers={(DOMAIN, device.usn)},
|
||||||
|
name=device.name,
|
||||||
|
manufacturer=device.manufacturer,
|
||||||
|
model=device.model_name,
|
||||||
|
)
|
||||||
|
LOGGER.debug(
|
||||||
|
"Created device using UDN '%s', device_entry: %s", device.udn, device_entry
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# Update identifier.
|
||||||
|
device_entry = device_registry.async_update_device(
|
||||||
|
device_entry.id,
|
||||||
|
new_identifiers={(DOMAIN, device.usn)},
|
||||||
|
)
|
||||||
|
|
||||||
|
assert device_entry
|
||||||
update_interval = timedelta(seconds=DEFAULT_SCAN_INTERVAL)
|
update_interval = timedelta(seconds=DEFAULT_SCAN_INTERVAL)
|
||||||
LOGGER.debug("update_interval: %s", update_interval)
|
|
||||||
coordinator = UpnpDataUpdateCoordinator(
|
coordinator = UpnpDataUpdateCoordinator(
|
||||||
hass,
|
hass,
|
||||||
device=device,
|
device=device,
|
||||||
|
device_entry=device_entry,
|
||||||
update_interval=update_interval,
|
update_interval=update_interval,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Try an initial refresh.
|
||||||
|
await coordinator.async_config_entry_first_refresh()
|
||||||
|
|
||||||
# Save coordinator.
|
# Save coordinator.
|
||||||
hass.data[DOMAIN][entry.entry_id] = coordinator
|
hass.data[DOMAIN][entry.entry_id] = coordinator
|
||||||
|
|
||||||
await coordinator.async_config_entry_first_refresh()
|
|
||||||
|
|
||||||
# Setup platforms, creating sensors/binary_sensors.
|
# Setup platforms, creating sensors/binary_sensors.
|
||||||
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
|
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
|
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||||
"""Unload a UPnP/IGD device from a config entry."""
|
"""Unload a UPnP/IGD device from a config entry."""
|
||||||
LOGGER.debug("Unloading config entry: %s", config_entry.unique_id)
|
LOGGER.debug("Unloading config entry: %s", entry.entry_id)
|
||||||
|
|
||||||
# Unload platforms.
|
# Unload platforms.
|
||||||
return await hass.config_entries.async_unload_platforms(config_entry, PLATFORMS)
|
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
|
||||||
|
if unload_ok:
|
||||||
|
del hass.data[DOMAIN][entry.entry_id]
|
||||||
|
|
||||||
|
return unload_ok
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
@ -209,26 +240,45 @@ class UpnpDataUpdateCoordinator(DataUpdateCoordinator):
|
||||||
"""Define an object to update data from UPNP device."""
|
"""Define an object to update data from UPNP device."""
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self, hass: HomeAssistant, device: Device, update_interval: timedelta
|
self,
|
||||||
|
hass: HomeAssistant,
|
||||||
|
device: Device,
|
||||||
|
device_entry: dr.DeviceEntry,
|
||||||
|
update_interval: timedelta,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Initialize."""
|
"""Initialize."""
|
||||||
self.device = device
|
self.device = device
|
||||||
|
self.device_entry = device_entry
|
||||||
|
|
||||||
super().__init__(
|
super().__init__(
|
||||||
hass, LOGGER, name=device.name, update_interval=update_interval
|
hass,
|
||||||
|
LOGGER,
|
||||||
|
name=device.name,
|
||||||
|
update_interval=update_interval,
|
||||||
|
update_method=self._async_fetch_data,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _async_update_data(self) -> Mapping[str, Any]:
|
async def _async_fetch_data(self) -> Mapping[str, Any]:
|
||||||
"""Update data."""
|
"""Update data."""
|
||||||
update_values = await asyncio.gather(
|
try:
|
||||||
self.device.async_get_traffic_data(),
|
update_values = await asyncio.gather(
|
||||||
self.device.async_get_status(),
|
self.device.async_get_traffic_data(),
|
||||||
)
|
self.device.async_get_status(),
|
||||||
|
)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
**update_values[0],
|
**update_values[0],
|
||||||
**update_values[1],
|
**update_values[1],
|
||||||
}
|
}
|
||||||
|
except UpnpCommunicationError as exception:
|
||||||
|
LOGGER.debug(
|
||||||
|
"Caught exception when updating device: %s, exception: %s",
|
||||||
|
self.device,
|
||||||
|
exception,
|
||||||
|
)
|
||||||
|
raise UpdateFailed(
|
||||||
|
f"Unable to communicate with IGD at: {self.device.device_url}"
|
||||||
|
) from exception
|
||||||
|
|
||||||
|
|
||||||
class UpnpEntity(CoordinatorEntity[UpnpDataUpdateCoordinator]):
|
class UpnpEntity(CoordinatorEntity[UpnpDataUpdateCoordinator]):
|
||||||
|
@ -247,13 +297,13 @@ class UpnpEntity(CoordinatorEntity[UpnpDataUpdateCoordinator]):
|
||||||
self._device = coordinator.device
|
self._device = coordinator.device
|
||||||
self.entity_description = entity_description
|
self.entity_description = entity_description
|
||||||
self._attr_name = f"{coordinator.device.name} {entity_description.name}"
|
self._attr_name = f"{coordinator.device.name} {entity_description.name}"
|
||||||
self._attr_unique_id = f"{coordinator.device.udn}_{entity_description.unique_id or entity_description.key}"
|
self._attr_unique_id = f"{coordinator.device.original_udn}_{entity_description.unique_id or entity_description.key}"
|
||||||
self._attr_device_info = DeviceInfo(
|
self._attr_device_info = DeviceInfo(
|
||||||
connections={(dr.CONNECTION_UPNP, coordinator.device.udn)},
|
connections=coordinator.device_entry.connections,
|
||||||
name=coordinator.device.name,
|
name=coordinator.device_entry.name,
|
||||||
manufacturer=coordinator.device.manufacturer,
|
manufacturer=coordinator.device_entry.manufacturer,
|
||||||
model=coordinator.device.model_name,
|
model=coordinator.device_entry.model,
|
||||||
configuration_url=f"http://{coordinator.device.hostname}",
|
configuration_url=coordinator.device_entry.configuration_url,
|
||||||
)
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
|
|
@ -14,7 +14,9 @@ from homeassistant.core import HomeAssistant
|
||||||
from homeassistant.data_entry_flow import FlowResult
|
from homeassistant.data_entry_flow import FlowResult
|
||||||
|
|
||||||
from .const import (
|
from .const import (
|
||||||
CONFIG_ENTRY_HOSTNAME,
|
CONFIG_ENTRY_LOCATION,
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS,
|
||||||
|
CONFIG_ENTRY_ORIGINAL_UDN,
|
||||||
CONFIG_ENTRY_ST,
|
CONFIG_ENTRY_ST,
|
||||||
CONFIG_ENTRY_UDN,
|
CONFIG_ENTRY_UDN,
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
|
@ -23,6 +25,7 @@ from .const import (
|
||||||
ST_IGD_V1,
|
ST_IGD_V1,
|
||||||
ST_IGD_V2,
|
ST_IGD_V2,
|
||||||
)
|
)
|
||||||
|
from .device import async_get_mac_address_from_host
|
||||||
|
|
||||||
|
|
||||||
def _friendly_name_from_discovery(discovery_info: ssdp.SsdpServiceInfo) -> str:
|
def _friendly_name_from_discovery(discovery_info: ssdp.SsdpServiceInfo) -> str:
|
||||||
|
@ -50,15 +53,13 @@ async def _async_wait_for_discoveries(hass: HomeAssistant) -> bool:
|
||||||
device_discovered_event = asyncio.Event()
|
device_discovered_event = asyncio.Event()
|
||||||
|
|
||||||
async def device_discovered(info: SsdpServiceInfo, change: SsdpChange) -> None:
|
async def device_discovered(info: SsdpServiceInfo, change: SsdpChange) -> None:
|
||||||
if change == SsdpChange.BYEBYE:
|
if change != SsdpChange.BYEBYE:
|
||||||
return
|
LOGGER.info(
|
||||||
|
"Device discovered: %s, at: %s",
|
||||||
LOGGER.info(
|
info.ssdp_usn,
|
||||||
"Device discovered: %s, at: %s",
|
info.ssdp_location,
|
||||||
info.ssdp_usn,
|
)
|
||||||
info.ssdp_location,
|
device_discovered_event.set()
|
||||||
)
|
|
||||||
device_discovered_event.set()
|
|
||||||
|
|
||||||
cancel_discovered_callback_1 = await ssdp.async_register_callback(
|
cancel_discovered_callback_1 = await ssdp.async_register_callback(
|
||||||
hass,
|
hass,
|
||||||
|
@ -97,6 +98,14 @@ async def _async_discover_igd_devices(
|
||||||
) + await ssdp.async_get_discovery_info_by_st(hass, ST_IGD_V2)
|
) + await ssdp.async_get_discovery_info_by_st(hass, ST_IGD_V2)
|
||||||
|
|
||||||
|
|
||||||
|
async def _async_mac_address_from_discovery(
|
||||||
|
hass: HomeAssistant, discovery: SsdpServiceInfo
|
||||||
|
) -> str | None:
|
||||||
|
"""Get the mac address from a discovery."""
|
||||||
|
host = discovery.ssdp_headers["_host"]
|
||||||
|
return await async_get_mac_address_from_host(hass, host)
|
||||||
|
|
||||||
|
|
||||||
class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||||
"""Handle a UPnP/IGD config flow."""
|
"""Handle a UPnP/IGD config flow."""
|
||||||
|
|
||||||
|
@ -118,15 +127,13 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||||
if user_input is not None:
|
if user_input is not None:
|
||||||
# Ensure wanted device was discovered.
|
# Ensure wanted device was discovered.
|
||||||
assert self._discoveries
|
assert self._discoveries
|
||||||
matching_discoveries = [
|
discovery = next(
|
||||||
discovery
|
iter(
|
||||||
for discovery in self._discoveries
|
discovery
|
||||||
if discovery.ssdp_usn == user_input["unique_id"]
|
for discovery in self._discoveries
|
||||||
]
|
if discovery.ssdp_usn == user_input["unique_id"]
|
||||||
if not matching_discoveries:
|
)
|
||||||
return self.async_abort(reason="no_devices_found")
|
)
|
||||||
|
|
||||||
discovery = matching_discoveries[0]
|
|
||||||
await self.async_set_unique_id(discovery.ssdp_usn, raise_on_progress=False)
|
await self.async_set_unique_id(discovery.ssdp_usn, raise_on_progress=False)
|
||||||
return await self._async_create_entry_from_discovery(discovery)
|
return await self._async_create_entry_from_discovery(discovery)
|
||||||
|
|
||||||
|
@ -217,21 +224,46 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||||
# Ensure not already configuring/configured.
|
# Ensure not already configuring/configured.
|
||||||
unique_id = discovery_info.ssdp_usn
|
unique_id = discovery_info.ssdp_usn
|
||||||
await self.async_set_unique_id(unique_id)
|
await self.async_set_unique_id(unique_id)
|
||||||
hostname = discovery_info.ssdp_headers["_host"]
|
mac_address = await _async_mac_address_from_discovery(self.hass, discovery_info)
|
||||||
self._abort_if_unique_id_configured(
|
self._abort_if_unique_id_configured(
|
||||||
updates={CONFIG_ENTRY_HOSTNAME: hostname}, reload_on_update=False
|
# Store mac address for older entries.
|
||||||
|
# The location is stored in the config entry such that when the location changes, the entry is reloaded.
|
||||||
|
updates={
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS: mac_address,
|
||||||
|
CONFIG_ENTRY_LOCATION: discovery_info.ssdp_location,
|
||||||
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
# Handle devices changing their UDN, only allow a single host.
|
# Handle devices changing their UDN, only allow a single host.
|
||||||
existing_entries = self._async_current_entries()
|
for entry in self._async_current_entries(include_ignore=True):
|
||||||
for config_entry in existing_entries:
|
entry_mac_address = entry.data.get(CONFIG_ENTRY_MAC_ADDRESS)
|
||||||
entry_hostname = config_entry.data.get(CONFIG_ENTRY_HOSTNAME)
|
entry_st = entry.data.get(CONFIG_ENTRY_ST)
|
||||||
if entry_hostname == hostname:
|
if entry_mac_address != mac_address:
|
||||||
LOGGER.debug(
|
continue
|
||||||
"Found existing config_entry with same hostname, discovery ignored"
|
|
||||||
)
|
if discovery_info.ssdp_st != entry_st:
|
||||||
|
# Check ssdp_st to prevent swapping between IGDv1 and IGDv2.
|
||||||
|
continue
|
||||||
|
|
||||||
|
if entry.source == config_entries.SOURCE_IGNORE:
|
||||||
|
# Host was already ignored. Don't update ignored entries.
|
||||||
return self.async_abort(reason="discovery_ignored")
|
return self.async_abort(reason="discovery_ignored")
|
||||||
|
|
||||||
|
LOGGER.debug("Updating entry: %s", entry.entry_id)
|
||||||
|
self.hass.config_entries.async_update_entry(
|
||||||
|
entry,
|
||||||
|
unique_id=unique_id,
|
||||||
|
data={**entry.data, CONFIG_ENTRY_UDN: discovery_info.ssdp_udn},
|
||||||
|
)
|
||||||
|
if entry.state == config_entries.ConfigEntryState.LOADED:
|
||||||
|
# Only reload when entry has state LOADED; when entry has state SETUP_RETRY,
|
||||||
|
# another load is started, causing the entry to be loaded twice.
|
||||||
|
LOGGER.debug("Reloading entry: %s", entry.entry_id)
|
||||||
|
self.hass.async_create_task(
|
||||||
|
self.hass.config_entries.async_reload(entry.entry_id)
|
||||||
|
)
|
||||||
|
return self.async_abort(reason="config_entry_updated")
|
||||||
|
|
||||||
# Store discovery.
|
# Store discovery.
|
||||||
self._discoveries = [discovery_info]
|
self._discoveries = [discovery_info]
|
||||||
|
|
||||||
|
@ -265,9 +297,12 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||||
)
|
)
|
||||||
|
|
||||||
title = _friendly_name_from_discovery(discovery)
|
title = _friendly_name_from_discovery(discovery)
|
||||||
|
mac_address = await _async_mac_address_from_discovery(self.hass, discovery)
|
||||||
data = {
|
data = {
|
||||||
CONFIG_ENTRY_UDN: discovery.upnp[ssdp.ATTR_UPNP_UDN],
|
CONFIG_ENTRY_UDN: discovery.upnp[ssdp.ATTR_UPNP_UDN],
|
||||||
CONFIG_ENTRY_ST: discovery.ssdp_st,
|
CONFIG_ENTRY_ST: discovery.ssdp_st,
|
||||||
CONFIG_ENTRY_HOSTNAME: discovery.ssdp_headers["_host"],
|
CONFIG_ENTRY_ORIGINAL_UDN: discovery.upnp[ssdp.ATTR_UPNP_UDN],
|
||||||
|
CONFIG_ENTRY_LOCATION: discovery.ssdp_location,
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS: mac_address,
|
||||||
}
|
}
|
||||||
return self.async_create_entry(title=title, data=data)
|
return self.async_create_entry(title=title, data=data)
|
||||||
|
|
|
@ -8,7 +8,6 @@ LOGGER = logging.getLogger(__package__)
|
||||||
|
|
||||||
CONF_LOCAL_IP = "local_ip"
|
CONF_LOCAL_IP = "local_ip"
|
||||||
DOMAIN = "upnp"
|
DOMAIN = "upnp"
|
||||||
DOMAIN_DEVICES = "devices"
|
|
||||||
BYTES_RECEIVED = "bytes_received"
|
BYTES_RECEIVED = "bytes_received"
|
||||||
BYTES_SENT = "bytes_sent"
|
BYTES_SENT = "bytes_sent"
|
||||||
PACKETS_RECEIVED = "packets_received"
|
PACKETS_RECEIVED = "packets_received"
|
||||||
|
@ -22,7 +21,9 @@ ROUTER_UPTIME = "uptime"
|
||||||
KIBIBYTE = 1024
|
KIBIBYTE = 1024
|
||||||
CONFIG_ENTRY_ST = "st"
|
CONFIG_ENTRY_ST = "st"
|
||||||
CONFIG_ENTRY_UDN = "udn"
|
CONFIG_ENTRY_UDN = "udn"
|
||||||
CONFIG_ENTRY_HOSTNAME = "hostname"
|
CONFIG_ENTRY_ORIGINAL_UDN = "original_udn"
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS = "mac_address"
|
||||||
|
CONFIG_ENTRY_LOCATION = "location"
|
||||||
DEFAULT_SCAN_INTERVAL = timedelta(seconds=30).total_seconds()
|
DEFAULT_SCAN_INTERVAL = timedelta(seconds=30).total_seconds()
|
||||||
ST_IGD_V1 = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
|
ST_IGD_V1 = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
|
||||||
ST_IGD_V2 = "urn:schemas-upnp-org:device:InternetGatewayDevice:2"
|
ST_IGD_V2 = "urn:schemas-upnp-org:device:InternetGatewayDevice:2"
|
||||||
|
|
|
@ -3,6 +3,8 @@ from __future__ import annotations
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
from collections.abc import Mapping
|
from collections.abc import Mapping
|
||||||
|
from functools import partial
|
||||||
|
from ipaddress import ip_address
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
|
@ -11,9 +13,8 @@ from async_upnp_client.client import UpnpDevice
|
||||||
from async_upnp_client.client_factory import UpnpFactory
|
from async_upnp_client.client_factory import UpnpFactory
|
||||||
from async_upnp_client.exceptions import UpnpError
|
from async_upnp_client.exceptions import UpnpError
|
||||||
from async_upnp_client.profiles.igd import IgdDevice, StatusInfo
|
from async_upnp_client.profiles.igd import IgdDevice, StatusInfo
|
||||||
|
from getmac import get_mac_address
|
||||||
|
|
||||||
from homeassistant.components import ssdp
|
|
||||||
from homeassistant.components.ssdp import SsdpChange, SsdpServiceInfo
|
|
||||||
from homeassistant.core import HomeAssistant
|
from homeassistant.core import HomeAssistant
|
||||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
|
||||||
|
@ -32,6 +33,20 @@ from .const import (
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def async_get_mac_address_from_host(hass: HomeAssistant, host: str) -> str | None:
|
||||||
|
"""Get mac address from host."""
|
||||||
|
ip_addr = ip_address(host)
|
||||||
|
if ip_addr.version == 4:
|
||||||
|
mac_address = await hass.async_add_executor_job(
|
||||||
|
partial(get_mac_address, ip=host)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
mac_address = await hass.async_add_executor_job(
|
||||||
|
partial(get_mac_address, ip6=host)
|
||||||
|
)
|
||||||
|
return mac_address
|
||||||
|
|
||||||
|
|
||||||
async def async_create_upnp_device(
|
async def async_create_upnp_device(
|
||||||
hass: HomeAssistant, ssdp_location: str
|
hass: HomeAssistant, ssdp_location: str
|
||||||
) -> UpnpDevice:
|
) -> UpnpDevice:
|
||||||
|
@ -51,6 +66,7 @@ class Device:
|
||||||
self.hass = hass
|
self.hass = hass
|
||||||
self._igd_device = igd_device
|
self._igd_device = igd_device
|
||||||
self.coordinator: DataUpdateCoordinator | None = None
|
self.coordinator: DataUpdateCoordinator | None = None
|
||||||
|
self._mac_address: str | None = None
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def async_create_device(
|
async def async_create_device(
|
||||||
|
@ -63,36 +79,27 @@ class Device:
|
||||||
igd_device = IgdDevice(upnp_device, None)
|
igd_device = IgdDevice(upnp_device, None)
|
||||||
device = cls(hass, igd_device)
|
device = cls(hass, igd_device)
|
||||||
|
|
||||||
# Register SSDP callback for updates.
|
|
||||||
usn = f"{upnp_device.udn}::{upnp_device.device_type}"
|
|
||||||
await ssdp.async_register_callback(
|
|
||||||
hass, device.async_ssdp_callback, {"usn": usn}
|
|
||||||
)
|
|
||||||
|
|
||||||
return device
|
return device
|
||||||
|
|
||||||
async def async_ssdp_callback(
|
@property
|
||||||
self, service_info: SsdpServiceInfo, change: SsdpChange
|
def mac_address(self) -> str | None:
|
||||||
) -> None:
|
"""Get the mac address."""
|
||||||
"""SSDP callback, update if needed."""
|
return self._mac_address
|
||||||
_LOGGER.debug(
|
|
||||||
"SSDP Callback, change: %s, headers: %s", change, service_info.ssdp_headers
|
|
||||||
)
|
|
||||||
if service_info.ssdp_location is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
if change == SsdpChange.ALIVE:
|
@mac_address.setter
|
||||||
# We care only about updates.
|
def mac_address(self, mac_address: str) -> None:
|
||||||
return
|
"""Set the mac address."""
|
||||||
|
self._mac_address = mac_address
|
||||||
|
|
||||||
device = self._igd_device.device
|
@property
|
||||||
if service_info.ssdp_location == device.device_url:
|
def original_udn(self) -> str | None:
|
||||||
return
|
"""Get the mac address."""
|
||||||
|
return self._original_udn
|
||||||
|
|
||||||
new_upnp_device = await async_create_upnp_device(
|
@original_udn.setter
|
||||||
self.hass, service_info.ssdp_location
|
def original_udn(self, original_udn: str) -> None:
|
||||||
)
|
"""Set the original UDN."""
|
||||||
device.reinit(new_upnp_device)
|
self._original_udn = original_udn
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def udn(self) -> str:
|
def udn(self) -> str:
|
||||||
|
@ -130,12 +137,22 @@ class Device:
|
||||||
return self.usn
|
return self.usn
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def hostname(self) -> str | None:
|
def host(self) -> str | None:
|
||||||
"""Get the hostname."""
|
"""Get the hostname."""
|
||||||
url = self._igd_device.device.device_url
|
url = self._igd_device.device.device_url
|
||||||
parsed = urlparse(url)
|
parsed = urlparse(url)
|
||||||
return parsed.hostname
|
return parsed.hostname
|
||||||
|
|
||||||
|
@property
|
||||||
|
def device_url(self) -> str:
|
||||||
|
"""Get the device_url of the device."""
|
||||||
|
return self._igd_device.device.device_url
|
||||||
|
|
||||||
|
@property
|
||||||
|
def serial_number(self) -> str | None:
|
||||||
|
"""Get the serial number."""
|
||||||
|
return self._igd_device.device.serial_number
|
||||||
|
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
"""Get string representation."""
|
"""Get string representation."""
|
||||||
return f"IGD Device: {self.name}/{self.udn}::{self.device_type}"
|
return f"IGD Device: {self.name}/{self.udn}::{self.device_type}"
|
||||||
|
@ -179,7 +196,7 @@ class Device:
|
||||||
return_exceptions=True,
|
return_exceptions=True,
|
||||||
)
|
)
|
||||||
status_info: StatusInfo | None = None
|
status_info: StatusInfo | None = None
|
||||||
ip_address: str | None = None
|
router_ip: str | None = None
|
||||||
|
|
||||||
for idx, value in enumerate(values):
|
for idx, value in enumerate(values):
|
||||||
if isinstance(value, UpnpError):
|
if isinstance(value, UpnpError):
|
||||||
|
@ -199,10 +216,10 @@ class Device:
|
||||||
if isinstance(value, StatusInfo):
|
if isinstance(value, StatusInfo):
|
||||||
status_info = value
|
status_info = value
|
||||||
elif isinstance(value, str):
|
elif isinstance(value, str):
|
||||||
ip_address = value
|
router_ip = value
|
||||||
|
|
||||||
return {
|
return {
|
||||||
WAN_STATUS: status_info[0] if status_info is not None else None,
|
WAN_STATUS: status_info[0] if status_info is not None else None,
|
||||||
ROUTER_UPTIME: status_info[2] if status_info is not None else None,
|
ROUTER_UPTIME: status_info[2] if status_info is not None else None,
|
||||||
ROUTER_IP: ip_address,
|
ROUTER_IP: router_ip,
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
"name": "UPnP/IGD",
|
"name": "UPnP/IGD",
|
||||||
"config_flow": true,
|
"config_flow": true,
|
||||||
"documentation": "https://www.home-assistant.io/integrations/upnp",
|
"documentation": "https://www.home-assistant.io/integrations/upnp",
|
||||||
"requirements": ["async-upnp-client==0.27.0"],
|
"requirements": ["async-upnp-client==0.27.0", "getmac==0.8.2"],
|
||||||
"dependencies": ["network", "ssdp"],
|
"dependencies": ["network", "ssdp"],
|
||||||
"codeowners": ["@StevenLooman", "@ehendrix23"],
|
"codeowners": ["@StevenLooman", "@ehendrix23"],
|
||||||
"ssdp": [
|
"ssdp": [
|
||||||
|
|
|
@ -700,6 +700,7 @@ georss_qld_bushfire_alert_client==0.5
|
||||||
# homeassistant.components.minecraft_server
|
# homeassistant.components.minecraft_server
|
||||||
# homeassistant.components.nmap_tracker
|
# homeassistant.components.nmap_tracker
|
||||||
# homeassistant.components.samsungtv
|
# homeassistant.components.samsungtv
|
||||||
|
# homeassistant.components.upnp
|
||||||
getmac==0.8.2
|
getmac==0.8.2
|
||||||
|
|
||||||
# homeassistant.components.gios
|
# homeassistant.components.gios
|
||||||
|
|
|
@ -494,6 +494,7 @@ georss_qld_bushfire_alert_client==0.5
|
||||||
# homeassistant.components.minecraft_server
|
# homeassistant.components.minecraft_server
|
||||||
# homeassistant.components.nmap_tracker
|
# homeassistant.components.nmap_tracker
|
||||||
# homeassistant.components.samsungtv
|
# homeassistant.components.samsungtv
|
||||||
|
# homeassistant.components.upnp
|
||||||
getmac==0.8.2
|
getmac==0.8.2
|
||||||
|
|
||||||
# homeassistant.components.gios
|
# homeassistant.components.gios
|
||||||
|
|
|
@ -14,6 +14,9 @@ from homeassistant.components import ssdp
|
||||||
from homeassistant.components.upnp.const import (
|
from homeassistant.components.upnp.const import (
|
||||||
BYTES_RECEIVED,
|
BYTES_RECEIVED,
|
||||||
BYTES_SENT,
|
BYTES_SENT,
|
||||||
|
CONFIG_ENTRY_LOCATION,
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS,
|
||||||
|
CONFIG_ENTRY_ORIGINAL_UDN,
|
||||||
CONFIG_ENTRY_ST,
|
CONFIG_ENTRY_ST,
|
||||||
CONFIG_ENTRY_UDN,
|
CONFIG_ENTRY_UDN,
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
|
@ -34,9 +37,11 @@ TEST_USN = f"{TEST_UDN}::{TEST_ST}"
|
||||||
TEST_LOCATION = "http://192.168.1.1/desc.xml"
|
TEST_LOCATION = "http://192.168.1.1/desc.xml"
|
||||||
TEST_HOSTNAME = urlparse(TEST_LOCATION).hostname
|
TEST_HOSTNAME = urlparse(TEST_LOCATION).hostname
|
||||||
TEST_FRIENDLY_NAME = "mock-name"
|
TEST_FRIENDLY_NAME = "mock-name"
|
||||||
|
TEST_MAC_ADDRESS = "00:11:22:33:44:55"
|
||||||
TEST_DISCOVERY = ssdp.SsdpServiceInfo(
|
TEST_DISCOVERY = ssdp.SsdpServiceInfo(
|
||||||
ssdp_usn=TEST_USN,
|
|
||||||
ssdp_st=TEST_ST,
|
ssdp_st=TEST_ST,
|
||||||
|
ssdp_udn=TEST_UDN,
|
||||||
|
ssdp_usn=TEST_USN,
|
||||||
ssdp_location=TEST_LOCATION,
|
ssdp_location=TEST_LOCATION,
|
||||||
upnp={
|
upnp={
|
||||||
"_udn": TEST_UDN,
|
"_udn": TEST_UDN,
|
||||||
|
@ -210,6 +215,26 @@ def mock_upnp_device():
|
||||||
yield mock_async_create_upnp_device, mock_igd_device
|
yield mock_async_create_upnp_device, mock_igd_device
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_mac_address_from_host():
|
||||||
|
"""Get mac address."""
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.upnp.device.get_mac_address",
|
||||||
|
return_value=TEST_MAC_ADDRESS,
|
||||||
|
):
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_no_mac_address_from_host():
|
||||||
|
"""Get no mac address."""
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.upnp.device.get_mac_address",
|
||||||
|
return_value=None,
|
||||||
|
):
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def mock_setup_entry():
|
def mock_setup_entry():
|
||||||
"""Mock async_setup_entry."""
|
"""Mock async_setup_entry."""
|
||||||
|
@ -272,15 +297,23 @@ async def ssdp_no_discovery():
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
async def setup_integration(
|
async def config_entry(
|
||||||
hass: HomeAssistant, mock_get_source_ip, ssdp_instant_discovery, mock_upnp_device
|
hass: HomeAssistant,
|
||||||
|
mock_get_source_ip,
|
||||||
|
ssdp_instant_discovery,
|
||||||
|
mock_upnp_device,
|
||||||
|
mock_mac_address_from_host,
|
||||||
):
|
):
|
||||||
"""Create an initialized integration."""
|
"""Create an initialized integration."""
|
||||||
entry = MockConfigEntry(
|
entry = MockConfigEntry(
|
||||||
domain=DOMAIN,
|
domain=DOMAIN,
|
||||||
|
unique_id=TEST_USN,
|
||||||
data={
|
data={
|
||||||
CONFIG_ENTRY_UDN: TEST_UDN,
|
|
||||||
CONFIG_ENTRY_ST: TEST_ST,
|
CONFIG_ENTRY_ST: TEST_ST,
|
||||||
|
CONFIG_ENTRY_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_LOCATION: TEST_LOCATION,
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -16,9 +16,7 @@ from .conftest import MockIgdDevice
|
||||||
from tests.common import MockConfigEntry, async_fire_time_changed
|
from tests.common import MockConfigEntry, async_fire_time_changed
|
||||||
|
|
||||||
|
|
||||||
async def test_upnp_binary_sensors(
|
async def test_upnp_binary_sensors(hass: HomeAssistant, config_entry: MockConfigEntry):
|
||||||
hass: HomeAssistant, setup_integration: MockConfigEntry
|
|
||||||
):
|
|
||||||
"""Test normal sensors."""
|
"""Test normal sensors."""
|
||||||
# First poll.
|
# First poll.
|
||||||
wan_status_state = hass.states.get("binary_sensor.mock_name_wan_status")
|
wan_status_state = hass.states.get("binary_sensor.mock_name_wan_status")
|
||||||
|
@ -26,7 +24,7 @@ async def test_upnp_binary_sensors(
|
||||||
|
|
||||||
# Second poll.
|
# Second poll.
|
||||||
mock_device: MockIgdDevice = hass.data[DOMAIN][
|
mock_device: MockIgdDevice = hass.data[DOMAIN][
|
||||||
setup_integration.entry_id
|
config_entry.entry_id
|
||||||
].device._igd_device
|
].device._igd_device
|
||||||
mock_device.status_data = {
|
mock_device.status_data = {
|
||||||
WAN_STATUS: "Disconnected",
|
WAN_STATUS: "Disconnected",
|
||||||
|
|
|
@ -1,11 +1,16 @@
|
||||||
"""Test UPnP/IGD config flow."""
|
"""Test UPnP/IGD config flow."""
|
||||||
|
|
||||||
|
from copy import deepcopy
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from homeassistant import config_entries, data_entry_flow
|
from homeassistant import config_entries, data_entry_flow
|
||||||
from homeassistant.components import ssdp
|
from homeassistant.components import ssdp
|
||||||
from homeassistant.components.upnp.const import (
|
from homeassistant.components.upnp.const import (
|
||||||
CONFIG_ENTRY_HOSTNAME,
|
CONFIG_ENTRY_LOCATION,
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS,
|
||||||
|
CONFIG_ENTRY_ORIGINAL_UDN,
|
||||||
CONFIG_ENTRY_ST,
|
CONFIG_ENTRY_ST,
|
||||||
CONFIG_ENTRY_UDN,
|
CONFIG_ENTRY_UDN,
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
|
@ -15,8 +20,8 @@ from homeassistant.core import HomeAssistant
|
||||||
from .conftest import (
|
from .conftest import (
|
||||||
TEST_DISCOVERY,
|
TEST_DISCOVERY,
|
||||||
TEST_FRIENDLY_NAME,
|
TEST_FRIENDLY_NAME,
|
||||||
TEST_HOSTNAME,
|
|
||||||
TEST_LOCATION,
|
TEST_LOCATION,
|
||||||
|
TEST_MAC_ADDRESS,
|
||||||
TEST_ST,
|
TEST_ST,
|
||||||
TEST_UDN,
|
TEST_UDN,
|
||||||
TEST_USN,
|
TEST_USN,
|
||||||
|
@ -26,7 +31,10 @@ from tests.common import MockConfigEntry
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures(
|
@pytest.mark.usefixtures(
|
||||||
"ssdp_instant_discovery", "mock_setup_entry", "mock_get_source_ip"
|
"ssdp_instant_discovery",
|
||||||
|
"mock_setup_entry",
|
||||||
|
"mock_get_source_ip",
|
||||||
|
"mock_mac_address_from_host",
|
||||||
)
|
)
|
||||||
async def test_flow_ssdp(hass: HomeAssistant):
|
async def test_flow_ssdp(hass: HomeAssistant):
|
||||||
"""Test config flow: discovered + configured through ssdp."""
|
"""Test config flow: discovered + configured through ssdp."""
|
||||||
|
@ -49,7 +57,9 @@ async def test_flow_ssdp(hass: HomeAssistant):
|
||||||
assert result["data"] == {
|
assert result["data"] == {
|
||||||
CONFIG_ENTRY_ST: TEST_ST,
|
CONFIG_ENTRY_ST: TEST_ST,
|
||||||
CONFIG_ENTRY_UDN: TEST_UDN,
|
CONFIG_ENTRY_UDN: TEST_UDN,
|
||||||
CONFIG_ENTRY_HOSTNAME: TEST_HOSTNAME,
|
CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_LOCATION: TEST_LOCATION,
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -73,32 +83,225 @@ async def test_flow_ssdp_incomplete_discovery(hass: HomeAssistant):
|
||||||
assert result["reason"] == "incomplete_discovery"
|
assert result["reason"] == "incomplete_discovery"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("mock_get_source_ip")
|
@pytest.mark.usefixtures(
|
||||||
async def test_flow_ssdp_discovery_ignored(hass: HomeAssistant):
|
"ssdp_instant_discovery",
|
||||||
"""Test config flow: discovery through ssdp, but ignored, as hostname is used by existing config entry."""
|
"mock_setup_entry",
|
||||||
# Existing entry.
|
"mock_get_source_ip",
|
||||||
config_entry = MockConfigEntry(
|
"mock_no_mac_address_from_host",
|
||||||
domain=DOMAIN,
|
)
|
||||||
data={
|
async def test_flow_ssdp_no_mac_address(hass: HomeAssistant):
|
||||||
CONFIG_ENTRY_UDN: TEST_UDN + "2",
|
"""Test config flow: discovered + configured through ssdp."""
|
||||||
CONFIG_ENTRY_ST: TEST_ST,
|
# Discovered via step ssdp.
|
||||||
CONFIG_ENTRY_HOSTNAME: TEST_HOSTNAME,
|
result = await hass.config_entries.flow.async_init(
|
||||||
},
|
DOMAIN,
|
||||||
|
context={"source": config_entries.SOURCE_SSDP},
|
||||||
|
data=TEST_DISCOVERY,
|
||||||
)
|
)
|
||||||
config_entry.add_to_hass(hass)
|
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
|
||||||
|
assert result["step_id"] == "ssdp_confirm"
|
||||||
|
|
||||||
|
# Confirm via step ssdp_confirm.
|
||||||
|
result = await hass.config_entries.flow.async_configure(
|
||||||
|
result["flow_id"],
|
||||||
|
user_input={},
|
||||||
|
)
|
||||||
|
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
|
||||||
|
assert result["title"] == TEST_FRIENDLY_NAME
|
||||||
|
assert result["data"] == {
|
||||||
|
CONFIG_ENTRY_ST: TEST_ST,
|
||||||
|
CONFIG_ENTRY_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_LOCATION: TEST_LOCATION,
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS: None,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures("mock_mac_address_from_host")
|
||||||
|
async def test_flow_ssdp_discovery_changed_udn(hass: HomeAssistant):
|
||||||
|
"""Test config flow: discovery through ssdp, same device, but new UDN, matched on mac address."""
|
||||||
|
entry = MockConfigEntry(
|
||||||
|
domain=DOMAIN,
|
||||||
|
unique_id=TEST_USN,
|
||||||
|
data={
|
||||||
|
CONFIG_ENTRY_ST: TEST_ST,
|
||||||
|
CONFIG_ENTRY_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_LOCATION: TEST_LOCATION,
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS,
|
||||||
|
},
|
||||||
|
source=config_entries.SOURCE_SSDP,
|
||||||
|
state=config_entries.ConfigEntryState.LOADED,
|
||||||
|
)
|
||||||
|
entry.add_to_hass(hass)
|
||||||
|
|
||||||
|
# New discovery via step ssdp.
|
||||||
|
new_udn = TEST_UDN + "2"
|
||||||
|
new_discovery = deepcopy(TEST_DISCOVERY)
|
||||||
|
new_discovery.ssdp_usn = f"{new_udn}::{TEST_ST}"
|
||||||
|
new_discovery.upnp["_udn"] = new_udn
|
||||||
|
result = await hass.config_entries.flow.async_init(
|
||||||
|
DOMAIN,
|
||||||
|
context={"source": config_entries.SOURCE_SSDP},
|
||||||
|
data=new_discovery,
|
||||||
|
)
|
||||||
|
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
|
||||||
|
assert result["reason"] == "config_entry_updated"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures(
|
||||||
|
"ssdp_instant_discovery",
|
||||||
|
"mock_setup_entry",
|
||||||
|
"mock_get_source_ip",
|
||||||
|
)
|
||||||
|
async def test_flow_ssdp_discovery_changed_udn_but_st_differs(hass: HomeAssistant):
|
||||||
|
"""Test config flow: discovery through ssdp, same device, but new UDN, and different ST, so not matched --> new discovery."""
|
||||||
|
entry = MockConfigEntry(
|
||||||
|
domain=DOMAIN,
|
||||||
|
unique_id=TEST_USN,
|
||||||
|
data={
|
||||||
|
CONFIG_ENTRY_ST: TEST_ST,
|
||||||
|
CONFIG_ENTRY_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_LOCATION: TEST_LOCATION,
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS,
|
||||||
|
},
|
||||||
|
source=config_entries.SOURCE_SSDP,
|
||||||
|
state=config_entries.ConfigEntryState.LOADED,
|
||||||
|
)
|
||||||
|
entry.add_to_hass(hass)
|
||||||
|
|
||||||
|
# UDN + mac address different: New discovery via step ssdp.
|
||||||
|
new_udn = TEST_UDN + "2"
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.upnp.device.get_mac_address",
|
||||||
|
return_value=TEST_MAC_ADDRESS + "2",
|
||||||
|
):
|
||||||
|
new_discovery = deepcopy(TEST_DISCOVERY)
|
||||||
|
new_discovery.ssdp_usn = f"{new_udn}::{TEST_ST}"
|
||||||
|
new_discovery.upnp["_udn"] = new_udn
|
||||||
|
result = await hass.config_entries.flow.async_init(
|
||||||
|
DOMAIN,
|
||||||
|
context={"source": config_entries.SOURCE_SSDP},
|
||||||
|
data=new_discovery,
|
||||||
|
)
|
||||||
|
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
|
||||||
|
assert result["step_id"] == "ssdp_confirm"
|
||||||
|
|
||||||
|
# UDN + ST different: New discovery via step ssdp.
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.upnp.device.get_mac_address",
|
||||||
|
return_value=TEST_MAC_ADDRESS,
|
||||||
|
):
|
||||||
|
new_st = TEST_ST + "2"
|
||||||
|
new_discovery = deepcopy(TEST_DISCOVERY)
|
||||||
|
new_discovery.ssdp_usn = f"{new_udn}::{new_st}"
|
||||||
|
new_discovery.ssdp_st = new_st
|
||||||
|
new_discovery.upnp["_udn"] = new_udn
|
||||||
|
result = await hass.config_entries.flow.async_init(
|
||||||
|
DOMAIN,
|
||||||
|
context={"source": config_entries.SOURCE_SSDP},
|
||||||
|
data=new_discovery,
|
||||||
|
)
|
||||||
|
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
|
||||||
|
assert result["step_id"] == "ssdp_confirm"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures("mock_mac_address_from_host")
|
||||||
|
async def test_flow_ssdp_discovery_changed_location(hass: HomeAssistant):
|
||||||
|
"""Test config flow: discovery through ssdp, same device, but new location."""
|
||||||
|
entry = MockConfigEntry(
|
||||||
|
domain=DOMAIN,
|
||||||
|
unique_id=TEST_USN,
|
||||||
|
data={
|
||||||
|
CONFIG_ENTRY_ST: TEST_ST,
|
||||||
|
CONFIG_ENTRY_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_LOCATION: TEST_LOCATION,
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS,
|
||||||
|
},
|
||||||
|
source=config_entries.SOURCE_SSDP,
|
||||||
|
state=config_entries.ConfigEntryState.LOADED,
|
||||||
|
)
|
||||||
|
entry.add_to_hass(hass)
|
||||||
|
|
||||||
|
# Discovery via step ssdp.
|
||||||
|
new_location = TEST_DISCOVERY.ssdp_location + "2"
|
||||||
|
new_discovery = deepcopy(TEST_DISCOVERY)
|
||||||
|
new_discovery.ssdp_location = new_location
|
||||||
|
result = await hass.config_entries.flow.async_init(
|
||||||
|
DOMAIN,
|
||||||
|
context={"source": config_entries.SOURCE_SSDP},
|
||||||
|
data=new_discovery,
|
||||||
|
)
|
||||||
|
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
|
||||||
|
assert result["reason"] == "already_configured"
|
||||||
|
|
||||||
|
# Test if location is updated.
|
||||||
|
assert entry.data[CONFIG_ENTRY_LOCATION] == new_location
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures("mock_mac_address_from_host")
|
||||||
|
async def test_flow_ssdp_discovery_ignored_entry(hass: HomeAssistant):
|
||||||
|
"""Test config flow: discovery through ssdp, same device, but new UDN, matched on mac address."""
|
||||||
|
entry = MockConfigEntry(
|
||||||
|
domain=DOMAIN,
|
||||||
|
unique_id=TEST_USN,
|
||||||
|
data={
|
||||||
|
CONFIG_ENTRY_ST: TEST_ST,
|
||||||
|
CONFIG_ENTRY_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_LOCATION: TEST_LOCATION,
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS,
|
||||||
|
},
|
||||||
|
source=config_entries.SOURCE_IGNORE,
|
||||||
|
)
|
||||||
|
entry.add_to_hass(hass)
|
||||||
|
|
||||||
# Discovered via step ssdp, but ignored.
|
|
||||||
result = await hass.config_entries.flow.async_init(
|
result = await hass.config_entries.flow.async_init(
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
context={"source": config_entries.SOURCE_SSDP},
|
context={"source": config_entries.SOURCE_SSDP},
|
||||||
data=TEST_DISCOVERY,
|
data=TEST_DISCOVERY,
|
||||||
)
|
)
|
||||||
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
|
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
|
||||||
|
assert result["reason"] == "already_configured"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures("mock_mac_address_from_host")
|
||||||
|
async def test_flow_ssdp_discovery_changed_udn_ignored_entry(hass: HomeAssistant):
|
||||||
|
"""Test config flow: discovery through ssdp, same device, but new UDN, matched on mac address, entry ignored."""
|
||||||
|
entry = MockConfigEntry(
|
||||||
|
domain=DOMAIN,
|
||||||
|
unique_id=TEST_USN,
|
||||||
|
data={
|
||||||
|
CONFIG_ENTRY_ST: TEST_ST,
|
||||||
|
CONFIG_ENTRY_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_LOCATION: TEST_LOCATION,
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS,
|
||||||
|
},
|
||||||
|
source=config_entries.SOURCE_IGNORE,
|
||||||
|
)
|
||||||
|
entry.add_to_hass(hass)
|
||||||
|
|
||||||
|
# New discovery via step ssdp.
|
||||||
|
new_udn = TEST_UDN + "2"
|
||||||
|
new_discovery = deepcopy(TEST_DISCOVERY)
|
||||||
|
new_discovery.ssdp_usn = f"{new_udn}::{TEST_ST}"
|
||||||
|
new_discovery.upnp["_udn"] = new_udn
|
||||||
|
result = await hass.config_entries.flow.async_init(
|
||||||
|
DOMAIN,
|
||||||
|
context={"source": config_entries.SOURCE_SSDP},
|
||||||
|
data=new_discovery,
|
||||||
|
)
|
||||||
|
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
|
||||||
assert result["reason"] == "discovery_ignored"
|
assert result["reason"] == "discovery_ignored"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures(
|
@pytest.mark.usefixtures(
|
||||||
"ssdp_instant_discovery", "mock_setup_entry", "mock_get_source_ip"
|
"ssdp_instant_discovery",
|
||||||
|
"mock_setup_entry",
|
||||||
|
"mock_get_source_ip",
|
||||||
|
"mock_mac_address_from_host",
|
||||||
)
|
)
|
||||||
async def test_flow_user(hass: HomeAssistant):
|
async def test_flow_user(hass: HomeAssistant):
|
||||||
"""Test config flow: discovered + configured through user."""
|
"""Test config flow: discovered + configured through user."""
|
||||||
|
@ -119,12 +322,32 @@ async def test_flow_user(hass: HomeAssistant):
|
||||||
assert result["data"] == {
|
assert result["data"] == {
|
||||||
CONFIG_ENTRY_ST: TEST_ST,
|
CONFIG_ENTRY_ST: TEST_ST,
|
||||||
CONFIG_ENTRY_UDN: TEST_UDN,
|
CONFIG_ENTRY_UDN: TEST_UDN,
|
||||||
CONFIG_ENTRY_HOSTNAME: TEST_HOSTNAME,
|
CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_LOCATION: TEST_LOCATION,
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures(
|
@pytest.mark.usefixtures(
|
||||||
"ssdp_instant_discovery", "mock_setup_entry", "mock_get_source_ip"
|
"ssdp_no_discovery",
|
||||||
|
"mock_setup_entry",
|
||||||
|
"mock_get_source_ip",
|
||||||
|
"mock_mac_address_from_host",
|
||||||
|
)
|
||||||
|
async def test_flow_user_no_discovery(hass: HomeAssistant):
|
||||||
|
"""Test config flow: user, but no discovery."""
|
||||||
|
result = await hass.config_entries.flow.async_init(
|
||||||
|
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||||
|
)
|
||||||
|
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
|
||||||
|
assert result["reason"] == "no_devices_found"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures(
|
||||||
|
"ssdp_instant_discovery",
|
||||||
|
"mock_setup_entry",
|
||||||
|
"mock_get_source_ip",
|
||||||
|
"mock_mac_address_from_host",
|
||||||
)
|
)
|
||||||
async def test_flow_import(hass: HomeAssistant):
|
async def test_flow_import(hass: HomeAssistant):
|
||||||
"""Test config flow: configured through configuration.yaml."""
|
"""Test config flow: configured through configuration.yaml."""
|
||||||
|
@ -137,23 +360,62 @@ async def test_flow_import(hass: HomeAssistant):
|
||||||
assert result["data"] == {
|
assert result["data"] == {
|
||||||
CONFIG_ENTRY_ST: TEST_ST,
|
CONFIG_ENTRY_ST: TEST_ST,
|
||||||
CONFIG_ENTRY_UDN: TEST_UDN,
|
CONFIG_ENTRY_UDN: TEST_UDN,
|
||||||
CONFIG_ENTRY_HOSTNAME: TEST_HOSTNAME,
|
CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_LOCATION: TEST_LOCATION,
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures(
|
||||||
|
"mock_get_source_ip",
|
||||||
|
)
|
||||||
|
async def test_flow_import_incomplete_discovery(hass: HomeAssistant):
|
||||||
|
"""Test config flow: configured through configuration.yaml, but incomplete discovery."""
|
||||||
|
incomplete_discovery = ssdp.SsdpServiceInfo(
|
||||||
|
ssdp_usn=TEST_USN,
|
||||||
|
ssdp_st=TEST_ST,
|
||||||
|
ssdp_location=TEST_LOCATION,
|
||||||
|
upnp={
|
||||||
|
# ssdp.ATTR_UPNP_UDN: TEST_UDN, # Not provided.
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
async def register_callback(hass, callback, match_dict):
|
||||||
|
"""Immediately do callback."""
|
||||||
|
await callback(incomplete_discovery, ssdp.SsdpChange.ALIVE)
|
||||||
|
return MagicMock()
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.ssdp.async_register_callback",
|
||||||
|
side_effect=register_callback,
|
||||||
|
), patch(
|
||||||
|
"homeassistant.components.upnp.ssdp.async_get_discovery_info_by_st",
|
||||||
|
return_value=[incomplete_discovery],
|
||||||
|
):
|
||||||
|
result = await hass.config_entries.flow.async_init(
|
||||||
|
DOMAIN, context={"source": config_entries.SOURCE_IMPORT}
|
||||||
|
)
|
||||||
|
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
|
||||||
|
assert result["reason"] == "incomplete_discovery"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("ssdp_instant_discovery", "mock_get_source_ip")
|
@pytest.mark.usefixtures("ssdp_instant_discovery", "mock_get_source_ip")
|
||||||
async def test_flow_import_already_configured(hass: HomeAssistant):
|
async def test_flow_import_already_configured(hass: HomeAssistant):
|
||||||
"""Test config flow: configured through configuration.yaml, but existing config entry."""
|
"""Test config flow: configured through configuration.yaml, but existing config entry."""
|
||||||
# Existing entry.
|
# Existing entry.
|
||||||
config_entry = MockConfigEntry(
|
entry = MockConfigEntry(
|
||||||
domain=DOMAIN,
|
domain=DOMAIN,
|
||||||
|
unique_id=TEST_USN,
|
||||||
data={
|
data={
|
||||||
CONFIG_ENTRY_UDN: TEST_UDN,
|
|
||||||
CONFIG_ENTRY_ST: TEST_ST,
|
CONFIG_ENTRY_ST: TEST_ST,
|
||||||
CONFIG_ENTRY_HOSTNAME: TEST_HOSTNAME,
|
CONFIG_ENTRY_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_LOCATION: TEST_LOCATION,
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS,
|
||||||
},
|
},
|
||||||
|
state=config_entries.ConfigEntryState.LOADED,
|
||||||
)
|
)
|
||||||
config_entry.add_to_hass(hass)
|
entry.add_to_hass(hass)
|
||||||
|
|
||||||
# Discovered via step import.
|
# Discovered via step import.
|
||||||
result = await hass.config_entries.flow.async_init(
|
result = await hass.config_entries.flow.async_init(
|
||||||
|
|
|
@ -3,29 +3,35 @@ from __future__ import annotations
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from homeassistant.components import ssdp
|
|
||||||
from homeassistant.components.upnp import UpnpDataUpdateCoordinator
|
|
||||||
from homeassistant.components.upnp.const import (
|
from homeassistant.components.upnp.const import (
|
||||||
|
CONFIG_ENTRY_LOCATION,
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS,
|
||||||
|
CONFIG_ENTRY_ORIGINAL_UDN,
|
||||||
CONFIG_ENTRY_ST,
|
CONFIG_ENTRY_ST,
|
||||||
CONFIG_ENTRY_UDN,
|
CONFIG_ENTRY_UDN,
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
)
|
)
|
||||||
from homeassistant.components.upnp.device import Device
|
|
||||||
from homeassistant.core import HomeAssistant
|
from homeassistant.core import HomeAssistant
|
||||||
|
|
||||||
from .conftest import TEST_DISCOVERY, TEST_ST, TEST_UDN
|
from .conftest import TEST_LOCATION, TEST_MAC_ADDRESS, TEST_ST, TEST_UDN, TEST_USN
|
||||||
|
|
||||||
from tests.common import MockConfigEntry
|
from tests.common import MockConfigEntry
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("ssdp_instant_discovery", "mock_get_source_ip")
|
@pytest.mark.usefixtures(
|
||||||
|
"ssdp_instant_discovery", "mock_get_source_ip", "mock_mac_address_from_host"
|
||||||
|
)
|
||||||
async def test_async_setup_entry_default(hass: HomeAssistant):
|
async def test_async_setup_entry_default(hass: HomeAssistant):
|
||||||
"""Test async_setup_entry."""
|
"""Test async_setup_entry."""
|
||||||
entry = MockConfigEntry(
|
entry = MockConfigEntry(
|
||||||
domain=DOMAIN,
|
domain=DOMAIN,
|
||||||
|
unique_id=TEST_USN,
|
||||||
data={
|
data={
|
||||||
CONFIG_ENTRY_UDN: TEST_UDN,
|
|
||||||
CONFIG_ENTRY_ST: TEST_ST,
|
CONFIG_ENTRY_ST: TEST_ST,
|
||||||
|
CONFIG_ENTRY_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_LOCATION: TEST_LOCATION,
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -34,24 +40,23 @@ async def test_async_setup_entry_default(hass: HomeAssistant):
|
||||||
assert await hass.config_entries.async_setup(entry.entry_id) is True
|
assert await hass.config_entries.async_setup(entry.entry_id) is True
|
||||||
|
|
||||||
|
|
||||||
async def test_reinitialize_device(
|
@pytest.mark.usefixtures(
|
||||||
hass: HomeAssistant, setup_integration: MockConfigEntry
|
"ssdp_instant_discovery", "mock_get_source_ip", "mock_no_mac_address_from_host"
|
||||||
):
|
)
|
||||||
"""Test device is reinitialized when device changes location."""
|
async def test_async_setup_entry_default_no_mac_address(hass: HomeAssistant):
|
||||||
config_entry = setup_integration
|
"""Test async_setup_entry."""
|
||||||
coordinator: UpnpDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id]
|
entry = MockConfigEntry(
|
||||||
device: Device = coordinator.device
|
domain=DOMAIN,
|
||||||
assert device._igd_device.device.device_url == TEST_DISCOVERY.ssdp_location
|
unique_id=TEST_USN,
|
||||||
|
data={
|
||||||
# Reinit.
|
CONFIG_ENTRY_ST: TEST_ST,
|
||||||
new_location = "http://192.168.1.1:12345/desc.xml"
|
CONFIG_ENTRY_UDN: TEST_UDN,
|
||||||
await device.async_ssdp_callback(
|
CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN,
|
||||||
ssdp.SsdpServiceInfo(
|
CONFIG_ENTRY_LOCATION: TEST_LOCATION,
|
||||||
ssdp_usn="mock_usn",
|
CONFIG_ENTRY_MAC_ADDRESS: None,
|
||||||
ssdp_st="mock_st",
|
},
|
||||||
ssdp_location="http://192.168.1.1:12345/desc.xml",
|
|
||||||
upnp={},
|
|
||||||
),
|
|
||||||
...,
|
|
||||||
)
|
)
|
||||||
assert device._igd_device.device.device_url == new_location
|
|
||||||
|
# Load config_entry.
|
||||||
|
entry.add_to_hass(hass)
|
||||||
|
assert await hass.config_entries.async_setup(entry.entry_id) is True
|
||||||
|
|
|
@ -25,7 +25,7 @@ from .conftest import MockIgdDevice
|
||||||
from tests.common import MockConfigEntry, async_fire_time_changed
|
from tests.common import MockConfigEntry, async_fire_time_changed
|
||||||
|
|
||||||
|
|
||||||
async def test_upnp_sensors(hass: HomeAssistant, setup_integration: MockConfigEntry):
|
async def test_upnp_sensors(hass: HomeAssistant, config_entry: MockConfigEntry):
|
||||||
"""Test normal sensors."""
|
"""Test normal sensors."""
|
||||||
# First poll.
|
# First poll.
|
||||||
b_received_state = hass.states.get("sensor.mock_name_b_received")
|
b_received_state = hass.states.get("sensor.mock_name_b_received")
|
||||||
|
@ -43,7 +43,7 @@ async def test_upnp_sensors(hass: HomeAssistant, setup_integration: MockConfigEn
|
||||||
|
|
||||||
# Second poll.
|
# Second poll.
|
||||||
mock_device: MockIgdDevice = hass.data[DOMAIN][
|
mock_device: MockIgdDevice = hass.data[DOMAIN][
|
||||||
setup_integration.entry_id
|
config_entry.entry_id
|
||||||
].device._igd_device
|
].device._igd_device
|
||||||
mock_device.traffic_data = {
|
mock_device.traffic_data = {
|
||||||
BYTES_RECEIVED: 10240,
|
BYTES_RECEIVED: 10240,
|
||||||
|
@ -74,13 +74,9 @@ async def test_upnp_sensors(hass: HomeAssistant, setup_integration: MockConfigEn
|
||||||
assert wan_status_state.state == "Disconnected"
|
assert wan_status_state.state == "Disconnected"
|
||||||
|
|
||||||
|
|
||||||
async def test_derived_upnp_sensors(
|
async def test_derived_upnp_sensors(hass: HomeAssistant, config_entry: MockConfigEntry):
|
||||||
hass: HomeAssistant, setup_integration: MockConfigEntry
|
|
||||||
):
|
|
||||||
"""Test derived sensors."""
|
"""Test derived sensors."""
|
||||||
coordinator: UpnpDataUpdateCoordinator = hass.data[DOMAIN][
|
coordinator: UpnpDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id]
|
||||||
setup_integration.entry_id
|
|
||||||
]
|
|
||||||
|
|
||||||
# First poll.
|
# First poll.
|
||||||
kib_s_received_state = hass.states.get("sensor.mock_name_kib_s_received")
|
kib_s_received_state = hass.states.get("sensor.mock_name_kib_s_received")
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue