From 2b0b3c238a4442f124918ab3879350565186ee03 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 8 Mar 2024 23:11:00 -1000 Subject: [PATCH] Make SSDP tasks background HassJob to avoid delaying startup (#112668) --- homeassistant/components/ssdp/__init__.py | 53 +++++++++++++------ .../components/dlna_dmr/test_media_player.py | 16 +++--- .../dlna_dms/test_device_availability.py | 14 ++--- .../dlna_dms/test_dms_device_source.py | 2 +- tests/components/ssdp/test_init.py | 15 +++--- 5 files changed, 60 insertions(+), 40 deletions(-) diff --git a/homeassistant/components/ssdp/__init__.py b/homeassistant/components/ssdp/__init__.py index ce6b7e30a84..72e94996361 100644 --- a/homeassistant/components/ssdp/__init__.py +++ b/homeassistant/components/ssdp/__init__.py @@ -3,10 +3,11 @@ from __future__ import annotations import asyncio -from collections.abc import Awaitable, Callable, Mapping +from collections.abc import Callable, Coroutine, Mapping from dataclasses import dataclass, field from datetime import timedelta from enum import Enum +from functools import partial from ipaddress import IPv4Address, IPv6Address import logging import socket @@ -42,7 +43,7 @@ from homeassistant.const import ( MATCH_ALL, __version__ as current_version, ) -from homeassistant.core import Event, HomeAssistant, callback as core_callback +from homeassistant.core import Event, HassJob, HomeAssistant, callback as core_callback from homeassistant.data_entry_flow import BaseServiceInfo from homeassistant.helpers import config_validation as cv, discovery_flow from homeassistant.helpers.aiohttp_client import async_get_clientsession @@ -53,6 +54,7 @@ from homeassistant.helpers.system_info import async_get_system_info from homeassistant.helpers.typing import ConfigType from homeassistant.loader import async_get_ssdp, bind_hass from homeassistant.util.async_ import create_eager_task +from homeassistant.util.logging import catch_log_exception DOMAIN = "ssdp" SSDP_SCANNER = "scanner" @@ -124,7 +126,9 @@ class SsdpServiceInfo(BaseServiceInfo): SsdpChange = Enum("SsdpChange", "ALIVE BYEBYE UPDATE") -SsdpCallback = Callable[[SsdpServiceInfo, SsdpChange], Awaitable] +SsdpHassJobCallback = HassJob[ + [SsdpServiceInfo, SsdpChange], Coroutine[Any, Any, None] | None +] SSDP_SOURCE_SSDP_CHANGE_MAPPING: Mapping[SsdpSource, SsdpChange] = { SsdpSource.SEARCH_ALIVE: SsdpChange.ALIVE, @@ -135,10 +139,15 @@ SSDP_SOURCE_SSDP_CHANGE_MAPPING: Mapping[SsdpSource, SsdpChange] = { } +def _format_err(name: str, *args: Any) -> str: + """Format error message.""" + return f"Exception in SSDP callback {name}: {args}" + + @bind_hass async def async_register_callback( hass: HomeAssistant, - callback: SsdpCallback, + callback: Callable[[SsdpServiceInfo, SsdpChange], Coroutine[Any, Any, None] | None], match_dict: None | dict[str, str] = None, ) -> Callable[[], None]: """Register to receive a callback on ssdp broadcast. @@ -146,7 +155,14 @@ async def async_register_callback( Returns a callback that can be used to cancel the registration. """ scanner: Scanner = hass.data[DOMAIN][SSDP_SCANNER] - return await scanner.async_register_callback(callback, match_dict) + job = HassJob( + catch_log_exception( + callback, + partial(_format_err, str(callback)), + ), + f"ssdp callback {match_dict}", + ) + return await scanner.async_register_callback(job, match_dict) @bind_hass @@ -206,14 +222,18 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: return True -async def _async_process_callbacks( - callbacks: list[SsdpCallback], +@core_callback +def _async_process_callbacks( + hass: HomeAssistant, + callbacks: list[SsdpHassJobCallback], discovery_info: SsdpServiceInfo, ssdp_change: SsdpChange, ) -> None: for callback in callbacks: try: - await callback(discovery_info, ssdp_change) + hass.async_run_hass_job( + callback, discovery_info, ssdp_change, eager_start=True, background=True + ) except Exception: # pylint: disable=broad-except _LOGGER.exception("Failed to callback info: %s", discovery_info) @@ -287,7 +307,7 @@ class Scanner: self._cancel_scan: Callable[[], None] | None = None self._ssdp_listeners: list[SsdpListener] = [] self._device_tracker = SsdpDeviceTracker() - self._callbacks: list[tuple[SsdpCallback, dict[str, str]]] = [] + self._callbacks: list[tuple[SsdpHassJobCallback, dict[str, str]]] = [] self._description_cache: DescriptionCache | None = None self.integration_matchers = integration_matchers @@ -297,7 +317,7 @@ class Scanner: return list(self._device_tracker.devices.values()) async def async_register_callback( - self, callback: SsdpCallback, match_dict: None | dict[str, str] = None + self, callback: SsdpHassJobCallback, match_dict: None | dict[str, str] = None ) -> Callable[[], None]: """Register a callback.""" if match_dict is None: @@ -310,7 +330,8 @@ class Scanner: for ssdp_device in self._ssdp_devices: for headers in ssdp_device.all_combined_headers.values(): if _async_headers_match(headers, lower_match_dict): - await _async_process_callbacks( + _async_process_callbacks( + self.hass, [callback], await self._async_headers_to_discovery_info( ssdp_device, headers @@ -426,7 +447,7 @@ class Scanner: def _async_get_matching_callbacks( self, combined_headers: CaseInsensitiveDict, - ) -> list[SsdpCallback]: + ) -> list[SsdpHassJobCallback]: """Return a list of callbacks that match.""" return [ callback @@ -451,10 +472,11 @@ class Scanner: _, info_desc = self._description_cache.peek_description_dict(location) if info_desc is None: # Fetch info desc in separate task and process from there. - self.hass.async_create_task( + self.hass.async_create_background_task( self._ssdp_listener_process_callback_with_lookup( ssdp_device, dst, source ), + name=f"ssdp_info_desc_lookup_{location}", eager_start=True, ) return @@ -509,10 +531,7 @@ class Scanner: if callbacks: ssdp_change = SSDP_SOURCE_SSDP_CHANGE_MAPPING[source] - self.hass.async_create_task( - _async_process_callbacks(callbacks, discovery_info, ssdp_change), - eager_start=True, - ) + _async_process_callbacks(self.hass, callbacks, discovery_info, ssdp_change) # Config flows should only be created for alive/update messages from alive devices if source == SsdpSource.ADVERTISEMENT_BYEBYE: diff --git a/tests/components/dlna_dmr/test_media_player.py b/tests/components/dlna_dmr/test_media_player.py index 6d39c8e74be..4eb4780add3 100644 --- a/tests/components/dlna_dmr/test_media_player.py +++ b/tests/components/dlna_dmr/test_media_player.py @@ -1425,7 +1425,7 @@ async def test_become_available( domain_data_mock.upnp_factory.async_create_device.reset_mock() # Send an SSDP notification from the now alive device - ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target await ssdp_callback( ssdp.SsdpServiceInfo( ssdp_usn=MOCK_DEVICE_USN, @@ -1498,7 +1498,7 @@ async def test_alive_but_gone( domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpError # Send an SSDP notification from the still missing device - ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target await ssdp_callback( ssdp.SsdpServiceInfo( ssdp_usn=MOCK_DEVICE_USN, @@ -1611,7 +1611,7 @@ async def test_multiple_ssdp_alive( ) # Send two SSDP notifications with the new device URL - ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target await ssdp_callback( ssdp.SsdpServiceInfo( ssdp_usn=MOCK_DEVICE_USN, @@ -1651,7 +1651,7 @@ async def test_ssdp_byebye( ) -> None: """Test device is disconnected when byebye is received.""" # First byebye will cause a disconnect - ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target await ssdp_callback( ssdp.SsdpServiceInfo( ssdp_usn=MOCK_DEVICE_USN, @@ -1703,7 +1703,7 @@ async def test_ssdp_update_seen_bootid( domain_data_mock.upnp_factory.async_create_device.side_effect = None # Send SSDP alive with boot ID - ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target await ssdp_callback( ssdp.SsdpServiceInfo( ssdp_usn=MOCK_DEVICE_USN, @@ -1830,7 +1830,7 @@ async def test_ssdp_update_missed_bootid( domain_data_mock.upnp_factory.async_create_device.side_effect = None # Send SSDP alive with boot ID - ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target await ssdp_callback( ssdp.SsdpServiceInfo( ssdp_usn=MOCK_DEVICE_USN, @@ -1907,7 +1907,7 @@ async def test_ssdp_bootid( domain_data_mock.upnp_factory.async_create_device.side_effect = None # Send SSDP alive with boot ID - ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target await ssdp_callback( ssdp.SsdpServiceInfo( ssdp_usn=MOCK_DEVICE_USN, @@ -2367,7 +2367,7 @@ async def test_connections_restored( domain_data_mock.upnp_factory.async_create_device.reset_mock() # Send an SSDP notification from the now alive device - ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target await ssdp_callback( ssdp.SsdpServiceInfo( ssdp_usn=MOCK_DEVICE_USN, diff --git a/tests/components/dlna_dms/test_device_availability.py b/tests/components/dlna_dms/test_device_availability.py index c54588ce473..c1ad3c91a7b 100644 --- a/tests/components/dlna_dms/test_device_availability.py +++ b/tests/components/dlna_dms/test_device_availability.py @@ -177,7 +177,7 @@ async def test_become_available( upnp_factory_mock.async_create_device.reset_mock() # Send an SSDP notification from the now alive device - ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target await ssdp_callback( ssdp.SsdpServiceInfo( ssdp_usn=MOCK_DEVICE_USN, @@ -205,7 +205,7 @@ async def test_alive_but_gone( upnp_factory_mock.async_create_device.side_effect = UpnpError # Send an SSDP notification from the still missing device - ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target await ssdp_callback( ssdp.SsdpServiceInfo( ssdp_usn=MOCK_DEVICE_USN, @@ -308,7 +308,7 @@ async def test_multiple_ssdp_alive( upnp_factory_mock.async_create_device.side_effect = create_device_delayed # Send two SSDP notifications with the new device URL - ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target await ssdp_callback( ssdp.SsdpServiceInfo( ssdp_usn=MOCK_DEVICE_USN, @@ -343,7 +343,7 @@ async def test_ssdp_byebye( ) -> None: """Test device is disconnected when byebye is received.""" # First byebye will cause a disconnect - ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target await ssdp_callback( ssdp.SsdpServiceInfo( ssdp_usn=MOCK_DEVICE_USN, @@ -386,7 +386,7 @@ async def test_ssdp_update_seen_bootid( upnp_factory_mock.async_create_device.side_effect = None # Send SSDP alive with boot ID - ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target await ssdp_callback( ssdp.SsdpServiceInfo( ssdp_usn=MOCK_DEVICE_USN, @@ -498,7 +498,7 @@ async def test_ssdp_update_missed_bootid( upnp_factory_mock.async_create_device.side_effect = None # Send SSDP alive with boot ID - ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target await ssdp_callback( ssdp.SsdpServiceInfo( ssdp_usn=MOCK_DEVICE_USN, @@ -568,7 +568,7 @@ async def test_ssdp_bootid( upnp_factory_mock.async_create_device.reset_mock() # Send SSDP alive with boot ID - ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target await ssdp_callback( ssdp.SsdpServiceInfo( ssdp_usn=MOCK_DEVICE_USN, diff --git a/tests/components/dlna_dms/test_dms_device_source.py b/tests/components/dlna_dms/test_dms_device_source.py index 75b26ceb900..47bd7b0b39b 100644 --- a/tests/components/dlna_dms/test_dms_device_source.py +++ b/tests/components/dlna_dms/test_dms_device_source.py @@ -67,7 +67,7 @@ async def test_catch_request_error_unavailable( ) -> None: """Test the device is checked for availability before trying requests.""" # DmsDevice notifies of disconnect via SSDP - ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] + ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target await ssdp_callback( ssdp.SsdpServiceInfo( ssdp_usn=MOCK_DEVICE_USN, diff --git a/tests/components/ssdp/test_init.py b/tests/components/ssdp/test_init.py index e88214f01f5..7af2287c893 100644 --- a/tests/components/ssdp/test_init.py +++ b/tests/components/ssdp/test_init.py @@ -343,7 +343,7 @@ async def test_flow_start_only_alive( } ) ssdp_listener._on_search(mock_ssdp_search_response) - await hass.async_block_till_done() + await hass.async_block_till_done(wait_background_tasks=True) mock_flow_init.assert_awaited_once_with( "mock-domain", context={"source": config_entries.SOURCE_SSDP}, data=ANY @@ -464,6 +464,7 @@ async def test_start_stop_scanner(mock_source_set, hass: HomeAssistant) -> None: @pytest.mark.usefixtures("mock_get_source_ip") +@pytest.mark.no_fail_on_log_exception @patch("homeassistant.components.ssdp.async_get_ssdp", return_value={}) async def test_scan_with_registered_callback( mock_get_ssdp, @@ -523,9 +524,9 @@ async def test_scan_with_registered_callback( async_match_any_callback = AsyncMock() await ssdp.async_register_callback(hass, async_match_any_callback) - await hass.async_block_till_done() + await hass.async_block_till_done(wait_background_tasks=True) ssdp_listener._on_search(mock_ssdp_search_response) - await hass.async_block_till_done() + await hass.async_block_till_done(wait_background_tasks=True) assert async_integration_callback.call_count == 1 assert async_integration_match_all_callback.call_count == 1 @@ -549,7 +550,7 @@ async def test_scan_with_registered_callback( ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus", ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL", } - assert "Failed to callback info" in caplog.text + assert "Exception in SSDP callback" in caplog.text async_integration_callback_from_cache = AsyncMock() await ssdp.async_register_callback( @@ -835,7 +836,7 @@ async def test_flow_dismiss_on_byebye( } ) ssdp_listener._on_search(mock_ssdp_search_response) - await hass.async_block_till_done() + await hass.async_block_till_done(wait_background_tasks=True) mock_flow_init.assert_awaited_once_with( "mock-domain", context={"source": config_entries.SOURCE_SSDP}, data=ANY @@ -853,7 +854,7 @@ async def test_flow_dismiss_on_byebye( } ) ssdp_listener._on_alive(mock_ssdp_advertisement) - await hass.async_block_till_done() + await hass.async_block_till_done(wait_background_tasks=True) mock_flow_init.assert_awaited_once_with( "mock-domain", context={"source": config_entries.SOURCE_SSDP}, data=ANY ) @@ -868,7 +869,7 @@ async def test_flow_dismiss_on_byebye( hass.config_entries.flow, "async_abort" ) as mock_async_abort: ssdp_listener._on_byebye(mock_ssdp_advertisement) - await hass.async_block_till_done() + await hass.async_block_till_done(wait_background_tasks=True) assert len(mock_async_progress_by_init_data_type.mock_calls) == 1 assert mock_async_abort.mock_calls[0][1][0] == "mock_flow_id"