From 2d4ccddd99bcc4bd314f813214be693788b1c0bf Mon Sep 17 00:00:00 2001 From: epenet <6771947+epenet@users.noreply.github.com> Date: Tue, 8 Mar 2022 07:53:59 +0100 Subject: [PATCH] Fix SSDP unique id in SamsungTV config flow (#67811) Co-authored-by: epenet Co-authored-by: J. Nick Koston --- .../components/samsungtv/config_flow.py | 24 +++++- .../components/samsungtv/test_config_flow.py | 78 +++++++++++++++++-- 2 files changed, 92 insertions(+), 10 deletions(-) diff --git a/homeassistant/components/samsungtv/config_flow.py b/homeassistant/components/samsungtv/config_flow.py index 8b7e0f83a49..8c990dece77 100644 --- a/homeassistant/components/samsungtv/config_flow.py +++ b/homeassistant/components/samsungtv/config_flow.py @@ -71,6 +71,7 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): self._host: str = "" self._mac: str | None = None self._udn: str | None = None + self._upnp_udn: str | None = None self._manufacturer: str | None = None self._model: str | None = None self._name: str | None = None @@ -220,10 +221,19 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): Returns the existing entry if it was updated. """ for entry in self._async_current_entries(include_ignore=False): - if entry.data[CONF_HOST] != self._host: + mac = entry.data.get(CONF_MAC) + mac_match = mac and self._mac and mac == self._mac + upnp_udn_match = self._upnp_udn and self._upnp_udn == entry.unique_id + if ( + entry.data[CONF_HOST] != self._host + and not mac_match + and not upnp_udn_match + ): continue entry_kw_args: dict = {} - if self.unique_id and entry.unique_id is None: + if (self._udn and self._upnp_udn and self._upnp_udn != self._udn) or ( + self.unique_id and entry.unique_id is None + ): entry_kw_args["unique_id"] = self.unique_id if self._mac and not entry.data.get(CONF_MAC): entry_kw_args["data"] = {**entry.data, CONF_MAC: self._mac} @@ -264,16 +274,22 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): """Handle a flow initialized by ssdp discovery.""" LOGGER.debug("Samsung device found via SSDP: %s", discovery_info) model_name: str = discovery_info.upnp.get(ssdp.ATTR_UPNP_MODEL_NAME) or "" - self._udn = _strip_uuid(discovery_info.upnp[ssdp.ATTR_UPNP_UDN]) + self._udn = self._upnp_udn = _strip_uuid( + discovery_info.upnp[ssdp.ATTR_UPNP_UDN] + ) if hostname := urlparse(discovery_info.ssdp_location or "").hostname: self._host = hostname - await self._async_set_unique_id_from_udn() self._manufacturer = discovery_info.upnp[ssdp.ATTR_UPNP_MANUFACTURER] self._abort_if_manufacturer_is_not_samsung() if not await self._async_get_and_check_device_info(): # If we cannot get device info for an SSDP discovery # its likely a legacy tv. self._name = self._title = self._model = model_name + + # The UDN provided by the ssdp discovery doesn't always match the UDN + # from the device_info, used by the the other methods so we need to + # ensure the device_info is loaded before setting the unique_id + await self._async_set_unique_id_from_udn() self._async_update_and_abort_for_matching_unique_id() self._async_abort_if_host_already_in_progress() self.context["title_placeholders"] = {"device": self._title} diff --git a/tests/components/samsungtv/test_config_flow.py b/tests/components/samsungtv/test_config_flow.py index 98eef3d7810..b6f23f83912 100644 --- a/tests/components/samsungtv/test_config_flow.py +++ b/tests/components/samsungtv/test_config_flow.py @@ -339,7 +339,7 @@ async def test_ssdp(hass: HomeAssistant) -> None: assert result["data"][CONF_NAME] == "fake_name" assert result["data"][CONF_MANUFACTURER] == "Samsung fake_manufacturer" assert result["data"][CONF_MODEL] == "fake_model" - assert result["result"].unique_id == "0d1cef00-00dc-1000-9c80-4844f7b172de" + assert result["result"].unique_id == "123" @pytest.mark.usefixtures("remote") @@ -373,7 +373,7 @@ async def test_ssdp_noprefix(hass: HomeAssistant) -> None: assert result["data"][CONF_NAME] == "fake2_name" assert result["data"][CONF_MANUFACTURER] == "Samsung fake2_manufacturer" assert result["data"][CONF_MODEL] == "fake2_model" - assert result["result"].unique_id == "0d1cef00-00dc-1000-9c80-4844f7b172df" + assert result["result"].unique_id == "345" @pytest.mark.usefixtures("remotews") @@ -448,7 +448,7 @@ async def test_ssdp_websocket_success_populates_mac_address( assert result["data"][CONF_MAC] == "aa:bb:ww:ii:ff:ii" assert result["data"][CONF_MANUFACTURER] == "Samsung fake_manufacturer" assert result["data"][CONF_MODEL] == "82GXARRS" - assert result["result"].unique_id == "0d1cef00-00dc-1000-9c80-4844f7b172de" + assert result["result"].unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4" async def test_ssdp_websocket_not_supported( @@ -591,7 +591,7 @@ async def test_ssdp_already_configured(hass: HomeAssistant) -> None: assert result2["reason"] == RESULT_ALREADY_CONFIGURED # check updated device info - assert entry.unique_id == "0d1cef00-00dc-1000-9c80-4844f7b172de" + assert entry.unique_id == "123" @pytest.mark.usefixtures("remote") @@ -1013,7 +1013,7 @@ async def test_update_old_entry(hass: HomeAssistant) -> None: # check updated device info assert entry2.data.get(CONF_ID) is not None assert entry2.data.get(CONF_IP_ADDRESS) is not None - assert entry2.unique_id == "0d1cef00-00dc-1000-9c80-4844f7b172de" + assert entry2.unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4" @pytest.mark.usefixtures("remotews") @@ -1099,7 +1099,7 @@ async def test_update_missing_mac_unique_id_added_from_ssdp( assert result["type"] == "abort" assert result["reason"] == "already_configured" assert entry.data[CONF_MAC] == "aa:bb:ww:ii:ff:ii" - assert entry.unique_id == "0d1cef00-00dc-1000-9c80-4844f7b172de" + assert entry.unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4" @pytest.mark.usefixtures("remotews") @@ -1308,3 +1308,69 @@ async def test_form_reauth_websocket_not_supported(hass: HomeAssistant) -> None: assert result2["type"] == "abort" assert result2["reason"] == "not_supported" + + +@pytest.mark.usefixtures("remotews") +async def test_update_incorrect_udn_matching_upnp_udn_unique_id_added_from_ssdp( + hass: HomeAssistant, +) -> None: + """Test updating the wrong udn from ssdp via upnp udn match.""" + entry = MockConfigEntry( + domain=DOMAIN, + data=MOCK_OLD_ENTRY, + unique_id="0d1cef00-00dc-1000-9c80-4844f7b172de", + ) + entry.add_to_hass(hass) + with patch( + "homeassistant.components.samsungtv.async_setup", + return_value=True, + ) as mock_setup, patch( + "homeassistant.components.samsungtv.async_setup_entry", + return_value=True, + ) as mock_setup_entry: + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=MOCK_SSDP_DATA, + ) + await hass.async_block_till_done() + assert len(mock_setup.mock_calls) == 1 + assert len(mock_setup_entry.mock_calls) == 1 + + assert result["type"] == "abort" + assert result["reason"] == "already_configured" + assert entry.data[CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert entry.unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4" + + +@pytest.mark.usefixtures("remotews") +async def test_update_incorrect_udn_matching_mac_unique_id_added_from_ssdp( + hass: HomeAssistant, +) -> None: + """Test updating the wrong udn from ssdp via mac match.""" + entry = MockConfigEntry( + domain=DOMAIN, + data={**MOCK_OLD_ENTRY, CONF_MAC: "aa:bb:ww:ii:ff:ii"}, + unique_id=None, + ) + entry.add_to_hass(hass) + with patch( + "homeassistant.components.samsungtv.async_setup", + return_value=True, + ) as mock_setup, patch( + "homeassistant.components.samsungtv.async_setup_entry", + return_value=True, + ) as mock_setup_entry: + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=MOCK_SSDP_DATA, + ) + await hass.async_block_till_done() + assert len(mock_setup.mock_calls) == 1 + assert len(mock_setup_entry.mock_calls) == 1 + + assert result["type"] == "abort" + assert result["reason"] == "already_configured" + assert entry.data[CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert entry.unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4"