Add LinkPlay integration (#113940)

* Intial commit

* Add artsound as virtual integration

* Add config_flow test
Add linkplay to .coveragerc
Add linkplay to .strict-typing

* Remove artsound component

* Bump package version

* Address mypy and coveragerc

* Address comments

* Address more feedback, add zeroconf and user flow

* Catch broken bridge in async_setup_entry

* Raise ConfigEntryNotReady, add __all__

* Implement new tests for the config_flow

* Fix async warning

* Fix test

* Address feedback

* Address comments

* Address comment

---------

Co-authored-by: Philip Vanloo <26272906+pvanloo@users.noreply.github.com>
This commit is contained in:
Philip Vanloo 2024-07-25 12:27:10 +02:00 committed by GitHub
parent 33d5ed52e6
commit cde22a44db
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 731 additions and 0 deletions

View file

@ -280,6 +280,7 @@ homeassistant.components.lidarr.*
homeassistant.components.lifx.*
homeassistant.components.light.*
homeassistant.components.linear_garage_door.*
homeassistant.components.linkplay.*
homeassistant.components.litejet.*
homeassistant.components.litterrobot.*
homeassistant.components.local_ip.*

View file

@ -795,6 +795,8 @@ build.json @home-assistant/supervisor
/tests/components/light/ @home-assistant/core
/homeassistant/components/linear_garage_door/ @IceBotYT
/tests/components/linear_garage_door/ @IceBotYT
/homeassistant/components/linkplay/ @Velleman
/tests/components/linkplay/ @Velleman
/homeassistant/components/linux_battery/ @fabaff
/homeassistant/components/litejet/ @joncar
/tests/components/litejet/ @joncar

View file

@ -0,0 +1,44 @@
"""Support for LinkPlay devices."""
from linkplay.bridge import LinkPlayBridge
from linkplay.discovery import linkplay_factory_bridge
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_HOST
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import PLATFORMS
class LinkPlayData:
"""Data for LinkPlay."""
bridge: LinkPlayBridge
type LinkPlayConfigEntry = ConfigEntry[LinkPlayData]
async def async_setup_entry(hass: HomeAssistant, entry: LinkPlayConfigEntry) -> bool:
"""Async setup hass config entry. Called when an entry has been setup."""
session = async_get_clientsession(hass)
if (
bridge := await linkplay_factory_bridge(entry.data[CONF_HOST], session)
) is None:
raise ConfigEntryNotReady(
f"Failed to connect to LinkPlay device at {entry.data[CONF_HOST]}"
)
entry.runtime_data = LinkPlayData()
entry.runtime_data.bridge = bridge
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: LinkPlayConfigEntry) -> bool:
"""Unload a config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View file

@ -0,0 +1,91 @@
"""Config flow to configure LinkPlay component."""
from typing import Any
from linkplay.discovery import linkplay_factory_bridge
import voluptuous as vol
from homeassistant.components import zeroconf
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_HOST, CONF_MODEL
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import DOMAIN
class LinkPlayConfigFlow(ConfigFlow, domain=DOMAIN):
"""LinkPlay config flow."""
def __init__(self) -> None:
"""Initialize the LinkPlay config flow."""
self.data: dict[str, Any] = {}
async def async_step_zeroconf(
self, discovery_info: zeroconf.ZeroconfServiceInfo
) -> ConfigFlowResult:
"""Handle Zeroconf discovery."""
session = async_get_clientsession(self.hass)
bridge = await linkplay_factory_bridge(discovery_info.host, session)
if bridge is None:
return self.async_abort(reason="cannot_connect")
self.data[CONF_HOST] = discovery_info.host
self.data[CONF_MODEL] = bridge.device.name
await self.async_set_unique_id(bridge.device.uuid)
self._abort_if_unique_id_configured(updates={CONF_HOST: discovery_info.host})
self.context["title_placeholders"] = {
"name": self.data[CONF_MODEL],
}
return await self.async_step_discovery_confirm()
async def async_step_discovery_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm discovery."""
if user_input is not None:
return self.async_create_entry(
title=self.data[CONF_MODEL],
data={CONF_HOST: self.data[CONF_HOST]},
)
self._set_confirm_only()
return self.async_show_form(
step_id="discovery_confirm",
description_placeholders={
"name": self.data[CONF_MODEL],
},
)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a flow initialized by the user."""
errors: dict[str, str] = {}
if user_input:
session = async_get_clientsession(self.hass)
bridge = await linkplay_factory_bridge(user_input[CONF_HOST], session)
if bridge is not None:
self.data[CONF_HOST] = user_input[CONF_HOST]
self.data[CONF_MODEL] = bridge.device.name
await self.async_set_unique_id(bridge.device.uuid)
self._abort_if_unique_id_configured(
updates={CONF_HOST: self.data[CONF_HOST]}
)
return self.async_create_entry(
title=self.data[CONF_MODEL],
data={CONF_HOST: self.data[CONF_HOST]},
)
errors["base"] = "cannot_connect"
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({vol.Required(CONF_HOST): str}),
errors=errors,
)

View file

@ -0,0 +1,6 @@
"""LinkPlay constants."""
from homeassistant.const import Platform
DOMAIN = "linkplay"
PLATFORMS = [Platform.MEDIA_PLAYER]

View file

@ -0,0 +1,11 @@
{
"domain": "linkplay",
"name": "LinkPlay",
"codeowners": ["@Velleman"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/linkplay",
"integration_type": "hub",
"iot_class": "local_polling",
"requirements": ["python-linkplay==0.0.5"],
"zeroconf": ["_linkplay._tcp.local."]
}

View file

@ -0,0 +1,257 @@
"""Support for LinkPlay media players."""
from __future__ import annotations
from collections.abc import Callable, Coroutine
import logging
from typing import Any, Concatenate
from linkplay.bridge import LinkPlayBridge
from linkplay.consts import EqualizerMode, LoopMode, PlayingMode, PlayingStatus
from linkplay.exceptions import LinkPlayException, LinkPlayRequestException
from homeassistant.components import media_source
from homeassistant.components.media_player import (
BrowseMedia,
MediaPlayerDeviceClass,
MediaPlayerEntity,
MediaPlayerEntityFeature,
MediaPlayerState,
MediaType,
RepeatMode,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.util.dt import utcnow
from . import LinkPlayConfigEntry
from .const import DOMAIN
from .utils import get_info_from_project
_LOGGER = logging.getLogger(__name__)
STATE_MAP: dict[PlayingStatus, MediaPlayerState] = {
PlayingStatus.STOPPED: MediaPlayerState.IDLE,
PlayingStatus.PAUSED: MediaPlayerState.PAUSED,
PlayingStatus.PLAYING: MediaPlayerState.PLAYING,
PlayingStatus.LOADING: MediaPlayerState.BUFFERING,
}
SOURCE_MAP: dict[PlayingMode, str] = {
PlayingMode.LINE_IN: "Line In",
PlayingMode.BLUETOOTH: "Bluetooth",
PlayingMode.OPTICAL: "Optical",
PlayingMode.LINE_IN_2: "Line In 2",
PlayingMode.USB_DAC: "USB DAC",
PlayingMode.COAXIAL: "Coaxial",
PlayingMode.XLR: "XLR",
PlayingMode.HDMI: "HDMI",
PlayingMode.OPTICAL_2: "Optical 2",
}
SOURCE_MAP_INV: dict[str, PlayingMode] = {v: k for k, v in SOURCE_MAP.items()}
REPEAT_MAP: dict[LoopMode, RepeatMode] = {
LoopMode.CONTINOUS_PLAY_ONE_SONG: RepeatMode.ONE,
LoopMode.PLAY_IN_ORDER: RepeatMode.OFF,
LoopMode.CONTINUOUS_PLAYBACK: RepeatMode.ALL,
LoopMode.RANDOM_PLAYBACK: RepeatMode.ALL,
LoopMode.LIST_CYCLE: RepeatMode.ALL,
}
REPEAT_MAP_INV: dict[RepeatMode, LoopMode] = {v: k for k, v in REPEAT_MAP.items()}
EQUALIZER_MAP: dict[EqualizerMode, str] = {
EqualizerMode.NONE: "None",
EqualizerMode.CLASSIC: "Classic",
EqualizerMode.POP: "Pop",
EqualizerMode.JAZZ: "Jazz",
EqualizerMode.VOCAL: "Vocal",
}
EQUALIZER_MAP_INV: dict[str, EqualizerMode] = {v: k for k, v in EQUALIZER_MAP.items()}
DEFAULT_FEATURES: MediaPlayerEntityFeature = (
MediaPlayerEntityFeature.PLAY
| MediaPlayerEntityFeature.PLAY_MEDIA
| MediaPlayerEntityFeature.BROWSE_MEDIA
| MediaPlayerEntityFeature.PAUSE
| MediaPlayerEntityFeature.STOP
| MediaPlayerEntityFeature.VOLUME_MUTE
| MediaPlayerEntityFeature.VOLUME_SET
| MediaPlayerEntityFeature.SELECT_SOURCE
| MediaPlayerEntityFeature.SELECT_SOUND_MODE
| MediaPlayerEntityFeature.GROUPING
)
SEEKABLE_FEATURES: MediaPlayerEntityFeature = (
MediaPlayerEntityFeature.PREVIOUS_TRACK
| MediaPlayerEntityFeature.NEXT_TRACK
| MediaPlayerEntityFeature.REPEAT_SET
| MediaPlayerEntityFeature.SEEK
)
async def async_setup_entry(
hass: HomeAssistant,
entry: LinkPlayConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up a media player from a config entry."""
async_add_entities([LinkPlayMediaPlayerEntity(entry.runtime_data.bridge)])
def exception_wrap[_LinkPlayEntityT: LinkPlayMediaPlayerEntity, **_P, _R](
func: Callable[Concatenate[_LinkPlayEntityT, _P], Coroutine[Any, Any, _R]],
) -> Callable[Concatenate[_LinkPlayEntityT, _P], Coroutine[Any, Any, _R]]:
"""Define a wrapper to catch exceptions and raise HomeAssistant errors."""
async def _wrap(self: _LinkPlayEntityT, *args: _P.args, **kwargs: _P.kwargs) -> _R:
try:
return await func(self, *args, **kwargs)
except LinkPlayRequestException as err:
raise HomeAssistantError(
f"Exception occurred when communicating with API {func}: {err}"
) from err
return _wrap
class LinkPlayMediaPlayerEntity(MediaPlayerEntity):
"""Representation of a LinkPlay media player."""
_attr_sound_mode_list = list(EQUALIZER_MAP.values())
_attr_device_class = MediaPlayerDeviceClass.RECEIVER
_attr_media_content_type = MediaType.MUSIC
_attr_has_entity_name = True
_attr_name = None
def __init__(self, bridge: LinkPlayBridge) -> None:
"""Initialize the LinkPlay media player."""
self._bridge = bridge
self._attr_unique_id = bridge.device.uuid
self._attr_source_list = [
SOURCE_MAP[playing_mode] for playing_mode in bridge.device.playmode_support
]
manufacturer, model = get_info_from_project(bridge.device.properties["project"])
self._attr_device_info = dr.DeviceInfo(
configuration_url=bridge.endpoint,
connections={(dr.CONNECTION_NETWORK_MAC, bridge.device.properties["MAC"])},
hw_version=bridge.device.properties["hardware"],
identifiers={(DOMAIN, bridge.device.uuid)},
manufacturer=manufacturer,
model=model,
name=bridge.device.name,
sw_version=bridge.device.properties["firmware"],
)
@exception_wrap
async def async_update(self) -> None:
"""Update the state of the media player."""
try:
await self._bridge.player.update_status()
self._update_properties()
except LinkPlayException:
self._attr_available = False
raise
@exception_wrap
async def async_select_source(self, source: str) -> None:
"""Select input source."""
await self._bridge.player.set_play_mode(SOURCE_MAP_INV[source])
@exception_wrap
async def async_select_sound_mode(self, sound_mode: str) -> None:
"""Select sound mode."""
await self._bridge.player.set_equalizer_mode(EQUALIZER_MAP_INV[sound_mode])
@exception_wrap
async def async_mute_volume(self, mute: bool) -> None:
"""Mute the volume."""
if mute:
await self._bridge.player.mute()
else:
await self._bridge.player.unmute()
@exception_wrap
async def async_set_volume_level(self, volume: float) -> None:
"""Set volume level, range 0..1."""
await self._bridge.player.set_volume(int(volume * 100))
@exception_wrap
async def async_media_pause(self) -> None:
"""Send pause command."""
await self._bridge.player.pause()
@exception_wrap
async def async_media_play(self) -> None:
"""Send play command."""
await self._bridge.player.resume()
@exception_wrap
async def async_set_repeat(self, repeat: RepeatMode) -> None:
"""Set repeat mode."""
await self._bridge.player.set_loop_mode(REPEAT_MAP_INV[repeat])
async def async_browse_media(
self,
media_content_type: MediaType | str | None = None,
media_content_id: str | None = None,
) -> BrowseMedia:
"""Return a BrowseMedia instance.
The BrowseMedia instance will be used by the
"media_player/browse_media" websocket command.
"""
return await media_source.async_browse_media(
self.hass,
media_content_id,
# This allows filtering content. In this case it will only show audio sources.
content_filter=lambda item: item.media_content_type.startswith("audio/"),
)
@exception_wrap
async def async_play_media(
self, media_type: MediaType | str, media_id: str, **kwargs: Any
) -> None:
"""Play a piece of media."""
media = await media_source.async_resolve_media(
self.hass, media_id, self.entity_id
)
await self._bridge.player.play(media.url)
def _update_properties(self) -> None:
"""Update the properties of the media player."""
self._attr_available = True
self._attr_state = STATE_MAP[self._bridge.player.status]
self._attr_volume_level = self._bridge.player.volume / 100
self._attr_is_volume_muted = self._bridge.player.muted
self._attr_repeat = REPEAT_MAP[self._bridge.player.loop_mode]
self._attr_shuffle = self._bridge.player.loop_mode == LoopMode.RANDOM_PLAYBACK
self._attr_sound_mode = EQUALIZER_MAP[self._bridge.player.equalizer_mode]
self._attr_supported_features = DEFAULT_FEATURES
if self._bridge.player.status == PlayingStatus.PLAYING:
if self._bridge.player.total_length != 0:
self._attr_supported_features = (
self._attr_supported_features | SEEKABLE_FEATURES
)
self._attr_source = SOURCE_MAP.get(self._bridge.player.play_mode, "other")
self._attr_media_position = self._bridge.player.current_position / 1000
self._attr_media_position_updated_at = utcnow()
self._attr_media_duration = self._bridge.player.total_length / 1000
self._attr_media_artist = self._bridge.player.artist
self._attr_media_title = self._bridge.player.title
self._attr_media_album_name = self._bridge.player.album
elif self._bridge.player.status == PlayingStatus.STOPPED:
self._attr_media_position = None
self._attr_media_position_updated_at = None
self._attr_media_artist = None
self._attr_media_title = None
self._attr_media_album_name = None

View file

@ -0,0 +1,26 @@
{
"config": {
"flow_title": "{name}",
"step": {
"user": {
"data": {
"host": "[%key:common::config_flow::data::host%]"
},
"data_description": {
"host": "The hostname or IP address of the LinkPlay device."
}
},
"discovery_confirm": {
"description": "Do you want to setup {name}?"
}
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"already_in_progress": "[%key:common::config_flow::abort::already_in_progress%]"
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
}
}
}

View file

@ -0,0 +1,20 @@
"""Utilities for the LinkPlay component."""
from typing import Final
MANUFACTURER_ARTSOUND: Final[str] = "ArtSound"
MANUFACTURER_GENERIC: Final[str] = "Generic"
MODELS_ARTSOUND_SMART_ZONE4: Final[str] = "Smart Zone 4 AMP"
MODELS_ARTSOUND_SMART_HYDE: Final[str] = "Smart Hyde"
MODELS_GENERIC: Final[str] = "Generic"
def get_info_from_project(project: str) -> tuple[str, str]:
"""Get manufacturer and model info based on given project."""
match project:
case "SMART_ZONE4_AMP":
return MANUFACTURER_ARTSOUND, MODELS_ARTSOUND_SMART_ZONE4
case "SMART_HYDE":
return MANUFACTURER_ARTSOUND, MODELS_ARTSOUND_SMART_HYDE
case _:
return MANUFACTURER_GENERIC, MODELS_GENERIC

View file

@ -312,6 +312,7 @@ FLOWS = {
"lidarr",
"lifx",
"linear_garage_door",
"linkplay",
"litejet",
"litterrobot",
"livisi",

View file

@ -3268,6 +3268,12 @@
"config_flow": true,
"iot_class": "cloud_polling"
},
"linkplay": {
"name": "LinkPlay",
"integration_type": "hub",
"config_flow": true,
"iot_class": "local_polling"
},
"linksys_smart": {
"name": "Linksys Smart Wi-Fi",
"integration_type": "hub",

View file

@ -589,6 +589,11 @@ ZEROCONF = {
"name": "gateway*",
},
],
"_linkplay._tcp.local.": [
{
"domain": "linkplay",
},
],
"_lookin._tcp.local.": [
{
"domain": "lookin",

View file

@ -2556,6 +2556,16 @@ disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.linkplay.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.litejet.*]
check_untyped_defs = true
disallow_incomplete_defs = true

View file

@ -2285,6 +2285,9 @@ python-juicenet==1.1.0
# homeassistant.components.tplink
python-kasa[speedups]==0.7.0.5
# homeassistant.components.linkplay
python-linkplay==0.0.5
# homeassistant.components.lirc
# python-lirc==1.2.3

View file

@ -1797,6 +1797,9 @@ python-juicenet==1.1.0
# homeassistant.components.tplink
python-kasa[speedups]==0.7.0.5
# homeassistant.components.linkplay
python-linkplay==0.0.5
# homeassistant.components.matter
python-matter-server==6.3.0

View file

@ -0,0 +1 @@
"""Tests for the LinkPlay integration."""

View file

@ -0,0 +1,40 @@
"""Test configuration and mocks for LinkPlay component."""
from collections.abc import Generator
from unittest.mock import AsyncMock, patch
from linkplay.bridge import LinkPlayBridge, LinkPlayDevice
import pytest
HOST = "10.0.0.150"
HOST_REENTRY = "10.0.0.66"
UUID = "FF31F09E-5001-FBDE-0546-2DBFFF31F09E"
NAME = "Smart Zone 1_54B9"
@pytest.fixture
def mock_linkplay_factory_bridge() -> Generator[AsyncMock]:
"""Mock for linkplay_factory_bridge."""
with (
patch(
"homeassistant.components.linkplay.config_flow.linkplay_factory_bridge"
) as factory,
):
bridge = AsyncMock(spec=LinkPlayBridge)
bridge.endpoint = HOST
bridge.device = AsyncMock(spec=LinkPlayDevice)
bridge.device.uuid = UUID
bridge.device.name = NAME
factory.return_value = bridge
yield factory
@pytest.fixture
def mock_setup_entry() -> Generator[AsyncMock]:
"""Override async_setup_entry."""
with patch(
"homeassistant.components.linkplay.async_setup_entry",
return_value=True,
) as mock_setup_entry:
yield mock_setup_entry

View file

@ -0,0 +1,204 @@
"""Tests for the LinkPlay config flow."""
from ipaddress import ip_address
from unittest.mock import AsyncMock
from homeassistant.components.linkplay.const import DOMAIN
from homeassistant.components.zeroconf import ZeroconfServiceInfo
from homeassistant.config_entries import SOURCE_USER, SOURCE_ZEROCONF
from homeassistant.const import CONF_HOST
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from .conftest import HOST, HOST_REENTRY, NAME, UUID
from tests.common import MockConfigEntry
ZEROCONF_DISCOVERY = ZeroconfServiceInfo(
ip_address=ip_address(HOST),
ip_addresses=[ip_address(HOST)],
hostname=f"{NAME}.local.",
name=f"{NAME}._linkplay._tcp.local.",
port=59152,
type="_linkplay._tcp.local.",
properties={
"uuid": f"uuid:{UUID}",
"mac": "00:2F:69:01:84:3A",
"security": "https 2.0",
"upnp": "1.0.0",
"bootid": "1f347886-1dd2-11b2-86ab-aa0cd2803583",
},
)
ZEROCONF_DISCOVERY_RE_ENTRY = ZeroconfServiceInfo(
ip_address=ip_address(HOST_REENTRY),
ip_addresses=[ip_address(HOST_REENTRY)],
hostname=f"{NAME}.local.",
name=f"{NAME}._linkplay._tcp.local.",
port=59152,
type="_linkplay._tcp.local.",
properties={
"uuid": f"uuid:{UUID}",
"mac": "00:2F:69:01:84:3A",
"security": "https 2.0",
"upnp": "1.0.0",
"bootid": "1f347886-1dd2-11b2-86ab-aa0cd2803583",
},
)
async def test_user_flow(
hass: HomeAssistant,
mock_linkplay_factory_bridge: AsyncMock,
mock_setup_entry: AsyncMock,
) -> None:
"""Test user setup config flow."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_HOST: HOST},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == NAME
assert result["data"] == {
CONF_HOST: HOST,
}
assert result["result"].unique_id == UUID
async def test_user_flow_re_entry(
hass: HomeAssistant,
mock_linkplay_factory_bridge: AsyncMock,
mock_setup_entry: AsyncMock,
) -> None:
"""Test user setup config flow when an entry with the same unique id already exists."""
# Create mock entry which already has the same UUID
entry = MockConfigEntry(
data={CONF_HOST: HOST},
domain=DOMAIN,
title=NAME,
unique_id=UUID,
)
entry.add_to_hass(hass)
# Re-create entry with different host
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_HOST: HOST_REENTRY},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
async def test_zeroconf_flow(
hass: HomeAssistant,
mock_linkplay_factory_bridge: AsyncMock,
mock_setup_entry: AsyncMock,
) -> None:
"""Test Zeroconf flow."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_ZEROCONF},
data=ZEROCONF_DISCOVERY,
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "discovery_confirm"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == NAME
assert result["data"] == {
CONF_HOST: HOST,
}
assert result["result"].unique_id == UUID
async def test_zeroconf_flow_re_entry(
hass: HomeAssistant,
mock_linkplay_factory_bridge: AsyncMock,
mock_setup_entry: AsyncMock,
) -> None:
"""Test Zeroconf flow when an entry with the same unique id already exists."""
# Create mock entry which already has the same UUID
entry = MockConfigEntry(
data={CONF_HOST: HOST},
domain=DOMAIN,
title=NAME,
unique_id=UUID,
)
entry.add_to_hass(hass)
# Re-create entry with different host
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_ZEROCONF},
data=ZEROCONF_DISCOVERY_RE_ENTRY,
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
async def test_flow_errors(
hass: HomeAssistant,
mock_linkplay_factory_bridge: AsyncMock,
mock_setup_entry: AsyncMock,
) -> None:
"""Test flow when the device cannot be reached."""
# Temporarily store bridge in a separate variable and set factory to return None
bridge = mock_linkplay_factory_bridge.return_value
mock_linkplay_factory_bridge.return_value = None
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_HOST: HOST},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"] == {"base": "cannot_connect"}
# Make linkplay_factory_bridge return a mock bridge again
mock_linkplay_factory_bridge.return_value = bridge
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_HOST: HOST},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == NAME
assert result["data"] == {
CONF_HOST: HOST,
}
assert result["result"].unique_id == UUID