Move parts of ssdp to async_upnp_client (#55540)

* Move parts of ssdp to async_upnp_client

* Fix test for environments with multiple sources

* Fix sonos tests

* More fixes/changes

* More fixes

* Use async_upnp_client==0.21.0

* Pylint/test fixes

* More changes after review

* Fix tests

* Improve testing

* Fix mypy

* Fix yamaha_musiccast tests?

* Changes after review

* Pylint

* Reduce calls to combined_headers

* Update to async_upnp_client==0.21.1

* Update to async_upnp_client==0.21.2

* use as_dict

Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Steven Looman 2021-09-12 01:38:16 +02:00 committed by GitHub
parent a4a6bf8a85
commit 73260c5b88
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
30 changed files with 890 additions and 1558 deletions

View file

@ -990,7 +990,6 @@ omit =
homeassistant/components/squeezebox/__init__.py
homeassistant/components/squeezebox/browse_media.py
homeassistant/components/squeezebox/media_player.py
homeassistant/components/ssdp/util.py
homeassistant/components/starline/*
homeassistant/components/starlingbank/sensor.py
homeassistant/components/steam_online/sensor.py

View file

@ -2,7 +2,7 @@
"domain": "dlna_dmr",
"name": "DLNA Digital Media Renderer",
"documentation": "https://www.home-assistant.io/integrations/dlna_dmr",
"requirements": ["async-upnp-client==0.20.0"],
"requirements": ["async-upnp-client==0.21.2"],
"dependencies": ["network"],
"codeowners": [],
"iot_class": "local_push"

View file

@ -263,8 +263,10 @@ class SonosDiscoveryManager:
else:
async_dispatcher_send(self.hass, f"{SONOS_SEEN}-{uid}")
@callback
def _async_ssdp_discovered_player(self, info):
async def _async_ssdp_discovered_player(self, info, change):
if change == ssdp.SsdpChange.BYEBYE:
return
discovered_ip = urlparse(info[ssdp.ATTR_SSDP_LOCATION]).hostname
boot_seqnum = info.get("X-RINCON-BOOTSEQ")
uid = info.get(ssdp.ATTR_UPNP_UDN)
@ -316,7 +318,7 @@ class SonosDiscoveryManager:
return
self.entry.async_on_unload(
ssdp.async_register_callback(
await ssdp.async_register_callback(
self.hass, self._async_ssdp_discovered_player, {"st": UPNP_ST}
)
)

View file

@ -2,14 +2,18 @@
from __future__ import annotations
import asyncio
from collections.abc import Mapping
from collections.abc import Awaitable
from datetime import timedelta
from enum import Enum
from ipaddress import IPv4Address, IPv6Address
import logging
from typing import Any, Callable
from typing import Any, Callable, Mapping
from async_upnp_client.search import SSDPListener
from async_upnp_client.aiohttp import AiohttpSessionRequester
from async_upnp_client.const import DeviceOrServiceType, SsdpHeaders, SsdpSource
from async_upnp_client.description_cache import DescriptionCache
from async_upnp_client.ssdp import SSDP_PORT
from async_upnp_client.ssdp_listener import SsdpDevice, SsdpListener
from async_upnp_client.utils import CaseInsensitiveDict
from homeassistant import config_entries
@ -19,12 +23,12 @@ from homeassistant.const import (
EVENT_HOMEASSISTANT_STOP,
MATCH_ALL,
)
from homeassistant.core import CoreState, HomeAssistant, callback as core_callback
from homeassistant.core import HomeAssistant, callback as core_callback
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.typing import ConfigType
from homeassistant.loader import async_get_ssdp, bind_hass
from .descriptions import DescriptionManager
from .flow import FlowDispatcher, SSDPFlow
DOMAIN = "ssdp"
@ -61,14 +65,25 @@ DISCOVERY_MAPPING = {
"location": ATTR_SSDP_LOCATION,
}
SsdpChange = Enum("SsdpChange", "ALIVE BYEBYE UPDATE")
SsdpCallback = Callable[[Mapping[str, Any], SsdpChange], Awaitable]
SSDP_SOURCE_SSDP_CHANGE_MAPPING: Mapping[SsdpSource, SsdpChange] = {
SsdpSource.SEARCH: SsdpChange.ALIVE,
SsdpSource.ADVERTISEMENT_ALIVE: SsdpChange.ALIVE,
SsdpSource.ADVERTISEMENT_BYEBYE: SsdpChange.BYEBYE,
SsdpSource.ADVERTISEMENT_UPDATE: SsdpChange.UPDATE,
}
_LOGGER = logging.getLogger(__name__)
@bind_hass
def async_register_callback(
async def async_register_callback(
hass: HomeAssistant,
callback: Callable[[dict], None],
callback: SsdpCallback,
match_dict: None | dict[str, str] = None,
) -> Callable[[], None]:
"""Register to receive a callback on ssdp broadcast.
@ -76,60 +91,61 @@ def async_register_callback(
Returns a callback that can be used to cancel the registration.
"""
scanner: Scanner = hass.data[DOMAIN]
return scanner.async_register_callback(callback, match_dict)
return await scanner.async_register_callback(callback, match_dict)
@bind_hass
def async_get_discovery_info_by_udn_st( # pylint: disable=invalid-name
async def async_get_discovery_info_by_udn_st( # pylint: disable=invalid-name
hass: HomeAssistant, udn: str, st: str
) -> dict[str, str] | None:
"""Fetch the discovery info cache."""
scanner: Scanner = hass.data[DOMAIN]
return scanner.async_get_discovery_info_by_udn_st(udn, st)
return await scanner.async_get_discovery_info_by_udn_st(udn, st)
@bind_hass
def async_get_discovery_info_by_st( # pylint: disable=invalid-name
async def async_get_discovery_info_by_st( # pylint: disable=invalid-name
hass: HomeAssistant, st: str
) -> list[dict[str, str]]:
"""Fetch all the entries matching the st."""
scanner: Scanner = hass.data[DOMAIN]
return scanner.async_get_discovery_info_by_st(st)
return await scanner.async_get_discovery_info_by_st(st)
@bind_hass
def async_get_discovery_info_by_udn(
async def async_get_discovery_info_by_udn(
hass: HomeAssistant, udn: str
) -> list[dict[str, str]]:
"""Fetch all the entries matching the udn."""
scanner: Scanner = hass.data[DOMAIN]
return scanner.async_get_discovery_info_by_udn(udn)
return await scanner.async_get_discovery_info_by_udn(udn)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the SSDP integration."""
scanner = hass.data[DOMAIN] = Scanner(hass, await async_get_ssdp(hass))
scanner = hass.data[DOMAIN] = Scanner(hass)
asyncio.create_task(scanner.async_start())
return True
@core_callback
def _async_process_callbacks(
callbacks: list[Callable[[dict], None]], discovery_info: dict[str, str]
async def _async_process_callbacks(
callbacks: list[SsdpCallback],
discovery_info: dict[str, str],
ssdp_change: SsdpChange,
) -> None:
for callback in callbacks:
try:
callback(discovery_info)
await callback(discovery_info, ssdp_change)
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Failed to callback info: %s", discovery_info)
@core_callback
def _async_headers_match(
headers: Mapping[str, str], match_dict: dict[str, str]
headers: Mapping[str, Any], match_dict: dict[str, str]
) -> bool:
for header, val in match_dict.items():
if val == MATCH_ALL:
@ -141,25 +157,39 @@ def _async_headers_match(
class Scanner:
"""Class to manage SSDP scanning."""
"""Class to manage SSDP searching and SSDP advertisements."""
def __init__(
self, hass: HomeAssistant, integration_matchers: dict[str, list[dict[str, str]]]
) -> None:
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize class."""
self.hass = hass
self.seen: set[tuple[str, str | None]] = set()
self.cache: dict[tuple[str, str], Mapping[str, str]] = {}
self._integration_matchers = integration_matchers
self._cancel_scan: Callable[[], None] | None = None
self._ssdp_listeners: list[SSDPListener] = []
self._callbacks: list[tuple[Callable[[dict], None], dict[str, str]]] = []
self.flow_dispatcher: FlowDispatcher | None = None
self.description_manager: DescriptionManager | None = None
self._ssdp_listeners: list[SsdpListener] = []
self._callbacks: list[tuple[SsdpCallback, dict[str, str]]] = []
self._flow_dispatcher: FlowDispatcher | None = None
self._description_cache: DescriptionCache | None = None
self._integration_matchers: dict[str, list[dict[str, str]]] | None = None
@core_callback
def async_register_callback(
self, callback: Callable[[dict], None], match_dict: None | dict[str, str] = None
@property
def _ssdp_devices(self) -> list[SsdpDevice]:
"""Get all seen devices."""
return [
ssdp_device
for ssdp_listener in self._ssdp_listeners
for ssdp_device in ssdp_listener.devices.values()
]
@property
def _all_headers_from_ssdp_devices(
self,
) -> dict[tuple[str, str], Mapping[str, Any]]:
return {
(ssdp_device.udn, dst): headers
for ssdp_device in self._ssdp_devices
for dst, headers in ssdp_device.all_combined_headers.items()
}
async def async_register_callback(
self, callback: SsdpCallback, match_dict: None | dict[str, str] = None
) -> Callable[[], None]:
"""Register a callback."""
if match_dict is None:
@ -167,12 +197,13 @@ class Scanner:
# Make sure any entries that happened
# before the callback was registered are fired
if self.hass.state != CoreState.running:
for headers in self.cache.values():
if _async_headers_match(headers, match_dict):
_async_process_callbacks(
[callback], self._async_headers_to_discovery_info(headers)
)
for headers in self._all_headers_from_ssdp_devices.values():
if _async_headers_match(headers, match_dict):
await _async_process_callbacks(
[callback],
await self._async_headers_to_discovery_info(headers),
SsdpChange.ALIVE,
)
callback_entry = (callback, match_dict)
self._callbacks.append(callback_entry)
@ -183,14 +214,19 @@ class Scanner:
return _async_remove_callback
@core_callback
def async_stop(self, *_: Any) -> None:
async def async_stop(self, *_: Any) -> None:
"""Stop the scanner."""
assert self._cancel_scan is not None
self._cancel_scan()
for listener in self._ssdp_listeners:
listener.async_stop()
self._ssdp_listeners = []
await self._async_stop_ssdp_listeners()
async def _async_stop_ssdp_listeners(self) -> None:
"""Stop the SSDP listeners."""
await asyncio.gather(
*(listener.async_stop() for listener in self._ssdp_listeners),
return_exceptions=True,
)
async def _async_build_source_set(self) -> set[IPv4Address | IPv6Address]:
"""Build the list of ssdp sources."""
@ -208,34 +244,57 @@ class Scanner:
}
async def async_scan(self, *_: Any) -> None:
"""Scan for new entries using ssdp default and broadcast target."""
"""Scan for new entries using ssdp listeners."""
await self.async_scan_multicast()
await self.async_scan_broadcast()
async def async_scan_multicast(self, *_: Any) -> None:
"""Scan for new entries using multicase target."""
for ssdp_listener in self._ssdp_listeners:
await ssdp_listener.async_search()
async def async_scan_broadcast(self, *_: Any) -> None:
"""Scan for new entries using broadcast target."""
# Some sonos devices only seem to respond if we send to the broadcast
# address. This matches pysonos' behavior
# https://github.com/amelchio/pysonos/blob/d4329b4abb657d106394ae69357805269708c996/pysonos/discovery.py#L120
for listener in self._ssdp_listeners:
listener.async_search()
try:
IPv4Address(listener.source_ip)
except ValueError:
continue
# Some sonos devices only seem to respond if we send to the broadcast
# address. This matches pysonos' behavior
# https://github.com/amelchio/pysonos/blob/d4329b4abb657d106394ae69357805269708c996/pysonos/discovery.py#L120
listener.async_search((str(IPV4_BROADCAST), SSDP_PORT))
await listener.async_search((str(IPV4_BROADCAST), SSDP_PORT))
async def async_start(self) -> None:
"""Start the scanner."""
self.description_manager = DescriptionManager(self.hass)
self.flow_dispatcher = FlowDispatcher(self.hass)
"""Start the scanners."""
session = async_get_clientsession(self.hass)
requester = AiohttpSessionRequester(session, True, 10)
self._description_cache = DescriptionCache(requester)
self._flow_dispatcher = FlowDispatcher(self.hass)
self._integration_matchers = await async_get_ssdp(self.hass)
await self._async_start_ssdp_listeners()
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.async_stop)
self.hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STARTED, self._flow_dispatcher.async_start
)
self._cancel_scan = async_track_time_interval(
self.hass, self.async_scan, SCAN_INTERVAL
)
# Trigger the initial-scan.
await self.async_scan()
async def _async_start_ssdp_listeners(self) -> None:
"""Start the SSDP Listeners."""
for source_ip in await self._async_build_source_set():
self._ssdp_listeners.append(
SSDPListener(
async_connect_callback=self.async_scan,
async_callback=self._async_process_entry,
SsdpListener(
async_callback=self._ssdp_listener_callback,
source_ip=source_ip,
)
)
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.async_stop)
self.hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STARTED, self.flow_dispatcher.async_start
)
results = await asyncio.gather(
*(listener.async_start() for listener in self._ssdp_listeners),
return_exceptions=True,
@ -251,135 +310,116 @@ class Scanner:
failed_listeners.append(self._ssdp_listeners[idx])
for listener in failed_listeners:
self._ssdp_listeners.remove(listener)
self._cancel_scan = async_track_time_interval(
self.hass, self.async_scan, SCAN_INTERVAL
)
@core_callback
def _async_get_matching_callbacks(
self, headers: Mapping[str, str]
) -> list[Callable[[dict], None]]:
self,
combined_headers: SsdpHeaders,
) -> list[SsdpCallback]:
"""Return a list of callbacks that match."""
return [
callback
for callback, match_dict in self._callbacks
if _async_headers_match(headers, match_dict)
if _async_headers_match(combined_headers, match_dict)
]
@core_callback
def _async_matching_domains(self, info_with_req: CaseInsensitiveDict) -> set[str]:
def _async_matching_domains(self, info_with_desc: CaseInsensitiveDict) -> set[str]:
assert self._integration_matchers is not None
domains = set()
for domain, matchers in self._integration_matchers.items():
for matcher in matchers:
if all(info_with_req.get(k) == v for (k, v) in matcher.items()):
if all(info_with_desc.get(k) == v for (k, v) in matcher.items()):
domains.add(domain)
return domains
def _async_seen(self, header_st: str | None, header_location: str | None) -> bool:
"""Check if we have seen a specific st and optional location."""
if header_st is None:
return True
return (header_st, header_location) in self.seen
async def _ssdp_listener_callback(
self, ssdp_device: SsdpDevice, dst: DeviceOrServiceType, source: SsdpSource
) -> None:
"""Handle a device/service change."""
_LOGGER.debug(
"Change, ssdp_device: %s, dst: %s, source: %s", ssdp_device, dst, source
)
def _async_see(self, header_st: str | None, header_location: str | None) -> None:
"""Mark a specific st and optional location as seen."""
if header_st is not None:
self.seen.add((header_st, header_location))
location = ssdp_device.location
info_desc = await self._async_get_description_dict(location) or {}
combined_headers = ssdp_device.combined_headers(dst)
info_with_desc = CaseInsensitiveDict(combined_headers, **info_desc)
discovery_info = discovery_info_from_headers_and_description(info_with_desc)
def _async_unsee(self, header_st: str | None, header_location: str | None) -> None:
"""If we see a device in a new location, unsee the original location."""
if header_st is not None:
self.seen.discard((header_st, header_location))
callbacks = self._async_get_matching_callbacks(combined_headers)
ssdp_change = SSDP_SOURCE_SSDP_CHANGE_MAPPING[source]
await _async_process_callbacks(callbacks, discovery_info, ssdp_change)
async def _async_process_entry(self, headers: Mapping[str, str]) -> None:
"""Process SSDP entries."""
_LOGGER.debug("_async_process_entry: %s", headers)
h_st = headers.get("st")
h_location = headers.get("location")
for domain in self._async_matching_domains(info_with_desc):
_LOGGER.debug("Discovered %s at %s", domain, location)
if h_st and (udn := _udn_from_usn(headers.get("usn"))):
cache_key = (udn, h_st)
if old_headers := self.cache.get(cache_key):
old_h_location = old_headers.get("location")
if h_location != old_h_location:
self._async_unsee(old_headers.get("st"), old_h_location)
self.cache[cache_key] = headers
callbacks = self._async_get_matching_callbacks(headers)
if self._async_seen(h_st, h_location) and not callbacks:
return
assert self.description_manager is not None
info_req = await self.description_manager.fetch_description(h_location) or {}
info_with_req = CaseInsensitiveDict(**headers, **info_req)
discovery_info = discovery_info_from_headers_and_request(info_with_req)
_async_process_callbacks(callbacks, discovery_info)
if self._async_seen(h_st, h_location):
return
self._async_see(h_st, h_location)
for domain in self._async_matching_domains(info_with_req):
_LOGGER.debug("Discovered %s at %s", domain, h_location)
flow: SSDPFlow = {
"domain": domain,
"context": {"source": config_entries.SOURCE_SSDP},
"data": discovery_info,
}
assert self.flow_dispatcher is not None
self.flow_dispatcher.create(flow)
assert self._flow_dispatcher is not None
self._flow_dispatcher.create(flow)
@core_callback
def _async_headers_to_discovery_info(
self, headers: Mapping[str, str]
) -> dict[str, str]:
async def _async_get_description_dict(
self, location: str | None
) -> Mapping[str, str]:
"""Get description dict."""
assert self._description_cache is not None
return await self._description_cache.async_get_description_dict(location) or {}
async def _async_headers_to_discovery_info(
self, headers: Mapping[str, Any]
) -> dict[str, Any]:
"""Combine the headers and description into discovery_info.
Building this is a bit expensive so we only do it on demand.
"""
assert self.description_manager is not None
assert self._description_cache is not None
location = headers["location"]
info_req = self.description_manager.async_cached_description(location) or {}
return discovery_info_from_headers_and_request(
CaseInsensitiveDict(**headers, **info_req)
info_desc = (
await self._description_cache.async_get_description_dict(location) or {}
)
return discovery_info_from_headers_and_description(
CaseInsensitiveDict(headers, **info_desc)
)
@core_callback
def async_get_discovery_info_by_udn_st( # pylint: disable=invalid-name
async def async_get_discovery_info_by_udn_st( # pylint: disable=invalid-name
self, udn: str, st: str
) -> dict[str, str] | None:
) -> dict[str, Any] | None:
"""Return discovery_info for a udn and st."""
if headers := self.cache.get((udn, st)):
return self._async_headers_to_discovery_info(headers)
if headers := self._all_headers_from_ssdp_devices.get((udn, st)):
return await self._async_headers_to_discovery_info(headers)
return None
@core_callback
def async_get_discovery_info_by_st( # pylint: disable=invalid-name
async def async_get_discovery_info_by_st( # pylint: disable=invalid-name
self, st: str
) -> list[dict[str, str]]:
) -> list[dict[str, Any]]:
"""Return matching discovery_infos for a st."""
return [
self._async_headers_to_discovery_info(headers)
for udn_st, headers in self.cache.items()
await self._async_headers_to_discovery_info(headers)
for udn_st, headers in self._all_headers_from_ssdp_devices.items()
if udn_st[1] == st
]
@core_callback
def async_get_discovery_info_by_udn(self, udn: str) -> list[dict[str, str]]:
async def async_get_discovery_info_by_udn(self, udn: str) -> list[dict[str, Any]]:
"""Return matching discovery_infos for a udn."""
return [
self._async_headers_to_discovery_info(headers)
for udn_st, headers in self.cache.items()
await self._async_headers_to_discovery_info(headers)
for udn_st, headers in self._all_headers_from_ssdp_devices.items()
if udn_st[0] == udn
]
def discovery_info_from_headers_and_request(
info_with_req: CaseInsensitiveDict,
) -> dict[str, str]:
def discovery_info_from_headers_and_description(
info_with_desc: CaseInsensitiveDict,
) -> dict[str, Any]:
"""Convert headers and description to discovery_info."""
info = {DISCOVERY_MAPPING.get(k.lower(), k): v for k, v in info_with_req.items()}
info = {
DISCOVERY_MAPPING.get(k.lower(), k): v
for k, v in info_with_desc.as_dict().items()
}
if ATTR_UPNP_UDN not in info and ATTR_SSDP_USN in info:
if udn := _udn_from_usn(info[ATTR_SSDP_USN]):

View file

@ -1,69 +0,0 @@
"""The SSDP integration."""
from __future__ import annotations
import asyncio
import logging
import aiohttp
from defusedxml import ElementTree
from homeassistant.core import HomeAssistant, callback
from .util import etree_to_dict
_LOGGER = logging.getLogger(__name__)
class DescriptionManager:
"""Class to cache and manage fetching descriptions."""
def __init__(self, hass: HomeAssistant) -> None:
"""Init the manager."""
self.hass = hass
self._description_cache: dict[str, None | dict[str, str]] = {}
async def fetch_description(
self, xml_location: str | None
) -> None | dict[str, str]:
"""Fetch the location or get it from the cache."""
if xml_location is None:
return None
if xml_location not in self._description_cache:
try:
self._description_cache[xml_location] = await self._fetch_description(
xml_location
)
except Exception: # pylint: disable=broad-except
# If it fails, cache the failure so we do not keep trying over and over
self._description_cache[xml_location] = None
_LOGGER.exception("Failed to fetch ssdp data from: %s", xml_location)
return self._description_cache[xml_location]
@callback
def async_cached_description(self, xml_location: str) -> None | dict[str, str]:
"""Fetch the description from the cache."""
return self._description_cache.get(xml_location)
async def _fetch_description(self, xml_location: str) -> None | dict[str, str]:
"""Fetch an XML description."""
session = self.hass.helpers.aiohttp_client.async_get_clientsession()
try:
for _ in range(2):
resp = await session.get(xml_location, timeout=5)
# Samsung Smart TV sometimes returns an empty document the
# first time. Retry once.
if xml := await resp.text(errors="replace"):
break
except (aiohttp.ClientError, asyncio.TimeoutError) as err:
_LOGGER.debug("Error fetching %s: %s", xml_location, err)
return None
try:
tree = ElementTree.fromstring(xml)
except ElementTree.ParseError as err:
_LOGGER.debug("Error parsing %s: %s", xml_location, err)
return None
root = etree_to_dict(tree).get("root") or {}
return root.get("device") or {}

View file

@ -4,7 +4,7 @@
"documentation": "https://www.home-assistant.io/integrations/ssdp",
"requirements": [
"defusedxml==0.7.1",
"async-upnp-client==0.20.0"
"async-upnp-client==0.21.2"
],
"dependencies": ["network"],
"after_dependencies": ["zeroconf"],

View file

@ -1,42 +0,0 @@
"""Util functions used by SSDP."""
from __future__ import annotations
from collections import defaultdict
from typing import Any
from defusedxml import ElementTree
# Adapted from http://stackoverflow.com/a/10077069
# to follow the XML to JSON spec
# https://www.xml.com/pub/a/2006/05/31/converting-between-xml-and-json.html
def etree_to_dict(tree: ElementTree) -> dict[str, dict[str, Any] | None]:
"""Convert an ETree object to a dict."""
# strip namespace
tag_name = tree.tag[tree.tag.find("}") + 1 :]
tree_dict: dict[str, dict[str, Any] | None] = {
tag_name: {} if tree.attrib else None
}
children = list(tree)
if children:
child_dict: dict[str, list] = defaultdict(list)
for child in map(etree_to_dict, children):
for k, val in child.items():
child_dict[k].append(val)
tree_dict = {
tag_name: {k: v[0] if len(v) == 1 else v for k, v in child_dict.items()}
}
dict_meta = tree_dict[tag_name]
if tree.attrib:
assert dict_meta is not None
dict_meta.update(("@" + k, v) for k, v in tree.attrib.items())
if tree.text:
text = tree.text.strip()
if children or tree.attrib:
if text:
assert dict_meta is not None
dict_meta["#text"] = text
else:
tree_dict[tag_name] = text
return tree_dict

View file

@ -5,7 +5,6 @@ import asyncio
from collections.abc import Mapping
from dataclasses import dataclass
from datetime import timedelta
from ipaddress import ip_address
from typing import Any
import voluptuous as vol
@ -13,13 +12,12 @@ import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components import ssdp
from homeassistant.components.binary_sensor import BinarySensorEntityDescription
from homeassistant.components.network import async_get_source_ip
from homeassistant.components.network.const import PUBLIC_TARGET_IP
from homeassistant.components.sensor import SensorEntityDescription
from homeassistant.components.ssdp import SsdpChange
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import config_validation as cv, device_registry as dr
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.typing import ConfigType
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
@ -27,7 +25,6 @@ from homeassistant.helpers.update_coordinator import (
)
from .const import (
CONF_LOCAL_IP,
CONFIG_ENTRY_HOSTNAME,
CONFIG_ENTRY_SCAN_INTERVAL,
CONFIG_ENTRY_ST,
@ -36,7 +33,6 @@ from .const import (
DOMAIN,
DOMAIN_CONFIG,
DOMAIN_DEVICES,
DOMAIN_LOCAL_IP,
LOGGER,
)
from .device import Device
@ -49,9 +45,7 @@ PLATFORMS = ["binary_sensor", "sensor"]
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
{
vol.Optional(CONF_LOCAL_IP): vol.All(ip_address, cv.string),
},
{},
)
},
extra=vol.ALLOW_EXTRA,
@ -63,11 +57,9 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
LOGGER.debug("async_setup, config: %s", config)
conf_default = CONFIG_SCHEMA({DOMAIN: {}})[DOMAIN]
conf = config.get(DOMAIN, conf_default)
local_ip = await async_get_source_ip(hass, PUBLIC_TARGET_IP)
hass.data[DOMAIN] = {
DOMAIN_CONFIG: conf,
DOMAIN_DEVICES: {},
DOMAIN_LOCAL_IP: conf.get(CONF_LOCAL_IP, local_ip),
}
# Only start if set up via configuration.yaml.
@ -93,16 +85,18 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
device_discovered_event = asyncio.Event()
discovery_info: Mapping[str, Any] | None = None
@callback
def device_discovered(info: Mapping[str, Any]) -> None:
async def device_discovered(headers: Mapping[str, Any], change: SsdpChange) -> None:
if change == SsdpChange.BYEBYE:
return
nonlocal discovery_info
LOGGER.debug(
"Device discovered: %s, at: %s", usn, info[ssdp.ATTR_SSDP_LOCATION]
"Device discovered: %s, at: %s", usn, headers[ssdp.ATTR_SSDP_LOCATION]
)
discovery_info = info
discovery_info = headers
device_discovered_event.set()
cancel_discovered_callback = ssdp.async_register_callback(
cancel_discovered_callback = await ssdp.async_register_callback(
hass,
device_discovered,
{
@ -177,9 +171,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
LOGGER.debug("Enabling sensors")
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
# Start device updater.
await device.async_start()
return True
@ -187,9 +178,6 @@ async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) ->
"""Unload a UPnP/IGD device from a config entry."""
LOGGER.debug("Unloading config entry: %s", config_entry.unique_id)
if coordinator := hass.data[DOMAIN].pop(config_entry.entry_id, None):
await coordinator.device.async_stop()
LOGGER.debug("Deleting sensors")
return await hass.config_entries.async_unload_platforms(config_entry, PLATFORMS)
@ -228,10 +216,10 @@ class UpnpDataUpdateCoordinator(DataUpdateCoordinator):
self.device.async_get_status(),
)
data = dict(update_values[0])
data.update(update_values[1])
return data
return {
**update_values[0],
**update_values[1],
}
class UpnpEntity(CoordinatorEntity):

View file

@ -10,6 +10,7 @@ import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components import ssdp
from homeassistant.components.ssdp import SsdpChange
from homeassistant.const import CONF_SCAN_INTERVAL
from homeassistant.core import HomeAssistant, callback
@ -40,8 +41,10 @@ async def _async_wait_for_discoveries(hass: HomeAssistant) -> bool:
"""Wait for a device to be discovered."""
device_discovered_event = asyncio.Event()
@callback
def device_discovered(info: Mapping[str, Any]) -> None:
async def device_discovered(info: Mapping[str, Any], change: SsdpChange) -> None:
if change == SsdpChange.BYEBYE:
return
LOGGER.info(
"Device discovered: %s, at: %s",
info[ssdp.ATTR_SSDP_USN],
@ -49,14 +52,14 @@ async def _async_wait_for_discoveries(hass: HomeAssistant) -> bool:
)
device_discovered_event.set()
cancel_discovered_callback_1 = ssdp.async_register_callback(
cancel_discovered_callback_1 = await ssdp.async_register_callback(
hass,
device_discovered,
{
ssdp.ATTR_SSDP_ST: ST_IGD_V1,
},
)
cancel_discovered_callback_2 = ssdp.async_register_callback(
cancel_discovered_callback_2 = await ssdp.async_register_callback(
hass,
device_discovered,
{
@ -77,11 +80,11 @@ async def _async_wait_for_discoveries(hass: HomeAssistant) -> bool:
return True
def _discovery_igd_devices(hass: HomeAssistant) -> list[Mapping[str, Any]]:
async def _async_discover_igd_devices(hass: HomeAssistant) -> list[Mapping[str, Any]]:
"""Discovery IGD devices."""
return ssdp.async_get_discovery_info_by_st(
return await ssdp.async_get_discovery_info_by_st(
hass, ST_IGD_V1
) + ssdp.async_get_discovery_info_by_st(hass, ST_IGD_V2)
) + await ssdp.async_get_discovery_info_by_st(hass, ST_IGD_V2)
class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
@ -121,7 +124,7 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
return await self._async_create_entry_from_discovery(discovery)
# Discover devices.
discoveries = _discovery_igd_devices(self.hass)
discoveries = await _async_discover_igd_devices(self.hass)
# Store discoveries which have not been configured.
current_unique_ids = {
@ -171,7 +174,7 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
# Discover devices.
await _async_wait_for_discoveries(self.hass)
discoveries = _discovery_igd_devices(self.hass)
discoveries = await _async_discover_igd_devices(self.hass)
# Ensure anything to add. If not, silently abort.
if not discoveries:

View file

@ -6,11 +6,9 @@ from homeassistant.const import TIME_SECONDS
LOGGER = logging.getLogger(__package__)
CONF_LOCAL_IP = "local_ip"
DOMAIN = "upnp"
DOMAIN_CONFIG = "config"
DOMAIN_DEVICES = "devices"
DOMAIN_LOCAL_IP = "local_ip"
BYTES_RECEIVED = "bytes_received"
BYTES_SENT = "bytes_sent"
PACKETS_RECEIVED = "packets_received"

View file

@ -3,25 +3,23 @@ from __future__ import annotations
import asyncio
from collections.abc import Mapping
from ipaddress import IPv4Address
from typing import Any
from urllib.parse import urlparse
from async_upnp_client import UpnpFactory
from async_upnp_client import UpnpDevice, UpnpFactory
from async_upnp_client.aiohttp import AiohttpSessionRequester
from async_upnp_client.device_updater import DeviceUpdater
from async_upnp_client.profiles.igd import IgdDevice
from homeassistant.components import ssdp
from homeassistant.components.ssdp import SsdpChange
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
import homeassistant.util.dt as dt_util
from .const import (
BYTES_RECEIVED,
BYTES_SENT,
CONF_LOCAL_IP,
DOMAIN,
DOMAIN_CONFIG,
LOGGER as _LOGGER,
PACKETS_RECEIVED,
PACKETS_SENT,
@ -32,54 +30,61 @@ from .const import (
)
def _get_local_ip(hass: HomeAssistant) -> IPv4Address | None:
"""Get the configured local ip."""
if DOMAIN in hass.data and DOMAIN_CONFIG in hass.data[DOMAIN]:
local_ip = hass.data[DOMAIN][DOMAIN_CONFIG].get(CONF_LOCAL_IP)
if local_ip:
return IPv4Address(local_ip)
return None
class Device:
"""Home Assistant representation of a UPnP/IGD device."""
def __init__(self, igd_device: IgdDevice, device_updater: DeviceUpdater) -> None:
def __init__(self, hass: HomeAssistant, igd_device: IgdDevice) -> None:
"""Initialize UPnP/IGD device."""
self.hass = hass
self._igd_device = igd_device
self._device_updater = device_updater
self.coordinator: DataUpdateCoordinator = None
@classmethod
async def async_create_device(
async def async_create_upnp_device(
cls, hass: HomeAssistant, ssdp_location: str
) -> Device:
"""Create UPnP/IGD device."""
) -> UpnpDevice:
"""Create UPnP device."""
# Build async_upnp_client requester.
session = async_get_clientsession(hass)
requester = AiohttpSessionRequester(session, True, 10)
# Create async_upnp_client device.
factory = UpnpFactory(requester, disable_state_variable_validation=True)
upnp_device = await factory.async_create_device(ssdp_location)
return await factory.async_create_device(ssdp_location)
@classmethod
async def async_create_device(
cls, hass: HomeAssistant, ssdp_location: str
) -> Device:
"""Create UPnP/IGD device."""
upnp_device = await Device.async_create_upnp_device(hass, ssdp_location)
# Create profile wrapper.
igd_device = IgdDevice(upnp_device, None)
device = cls(hass, igd_device)
# Create updater.
local_ip = _get_local_ip(hass)
device_updater = DeviceUpdater(
device=upnp_device, factory=factory, source_ip=local_ip
# Register SSDP callback for updates.
usn = f"{upnp_device.udn}::{upnp_device.device_type}"
await ssdp.async_register_callback(
hass, device.async_ssdp_callback, {ssdp.ATTR_SSDP_USN: usn}
)
return cls(igd_device, device_updater)
return device
async def async_start(self) -> None:
"""Start the device updater."""
await self._device_updater.async_start()
async def async_ssdp_callback(
self, headers: Mapping[str, Any], change: SsdpChange
) -> None:
"""SSDP callback, update if needed."""
if change != SsdpChange.UPDATE or ssdp.ATTR_SSDP_LOCATION not in headers:
return
async def async_stop(self) -> None:
"""Stop the device updater."""
await self._device_updater.async_stop()
location = headers[ssdp.ATTR_SSDP_LOCATION]
device = self._igd_device.device
if location == device.device_url:
return
new_upnp_device = Device.async_create_upnp_device(self.hass, location)
device.reinit(new_upnp_device)
@property
def udn(self) -> str:

View file

@ -3,7 +3,7 @@
"name": "UPnP/IGD",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/upnp",
"requirements": ["async-upnp-client==0.20.0"],
"requirements": ["async-upnp-client==0.21.2"],
"dependencies": ["network", "ssdp"],
"codeowners": ["@StevenLooman","@ehendrix23"],
"ssdp": [

View file

@ -30,7 +30,7 @@ SCAN_INTERVAL = timedelta(seconds=60)
async def get_upnp_desc(hass: HomeAssistant, host: str):
"""Get the upnp description URL for a given host, using the SSPD scanner."""
ssdp_entries = ssdp.async_get_discovery_info_by_st(hass, "upnp:rootdevice")
ssdp_entries = await ssdp.async_get_discovery_info_by_st(hass, "upnp:rootdevice")
matches = [w for w in ssdp_entries if w.get("_host", "") == host]
upnp_desc = None
for match in matches:

View file

@ -8,7 +8,7 @@ from ipaddress import IPv4Address, IPv6Address
import logging
from urllib.parse import urlparse
from async_upnp_client.search import SSDPListener
from async_upnp_client.search import SsdpSearchListener
import voluptuous as vol
from yeelight import BulbException
from yeelight.aio import KEY_CONNECTED, AsyncBulb
@ -395,7 +395,7 @@ class YeelightScanner:
return _async_connected
self._listeners.append(
SSDPListener(
SsdpSearchListener(
async_callback=self._async_process_entry,
service_type=SSDP_ST,
target=SSDP_TARGET,

View file

@ -2,7 +2,7 @@
"domain": "yeelight",
"name": "Yeelight",
"documentation": "https://www.home-assistant.io/integrations/yeelight",
"requirements": ["yeelight==0.7.4", "async-upnp-client==0.20.0"],
"requirements": ["yeelight==0.7.4", "async-upnp-client==0.21.2"],
"codeowners": ["@rytilahti", "@zewelor", "@shenxn", "@starkillerOG"],
"config_flow": true,
"dependencies": ["network"],

View file

@ -4,7 +4,7 @@ aiodiscover==1.4.2
aiohttp==3.7.4.post0
aiohttp_cors==0.7.0
astral==2.2
async-upnp-client==0.20.0
async-upnp-client==0.21.2
async_timeout==3.0.1
attrs==21.2.0
awesomeversion==21.8.1

View file

@ -318,7 +318,7 @@ asterisk_mbox==0.5.0
# homeassistant.components.ssdp
# homeassistant.components.upnp
# homeassistant.components.yeelight
async-upnp-client==0.20.0
async-upnp-client==0.21.2
# homeassistant.components.supla
asyncpysupla==0.0.5

View file

@ -209,7 +209,7 @@ arcam-fmj==0.7.0
# homeassistant.components.ssdp
# homeassistant.components.upnp
# homeassistant.components.yeelight
async-upnp-client==0.20.0
async-upnp-client==0.21.2
# homeassistant.components.aurora
auroranoaa==0.0.2

View file

@ -74,16 +74,28 @@ def soco_fixture(music_library, speaker_info, battery_info, alarm_clock):
yield mock_soco
@pytest.fixture(autouse=True)
async def silent_ssdp_scanner(hass):
"""Start SSDP component and get Scanner, prevent actual SSDP traffic."""
with patch(
"homeassistant.components.ssdp.Scanner._async_start_ssdp_listeners"
), patch("homeassistant.components.ssdp.Scanner._async_stop_ssdp_listeners"), patch(
"homeassistant.components.ssdp.Scanner.async_scan"
):
yield
@pytest.fixture(name="discover", autouse=True)
def discover_fixture(soco):
"""Create a mock soco discover fixture."""
def do_callback(hass, callback, *args, **kwargs):
callback(
async def do_callback(hass, callback, *args, **kwargs):
await callback(
{
ssdp.ATTR_UPNP_UDN: soco.uid,
ssdp.ATTR_SSDP_LOCATION: f"http://{soco.ip_address}/",
}
},
ssdp.SsdpChange.ALIVE,
)
return MagicMock()

View file

@ -0,0 +1,25 @@
"""Configuration for SSDP tests."""
from unittest.mock import AsyncMock, patch
from async_upnp_client.ssdp_listener import SsdpListener
import pytest
@pytest.fixture(autouse=True)
async def silent_ssdp_listener():
"""Patch SsdpListener class, preventing any actual SSDP traffic."""
with patch("homeassistant.components.ssdp.SsdpListener.async_start"), patch(
"homeassistant.components.ssdp.SsdpListener.async_stop"
), patch("homeassistant.components.ssdp.SsdpListener.async_search"):
# Fixtures are initialized before patches. When the component is started here,
# certain functions/methods might not be patched in time.
yield SsdpListener
@pytest.fixture
def mock_flow_init(hass):
"""Mock hass.config_entries.flow.async_init."""
with patch.object(
hass.config_entries.flow, "async_init", return_value=AsyncMock()
) as mock_init:
yield mock_init

File diff suppressed because it is too large Load diff

View file

@ -1 +1 @@
"""Tests for the IGD component."""
"""Tests for the upnp component."""

View file

@ -1,23 +0,0 @@
"""Common for upnp."""
from urllib.parse import urlparse
from homeassistant.components import ssdp
TEST_UDN = "uuid:device"
TEST_ST = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
TEST_USN = f"{TEST_UDN}::{TEST_ST}"
TEST_LOCATION = "http://192.168.1.1/desc.xml"
TEST_HOSTNAME = urlparse(TEST_LOCATION).hostname
TEST_FRIENDLY_NAME = "friendly name"
TEST_DISCOVERY = {
ssdp.ATTR_SSDP_LOCATION: TEST_LOCATION,
ssdp.ATTR_SSDP_ST: TEST_ST,
ssdp.ATTR_SSDP_USN: TEST_USN,
ssdp.ATTR_UPNP_UDN: TEST_UDN,
"usn": TEST_USN,
"location": TEST_LOCATION,
"_host": TEST_HOSTNAME,
"_udn": TEST_UDN,
"friendlyName": TEST_FRIENDLY_NAME,
}

View file

@ -0,0 +1,187 @@
"""Configuration for SSDP tests."""
from typing import Any, Mapping
from unittest.mock import AsyncMock, MagicMock, patch
from urllib.parse import urlparse
import pytest
from homeassistant.components import ssdp
from homeassistant.components.upnp.const import (
BYTES_RECEIVED,
BYTES_SENT,
PACKETS_RECEIVED,
PACKETS_SENT,
ROUTER_IP,
ROUTER_UPTIME,
TIMESTAMP,
WAN_STATUS,
)
from homeassistant.core import HomeAssistant
from homeassistant.util import dt
TEST_UDN = "uuid:device"
TEST_ST = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
TEST_USN = f"{TEST_UDN}::{TEST_ST}"
TEST_LOCATION = "http://192.168.1.1/desc.xml"
TEST_HOSTNAME = urlparse(TEST_LOCATION).hostname
TEST_FRIENDLY_NAME = "friendly name"
TEST_DISCOVERY = {
ssdp.ATTR_SSDP_LOCATION: TEST_LOCATION,
ssdp.ATTR_SSDP_ST: TEST_ST,
ssdp.ATTR_SSDP_USN: TEST_USN,
ssdp.ATTR_UPNP_UDN: TEST_UDN,
"usn": TEST_USN,
"location": TEST_LOCATION,
"_host": TEST_HOSTNAME,
"_udn": TEST_UDN,
"friendlyName": TEST_FRIENDLY_NAME,
}
class MockDevice:
"""Mock device for Device."""
def __init__(self, hass: HomeAssistant, udn: str) -> None:
"""Initialize mock device."""
self.hass = hass
self._udn = udn
self.traffic_times_polled = 0
self.status_times_polled = 0
@classmethod
async def async_create_device(cls, hass, ssdp_location) -> "MockDevice":
"""Return self."""
return cls(hass, TEST_UDN)
async def async_ssdp_callback(
self, headers: Mapping[str, Any], change: ssdp.SsdpChange
) -> None:
"""SSDP callback, update if needed."""
pass
@property
def udn(self) -> str:
"""Get the UDN."""
return self._udn
@property
def manufacturer(self) -> str:
"""Get manufacturer."""
return "mock-manufacturer"
@property
def name(self) -> str:
"""Get name."""
return "mock-name"
@property
def model_name(self) -> str:
"""Get the model name."""
return "mock-model-name"
@property
def device_type(self) -> str:
"""Get the device type."""
return "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
@property
def usn(self) -> str:
"""Get the USN."""
return f"{self.udn}::{self.device_type}"
@property
def unique_id(self) -> str:
"""Get the unique id."""
return self.usn
@property
def hostname(self) -> str:
"""Get the hostname."""
return "mock-hostname"
async def async_get_traffic_data(self) -> Mapping[str, Any]:
"""Get traffic data."""
self.traffic_times_polled += 1
return {
TIMESTAMP: dt.utcnow(),
BYTES_RECEIVED: 0,
BYTES_SENT: 0,
PACKETS_RECEIVED: 0,
PACKETS_SENT: 0,
}
async def async_get_status(self) -> Mapping[str, Any]:
"""Get connection status, uptime, and external IP."""
self.status_times_polled += 1
return {
WAN_STATUS: "Connected",
ROUTER_UPTIME: 0,
ROUTER_IP: "192.168.0.1",
}
@pytest.fixture(autouse=True)
def mock_upnp_device():
"""Mock homeassistant.components.upnp.Device."""
with patch(
"homeassistant.components.upnp.Device", new=MockDevice
) as mock_async_create_device:
yield mock_async_create_device
@pytest.fixture
def mock_setup_entry():
"""Mock async_setup_entry."""
with patch(
"homeassistant.components.upnp.async_setup_entry",
return_value=AsyncMock(True),
) as mock_setup:
yield mock_setup
@pytest.fixture(autouse=True)
async def silent_ssdp_scanner(hass):
"""Start SSDP component and get Scanner, prevent actual SSDP traffic."""
with patch(
"homeassistant.components.ssdp.Scanner._async_start_ssdp_listeners"
), patch("homeassistant.components.ssdp.Scanner._async_stop_ssdp_listeners"), patch(
"homeassistant.components.ssdp.Scanner.async_scan"
):
yield
@pytest.fixture
async def ssdp_instant_discovery():
"""Instance discovery."""
# Set up device discovery callback.
async def register_callback(hass, callback, match_dict):
"""Immediately do callback."""
await callback(TEST_DISCOVERY, ssdp.SsdpChange.ALIVE)
return MagicMock()
with patch(
"homeassistant.components.ssdp.async_register_callback",
side_effect=register_callback,
) as mock_register, patch(
"homeassistant.components.ssdp.async_get_discovery_info_by_st",
return_value=[TEST_DISCOVERY],
) as mock_get_info:
yield (mock_register, mock_get_info)
@pytest.fixture
async def ssdp_no_discovery():
"""No discovery."""
# Set up device discovery callback.
async def register_callback(hass, callback, match_dict):
"""Don't do callback."""
return MagicMock()
with patch(
"homeassistant.components.ssdp.async_register_callback",
side_effect=register_callback,
) as mock_register, patch(
"homeassistant.components.ssdp.async_get_discovery_info_by_st",
return_value=[],
) as mock_get_info:
yield (mock_register, mock_get_info)

View file

@ -1,49 +0,0 @@
"""Mock ssdp.Scanner."""
from __future__ import annotations
from typing import Any
from unittest.mock import patch
import pytest
from homeassistant.components import ssdp
from homeassistant.core import callback
class MockSsdpDescriptionManager(ssdp.DescriptionManager):
"""Mocked ssdp DescriptionManager."""
async def fetch_description(
self, xml_location: str | None
) -> None | dict[str, str]:
"""Fetch the location or get it from the cache."""
if xml_location is None:
return None
return {}
class MockSsdpScanner(ssdp.Scanner):
"""Mocked ssdp Scanner."""
@callback
def async_stop(self, *_: Any) -> None:
"""Stop the scanner."""
# Do nothing.
async def async_start(self) -> None:
"""Start the scanner."""
self.description_manager = MockSsdpDescriptionManager(self.hass)
@callback
def async_scan(self, *_: Any) -> None:
"""Scan for new entries."""
# Do nothing.
@pytest.fixture
def mock_ssdp_scanner():
"""Mock ssdp Scanner."""
with patch(
"homeassistant.components.ssdp.Scanner", new=MockSsdpScanner
) as mock_ssdp_scanner:
yield mock_ssdp_scanner

View file

@ -1,104 +0,0 @@
"""Mock device for testing purposes."""
from typing import Any, Mapping
from unittest.mock import AsyncMock, patch
import pytest
from homeassistant.components.upnp.const import (
BYTES_RECEIVED,
BYTES_SENT,
PACKETS_RECEIVED,
PACKETS_SENT,
ROUTER_IP,
ROUTER_UPTIME,
TIMESTAMP,
WAN_STATUS,
)
from homeassistant.components.upnp.device import Device
from homeassistant.util import dt
from .common import TEST_UDN
class MockDevice(Device):
"""Mock device for Device."""
def __init__(self, udn: str) -> None:
"""Initialize mock device."""
igd_device = object()
mock_device_updater = AsyncMock()
super().__init__(igd_device, mock_device_updater)
self._udn = udn
self.traffic_times_polled = 0
self.status_times_polled = 0
@classmethod
async def async_create_device(cls, hass, ssdp_location) -> "MockDevice":
"""Return self."""
return cls(TEST_UDN)
@property
def udn(self) -> str:
"""Get the UDN."""
return self._udn
@property
def manufacturer(self) -> str:
"""Get manufacturer."""
return "mock-manufacturer"
@property
def name(self) -> str:
"""Get name."""
return "mock-name"
@property
def model_name(self) -> str:
"""Get the model name."""
return "mock-model-name"
@property
def device_type(self) -> str:
"""Get the device type."""
return "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
@property
def hostname(self) -> str:
"""Get the hostname."""
return "mock-hostname"
async def async_get_traffic_data(self) -> Mapping[str, Any]:
"""Get traffic data."""
self.traffic_times_polled += 1
return {
TIMESTAMP: dt.utcnow(),
BYTES_RECEIVED: 0,
BYTES_SENT: 0,
PACKETS_RECEIVED: 0,
PACKETS_SENT: 0,
}
async def async_get_status(self) -> Mapping[str, Any]:
"""Get connection status, uptime, and external IP."""
self.status_times_polled += 1
return {
WAN_STATUS: "Connected",
ROUTER_UPTIME: 0,
ROUTER_IP: "192.168.0.1",
}
async def async_start(self) -> None:
"""Start the device updater."""
async def async_stop(self) -> None:
"""Stop the device updater."""
@pytest.fixture
def mock_upnp_device():
"""Mock upnp Device.async_create_device."""
with patch(
"homeassistant.components.upnp.Device", new=MockDevice
) as mock_async_create_device:
yield mock_async_create_device

View file

@ -1,7 +1,6 @@
"""Test UPnP/IGD config flow."""
from datetime import timedelta
from unittest.mock import patch
import pytest
@ -15,11 +14,10 @@ from homeassistant.components.upnp.const import (
DEFAULT_SCAN_INTERVAL,
DOMAIN,
)
from homeassistant.core import CoreState, HomeAssistant
from homeassistant.setup import async_setup_component
from homeassistant.core import HomeAssistant
from homeassistant.util import dt
from .common import (
from .conftest import (
TEST_DISCOVERY,
TEST_FRIENDLY_NAME,
TEST_HOSTNAME,
@ -28,25 +26,15 @@ from .common import (
TEST_UDN,
TEST_USN,
)
from .mock_ssdp_scanner import mock_ssdp_scanner # noqa: F401
from .mock_upnp_device import mock_upnp_device # noqa: F401
from tests.common import MockConfigEntry, async_fire_time_changed
@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_upnp_device", "mock_get_source_ip")
async def test_flow_ssdp_discovery(
hass: HomeAssistant,
):
@pytest.mark.usefixtures(
"ssdp_instant_discovery", "mock_setup_entry", "mock_get_source_ip"
)
async def test_flow_ssdp(hass: HomeAssistant):
"""Test config flow: discovered + configured through ssdp."""
# Ensure we have a ssdp Scanner.
await async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
ssdp_scanner: ssdp.Scanner = hass.data[ssdp.DOMAIN]
ssdp_scanner.cache[(TEST_UDN, TEST_ST)] = TEST_DISCOVERY
# Speed up callback in ssdp.async_register_callback.
hass.state = CoreState.not_running
# Discovered via step ssdp.
result = await hass.config_entries.flow.async_init(
DOMAIN,
@ -70,7 +58,7 @@ async def test_flow_ssdp_discovery(
}
@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_get_source_ip")
@pytest.mark.usefixtures("mock_get_source_ip")
async def test_flow_ssdp_incomplete_discovery(hass: HomeAssistant):
"""Test config flow: incomplete discovery through ssdp."""
# Discovered via step ssdp.
@ -88,7 +76,7 @@ async def test_flow_ssdp_incomplete_discovery(hass: HomeAssistant):
assert result["reason"] == "incomplete_discovery"
@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_get_source_ip")
@pytest.mark.usefixtures("mock_get_source_ip")
async def test_flow_ssdp_discovery_ignored(hass: HomeAssistant):
"""Test config flow: discovery through ssdp, but ignored, as hostname is used by existing config entry."""
# Existing entry.
@ -113,17 +101,11 @@ async def test_flow_ssdp_discovery_ignored(hass: HomeAssistant):
assert result["reason"] == "discovery_ignored"
@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_upnp_device", "mock_get_source_ip")
@pytest.mark.usefixtures(
"ssdp_instant_discovery", "mock_setup_entry", "mock_get_source_ip"
)
async def test_flow_user(hass: HomeAssistant):
"""Test config flow: discovered + configured through user."""
# Ensure we have a ssdp Scanner.
await async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
ssdp_scanner: ssdp.Scanner = hass.data[ssdp.DOMAIN]
ssdp_scanner.cache[(TEST_UDN, TEST_ST)] = TEST_DISCOVERY
# Speed up callback in ssdp.async_register_callback.
hass.state = CoreState.not_running
# Discovered via step user.
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
@ -145,17 +127,11 @@ async def test_flow_user(hass: HomeAssistant):
}
@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_upnp_device", "mock_get_source_ip")
@pytest.mark.usefixtures(
"ssdp_instant_discovery", "mock_setup_entry", "mock_get_source_ip"
)
async def test_flow_import(hass: HomeAssistant):
"""Test config flow: configured through configuration.yaml."""
# Ensure we have a ssdp Scanner.
await async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
ssdp_scanner: ssdp.Scanner = hass.data[ssdp.DOMAIN]
ssdp_scanner.cache[(TEST_UDN, TEST_ST)] = TEST_DISCOVERY
# Speed up callback in ssdp.async_register_callback.
hass.state = CoreState.not_running
# Discovered via step import.
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_IMPORT}
@ -169,7 +145,7 @@ async def test_flow_import(hass: HomeAssistant):
}
@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_get_source_ip")
@pytest.mark.usefixtures("ssdp_instant_discovery", "mock_get_source_ip")
async def test_flow_import_already_configured(hass: HomeAssistant):
"""Test config flow: configured through configuration.yaml, but existing config entry."""
# Existing entry.
@ -193,37 +169,20 @@ async def test_flow_import_already_configured(hass: HomeAssistant):
assert result["reason"] == "already_configured"
@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_get_source_ip")
@pytest.mark.usefixtures("ssdp_no_discovery", "mock_get_source_ip")
async def test_flow_import_no_devices_found(hass: HomeAssistant):
"""Test config flow: no devices found, configured through configuration.yaml."""
# Ensure we have a ssdp Scanner.
await async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
ssdp_scanner: ssdp.Scanner = hass.data[ssdp.DOMAIN]
ssdp_scanner.cache.clear()
# Discovered via step import.
with patch(
"homeassistant.components.upnp.config_flow.SSDP_SEARCH_TIMEOUT", new=0.0
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_IMPORT}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "no_devices_found"
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_IMPORT}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "no_devices_found"
@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_upnp_device", "mock_get_source_ip")
@pytest.mark.usefixtures("ssdp_instant_discovery", "mock_get_source_ip")
async def test_options_flow(hass: HomeAssistant):
"""Test options flow."""
# Ensure we have a ssdp Scanner.
await async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
ssdp_scanner: ssdp.Scanner = hass.data[ssdp.DOMAIN]
ssdp_scanner.cache[(TEST_UDN, TEST_ST)] = TEST_DISCOVERY
# Speed up callback in ssdp.async_register_callback.
hass.state = CoreState.not_running
# Set up config entry.
config_entry = MockConfigEntry(
domain=DOMAIN,

View file

@ -3,25 +3,23 @@ from __future__ import annotations
import pytest
from homeassistant.components import ssdp
from homeassistant.components.upnp.const import (
CONFIG_ENTRY_ST,
CONFIG_ENTRY_UDN,
DOMAIN,
)
from homeassistant.core import CoreState, HomeAssistant
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from .common import TEST_DISCOVERY, TEST_ST, TEST_UDN
from .mock_ssdp_scanner import mock_ssdp_scanner # noqa: F401
from .mock_upnp_device import mock_upnp_device # noqa: F401
from .conftest import TEST_ST, TEST_UDN
from tests.common import MockConfigEntry
@pytest.mark.usefixtures("mock_ssdp_scanner", "mock_upnp_device", "mock_get_source_ip")
@pytest.mark.usefixtures("ssdp_instant_discovery", "mock_get_source_ip")
async def test_async_setup_entry_default(hass: HomeAssistant):
"""Test async_setup_entry."""
entry = MockConfigEntry(
domain=DOMAIN,
data={
@ -34,12 +32,6 @@ async def test_async_setup_entry_default(hass: HomeAssistant):
await async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
# Device is discovered.
ssdp_scanner: ssdp.Scanner = hass.data[ssdp.DOMAIN]
ssdp_scanner.cache[(TEST_UDN, TEST_ST)] = TEST_DISCOVERY
# Speed up callback in ssdp.async_register_callback.
hass.state = CoreState.not_running
# Load config_entry.
entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(entry.entry_id) is True

View file

@ -14,6 +14,17 @@ from homeassistant.const import CONF_HOST
from tests.common import MockConfigEntry
@pytest.fixture(autouse=True)
async def silent_ssdp_scanner(hass):
"""Start SSDP component and get Scanner, prevent actual SSDP traffic."""
with patch(
"homeassistant.components.ssdp.Scanner._async_start_ssdp_listeners"
), patch("homeassistant.components.ssdp.Scanner._async_stop_ssdp_listeners"), patch(
"homeassistant.components.ssdp.Scanner.async_scan"
):
yield
@pytest.fixture(autouse=True)
def mock_setup_entry():
"""Mock setting up a config entry."""

View file

@ -4,7 +4,7 @@ from datetime import timedelta
from ipaddress import IPv4Address
from unittest.mock import AsyncMock, MagicMock, patch
from async_upnp_client.search import SSDPListener
from async_upnp_client.search import SsdpSearchListener
from yeelight import BulbException, BulbType
from yeelight.main import _MODEL_SPECS
@ -145,7 +145,7 @@ def _mocked_bulb(cannot_connect=False):
def _patched_ssdp_listener(info, *args, **kwargs):
listener = SSDPListener(*args, **kwargs)
listener = SsdpSearchListener(*args, **kwargs)
async def _async_callback(*_):
if kwargs["source_ip"] == IPv4Address(FAIL_TO_BIND_IP):
@ -173,7 +173,7 @@ def _patch_discovery(no_device=False, capabilities=None):
)
return patch(
"homeassistant.components.yeelight.SSDPListener",
"homeassistant.components.yeelight.SsdpSearchListener",
new=_generate_fake_ssdp_listener,
)