Add ws66i core integration (#56094)

* Add ws66i core integration

* Remove all ws66i translations

* Update ws66i unit tests to meet minimum code coverage

* Update ws66i based on @bdraco review

* General improvements after 2nd PR review

* Disable entities if amp shutoff, set default source names, set 30sec polling

* Add _attr_ and change async_on_unload

* Improve entity generation

* Implement coordinator

* Made options fields required, retry connection on failed attempts, use ZoneStatus for attributes

* Refactor WS66i entity properties, raise HomeAssistantError on restore service if no snapshot

* Update to pyws66i v1.1

* Add quality scale of silver to manifest

* Update config_flow test
This commit is contained in:
Shawn Saenger 2022-05-08 15:52:39 -06:00 committed by GitHub
parent 0b25b44820
commit 5e737bfe4f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 1538 additions and 0 deletions

View file

@ -1172,6 +1172,8 @@ build.json @home-assistant/supervisor
/tests/components/workday/ @fabaff
/homeassistant/components/worldclock/ @fabaff
/tests/components/worldclock/ @fabaff
/homeassistant/components/ws66i/ @ssaenger
/tests/components/ws66i/ @ssaenger
/homeassistant/components/xbox/ @hunterjm
/tests/components/xbox/ @hunterjm
/homeassistant/components/xbox_live/ @MartinHjelmare

View file

@ -0,0 +1,124 @@
"""The Soundavo WS66i 6-Zone Amplifier integration."""
from __future__ import annotations
import logging
from pyws66i import WS66i, get_ws66i
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_IP_ADDRESS, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryNotReady
from .const import CONF_SOURCES, DOMAIN
from .coordinator import Ws66iDataUpdateCoordinator
from .models import SourceRep, Ws66iData
_LOGGER = logging.getLogger(__name__)
PLATFORMS = ["media_player"]
@callback
def _get_sources_from_dict(data) -> SourceRep:
sources_config = data[CONF_SOURCES]
# Dict index to custom name
source_id_name = {int(index): name for index, name in sources_config.items()}
# Dict custom name to index
source_name_id = {v: k for k, v in source_id_name.items()}
# List of custom names
source_names = sorted(source_name_id.keys(), key=lambda v: source_name_id[v])
return SourceRep(source_id_name, source_name_id, source_names)
def _find_zones(hass: HomeAssistant, ws66i: WS66i) -> list[int]:
"""Generate zones list by searching for presence of zones."""
# Zones 11 - 16 are the master amp
# Zones 21,31 - 26,36 are the daisy-chained amps
zone_list = []
for amp_num in range(1, 4):
if amp_num > 1:
# Don't add entities that aren't present
status = ws66i.zone_status(amp_num * 10 + 1)
if status is None:
break
for zone_num in range(1, 7):
zone_id = (amp_num * 10) + zone_num
zone_list.append(zone_id)
_LOGGER.info("Detected %d amp(s)", amp_num - 1)
return zone_list
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Soundavo WS66i 6-Zone Amplifier from a config entry."""
# Get the source names from the options flow
options: dict[str, dict[str, str]]
options = {CONF_SOURCES: entry.options[CONF_SOURCES]}
# Get the WS66i object and open up a connection to it
ws66i = get_ws66i(entry.data[CONF_IP_ADDRESS])
try:
await hass.async_add_executor_job(ws66i.open)
except ConnectionError as err:
# Amplifier is probably turned off
raise ConfigEntryNotReady("Could not connect to WS66i Amp. Is it off?") from err
# Create the zone Representation dataclass
source_rep: SourceRep = _get_sources_from_dict(options)
# Create a list of discovered zones
zones = await hass.async_add_executor_job(_find_zones, hass, ws66i)
# Create the coordinator for the WS66i
coordinator: Ws66iDataUpdateCoordinator = Ws66iDataUpdateCoordinator(
hass,
ws66i,
zones,
)
# Fetch initial data, retry on failed poll
await coordinator.async_config_entry_first_refresh()
# Create the Ws66iData data class save it to hass
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = Ws66iData(
host_ip=entry.data[CONF_IP_ADDRESS],
device=ws66i,
sources=source_rep,
coordinator=coordinator,
zones=zones,
)
def shutdown(event):
"""Close the WS66i connection to the amplifier and save snapshots."""
ws66i.close()
entry.async_on_unload(entry.add_update_listener(_update_listener))
entry.async_on_unload(
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)
)
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
ws66i: WS66i = hass.data[DOMAIN][entry.entry_id].device
ws66i.close()
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok
async def _update_listener(hass: HomeAssistant, entry: ConfigEntry):
"""Handle options update."""
await hass.config_entries.async_reload(entry.entry_id)

View file

@ -0,0 +1,146 @@
"""Config flow for WS66i 6-Zone Amplifier integration."""
import logging
from pyws66i import WS66i, get_ws66i
import voluptuous as vol
from homeassistant import config_entries, core, exceptions
from homeassistant.const import CONF_IP_ADDRESS
from .const import (
CONF_SOURCE_1,
CONF_SOURCE_2,
CONF_SOURCE_3,
CONF_SOURCE_4,
CONF_SOURCE_5,
CONF_SOURCE_6,
CONF_SOURCES,
DOMAIN,
INIT_OPTIONS_DEFAULT,
)
_LOGGER = logging.getLogger(__name__)
SOURCES = [
CONF_SOURCE_1,
CONF_SOURCE_2,
CONF_SOURCE_3,
CONF_SOURCE_4,
CONF_SOURCE_5,
CONF_SOURCE_6,
]
OPTIONS_SCHEMA = {vol.Optional(source): str for source in SOURCES}
DATA_SCHEMA = vol.Schema({vol.Required(CONF_IP_ADDRESS): str})
FIRST_ZONE = 11
@core.callback
def _sources_from_config(data):
sources_config = {
str(idx + 1): data.get(source) for idx, source in enumerate(SOURCES)
}
return {
index: name.strip()
for index, name in sources_config.items()
if (name is not None and name.strip() != "")
}
async def validate_input(hass: core.HomeAssistant, input_data):
"""Validate the user input allows us to connect.
Data has the keys from DATA_SCHEMA with values provided by the user.
"""
ws66i: WS66i = get_ws66i(input_data[CONF_IP_ADDRESS])
await hass.async_add_executor_job(ws66i.open)
# No exception. run a simple test to make sure we opened correct port
# Test on FIRST_ZONE because this zone will always be valid
ret_val = await hass.async_add_executor_job(ws66i.zone_status, FIRST_ZONE)
if ret_val is None:
ws66i.close()
raise ConnectionError("Not a valid WS66i connection")
# Validation done. No issues. Close the connection
ws66i.close()
# Return info that you want to store in the config entry.
return {CONF_IP_ADDRESS: input_data[CONF_IP_ADDRESS]}
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for WS66i 6-Zone Amplifier."""
VERSION = 1
async def async_step_user(self, user_input=None):
"""Handle the initial step."""
errors = {}
if user_input is not None:
try:
info = await validate_input(self.hass, user_input)
# Data is valid. Add default values for options flow.
return self.async_create_entry(
title="WS66i Amp",
data=info,
options={CONF_SOURCES: INIT_OPTIONS_DEFAULT},
)
except ConnectionError:
errors["base"] = "cannot_connect"
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
return self.async_show_form(
step_id="user", data_schema=DATA_SCHEMA, errors=errors
)
@staticmethod
@core.callback
def async_get_options_flow(config_entry):
"""Define the config flow to handle options."""
return Ws66iOptionsFlowHandler(config_entry)
@core.callback
def _key_for_source(index, source, previous_sources):
key = vol.Required(
source, description={"suggested_value": previous_sources[str(index)]}
)
return key
class Ws66iOptionsFlowHandler(config_entries.OptionsFlow):
"""Handle a WS66i options flow."""
def __init__(self, config_entry):
"""Initialize."""
self.config_entry = config_entry
async def async_step_init(self, user_input=None):
"""Manage the options."""
if user_input is not None:
return self.async_create_entry(
title="Source Names",
data={CONF_SOURCES: _sources_from_config(user_input)},
)
# Fill form with previous source names
previous_sources = self.config_entry.options[CONF_SOURCES]
options = {
_key_for_source(idx + 1, source, previous_sources): str
for idx, source in enumerate(SOURCES)
}
return self.async_show_form(
step_id="init",
data_schema=vol.Schema(options),
)
class CannotConnect(exceptions.HomeAssistantError):
"""Error to indicate we cannot connect."""

View file

@ -0,0 +1,24 @@
"""Constants for the Soundavo WS66i 6-Zone Amplifier Media Player component."""
DOMAIN = "ws66i"
CONF_SOURCES = "sources"
CONF_SOURCE_1 = "source_1"
CONF_SOURCE_2 = "source_2"
CONF_SOURCE_3 = "source_3"
CONF_SOURCE_4 = "source_4"
CONF_SOURCE_5 = "source_5"
CONF_SOURCE_6 = "source_6"
INIT_OPTIONS_DEFAULT = {
"1": "Source 1",
"2": "Source 2",
"3": "Source 3",
"4": "Source 4",
"5": "Source 5",
"6": "Source 6",
}
SERVICE_SNAPSHOT = "snapshot"
SERVICE_RESTORE = "restore"

View file

@ -0,0 +1,53 @@
"""Coordinator for WS66i."""
from __future__ import annotations
from datetime import timedelta
import logging
from pyws66i import WS66i, ZoneStatus
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
_LOGGER = logging.getLogger(__name__)
POLL_INTERVAL = timedelta(seconds=30)
class Ws66iDataUpdateCoordinator(DataUpdateCoordinator):
"""DataUpdateCoordinator to gather data for WS66i Zones."""
def __init__(
self,
hass: HomeAssistant,
my_api: WS66i,
zones: list[int],
) -> None:
"""Initialize DataUpdateCoordinator to gather data for specific zones."""
super().__init__(
hass,
_LOGGER,
name="WS66i",
update_interval=POLL_INTERVAL,
)
self._ws66i = my_api
self._zones = zones
def _update_all_zones(self) -> list[ZoneStatus]:
"""Fetch data for each of the zones."""
data = []
for zone_id in self._zones:
data_zone = self._ws66i.zone_status(zone_id)
if data_zone is None:
raise UpdateFailed(f"Failed to update zone {zone_id}")
data.append(data_zone)
# HA will call my entity's _handle_coordinator_update()
return data
async def _async_update_data(self) -> list[ZoneStatus]:
"""Fetch data for each of the zones."""
# HA will call my entity's _handle_coordinator_update()
# The data I pass back here can be accessed through coordinator.data.
return await self.hass.async_add_executor_job(self._update_all_zones)

View file

@ -0,0 +1,10 @@
{
"domain": "ws66i",
"name": "Soundavo WS66i 6-Zone Amplifier",
"documentation": "https://www.home-assistant.io/integrations/ws66i",
"requirements": ["pyws66i==1.1"],
"codeowners": ["@ssaenger"],
"config_flow": true,
"quality_scale": "silver",
"iot_class": "local_polling"
}

View file

@ -0,0 +1,213 @@
"""Support for interfacing with WS66i 6 zone home audio controller."""
from copy import deepcopy
import logging
from pyws66i import WS66i, ZoneStatus
from homeassistant.components.media_player import MediaPlayerEntity
from homeassistant.components.media_player.const import (
SUPPORT_SELECT_SOURCE,
SUPPORT_TURN_OFF,
SUPPORT_TURN_ON,
SUPPORT_VOLUME_MUTE,
SUPPORT_VOLUME_SET,
SUPPORT_VOLUME_STEP,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import (
AddEntitiesCallback,
async_get_current_platform,
)
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN, SERVICE_RESTORE, SERVICE_SNAPSHOT
from .coordinator import Ws66iDataUpdateCoordinator
from .models import Ws66iData
_LOGGER = logging.getLogger(__name__)
PARALLEL_UPDATES = 1
SUPPORT_WS66I = (
SUPPORT_VOLUME_MUTE
| SUPPORT_VOLUME_SET
| SUPPORT_VOLUME_STEP
| SUPPORT_TURN_ON
| SUPPORT_TURN_OFF
| SUPPORT_SELECT_SOURCE
)
MAX_VOL = 38
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the WS66i 6-zone amplifier platform from a config entry."""
ws66i_data: Ws66iData = hass.data[DOMAIN][config_entry.entry_id]
# Build and add the entities from the data class
async_add_entities(
Ws66iZone(
device=ws66i_data.device,
ws66i_data=ws66i_data,
entry_id=config_entry.entry_id,
zone_id=zone_id,
data_idx=idx,
coordinator=ws66i_data.coordinator,
)
for idx, zone_id in enumerate(ws66i_data.zones)
)
# Set up services
platform = async_get_current_platform()
platform.async_register_entity_service(
SERVICE_SNAPSHOT,
{},
"snapshot",
)
platform.async_register_entity_service(
SERVICE_RESTORE,
{},
"async_restore",
)
class Ws66iZone(CoordinatorEntity, MediaPlayerEntity):
"""Representation of a WS66i amplifier zone."""
def __init__(
self,
device: WS66i,
ws66i_data: Ws66iData,
entry_id: str,
zone_id: int,
data_idx: int,
coordinator: Ws66iDataUpdateCoordinator,
) -> None:
"""Initialize a zone entity."""
super().__init__(coordinator)
self._ws66i: WS66i = device
self._ws66i_data: Ws66iData = ws66i_data
self._zone_id: int = zone_id
self._zone_id_idx: int = data_idx
self._coordinator = coordinator
self._snapshot: ZoneStatus = None
self._status: ZoneStatus = coordinator.data[data_idx]
self._attr_source_list = ws66i_data.sources.name_list
self._attr_unique_id = f"{entry_id}_{self._zone_id}"
self._attr_name = f"Zone {self._zone_id}"
self._attr_supported_features = SUPPORT_WS66I
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, str(self.unique_id))},
name=self.name,
manufacturer="Soundavo",
model="WS66i 6-Zone Amplifier",
)
self._set_attrs_from_status()
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
# This will be called for each of the entities after the coordinator
# finishes executing _async_update_data()
# Save a reference to the zone status that this entity represents
self._status = self.coordinator.data[self._zone_id_idx]
self._set_attrs_from_status()
# Parent will notify HA of the update
super()._handle_coordinator_update()
@callback
def _set_attrs_from_status(self) -> None:
status = self._status
sources = self._ws66i_data.sources.id_name
self._attr_state = STATE_ON if status.power else STATE_OFF
self._attr_volume_level = status.volume / float(MAX_VOL)
self._attr_is_volume_muted = status.mute
self._attr_source = self._attr_media_title = sources[status.source]
@callback
def _async_update_attrs_write_ha_state(self) -> None:
self._set_attrs_from_status()
self.async_write_ha_state()
@callback
def snapshot(self):
"""Save zone's current state."""
self._snapshot = deepcopy(self._status)
async def async_restore(self):
"""Restore saved state."""
if not self._snapshot:
raise HomeAssistantError("There is no snapshot to restore")
await self.hass.async_add_executor_job(self._ws66i.restore_zone, self._snapshot)
self._status = self._snapshot
self._async_update_attrs_write_ha_state()
async def async_select_source(self, source):
"""Set input source."""
idx = self._ws66i_data.sources.name_id[source]
await self.hass.async_add_executor_job(
self._ws66i.set_source, self._zone_id, idx
)
self._status.source = idx
self._async_update_attrs_write_ha_state()
async def async_turn_on(self):
"""Turn the media player on."""
await self.hass.async_add_executor_job(
self._ws66i.set_power, self._zone_id, True
)
self._status.power = True
self._async_update_attrs_write_ha_state()
async def async_turn_off(self):
"""Turn the media player off."""
await self.hass.async_add_executor_job(
self._ws66i.set_power, self._zone_id, False
)
self._status.power = False
self._async_update_attrs_write_ha_state()
async def async_mute_volume(self, mute):
"""Mute (true) or unmute (false) media player."""
await self.hass.async_add_executor_job(
self._ws66i.set_mute, self._zone_id, mute
)
self._status.mute = bool(mute)
self._async_update_attrs_write_ha_state()
async def async_set_volume_level(self, volume):
"""Set volume level, range 0..1."""
await self.hass.async_add_executor_job(
self._ws66i.set_volume, self._zone_id, int(volume * MAX_VOL)
)
self._status.volume = int(volume * MAX_VOL)
self._async_update_attrs_write_ha_state()
async def async_volume_up(self):
"""Volume up the media player."""
await self.hass.async_add_executor_job(
self._ws66i.set_volume, self._zone_id, min(self._status.volume + 1, MAX_VOL)
)
self._status.volume = min(self._status.volume + 1, MAX_VOL)
self._async_update_attrs_write_ha_state()
async def async_volume_down(self):
"""Volume down media player."""
await self.hass.async_add_executor_job(
self._ws66i.set_volume, self._zone_id, max(self._status.volume - 1, 0)
)
self._status.volume = max(self._status.volume - 1, 0)
self._async_update_attrs_write_ha_state()

View file

@ -0,0 +1,30 @@
"""The ws66i integration models."""
from __future__ import annotations
from dataclasses import dataclass
from pyws66i import WS66i
from .coordinator import Ws66iDataUpdateCoordinator
# A dataclass is basically a struct in C/C++
@dataclass
class SourceRep:
"""Different representations of the amp sources."""
id_name: dict[int, str]
name_id: dict[str, int]
name_list: list[str]
@dataclass
class Ws66iData:
"""Data for the ws66i integration."""
host_ip: str
device: WS66i
sources: SourceRep
coordinator: Ws66iDataUpdateCoordinator
zones: list[int]

View file

@ -0,0 +1,15 @@
snapshot:
name: Snapshot
description: Take a snapshot of the media player zone.
target:
entity:
integration: ws66i
domain: media_player
restore:
name: Restore
description: Restore a snapshot of the media player zone.
target:
entity:
integration: ws66i
domain: media_player

View file

@ -0,0 +1,34 @@
{
"config": {
"step": {
"user": {
"title": "Connect to the device",
"data": {
"ip_address": "[%key:common::config_flow::data::ip%]"
}
}
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
}
},
"options": {
"step": {
"init": {
"title": "Configure sources",
"data": {
"source_1": "Name of source #1",
"source_2": "Name of source #2",
"source_3": "Name of source #3",
"source_4": "Name of source #4",
"source_5": "Name of source #5",
"source_6": "Name of source #6"
}
}
}
}
}

View file

@ -0,0 +1,35 @@
{
"config": {
"abort": {
"already_configured": "Device is already configured"
},
"error": {
"cannot_connect": "Failed to connect",
"unknown": "Unexpected error"
},
"step": {
"user": {
"data": {
"ip_address": "IP Address"
},
"title": "Connect to the device"
}
}
},
"options": {
"step": {
"init": {
"data": {
"source_1": "Name of source #1",
"source_2": "Name of source #2",
"source_3": "Name of source #3",
"source_4": "Name of source #4",
"source_5": "Name of source #5",
"source_6": "Name of source #6"
},
"title": "Configure sources"
}
}
}
}

View file

@ -400,6 +400,7 @@ FLOWS = {
"wiz",
"wled",
"wolflink",
"ws66i",
"xbox",
"xiaomi_aqara",
"xiaomi_miio",

View file

@ -2022,6 +2022,9 @@ pywilight==0.0.70
# homeassistant.components.wiz
pywizlight==0.5.13
# homeassistant.components.ws66i
pyws66i==1.1
# homeassistant.components.xeoma
pyxeoma==1.4.1

View file

@ -1336,6 +1336,9 @@ pywilight==0.0.70
# homeassistant.components.wiz
pywizlight==0.5.13
# homeassistant.components.ws66i
pyws66i==1.1
# homeassistant.components.zerproc
pyzerproc==0.4.8

View file

@ -0,0 +1 @@
"""Tests for the ws66i component."""

View file

@ -0,0 +1,152 @@
"""Test the WS66i 6-Zone Amplifier config flow."""
from unittest.mock import patch
from homeassistant import config_entries, data_entry_flow, setup
from homeassistant.components.ws66i.const import (
CONF_SOURCE_1,
CONF_SOURCE_2,
CONF_SOURCE_3,
CONF_SOURCE_4,
CONF_SOURCE_5,
CONF_SOURCE_6,
CONF_SOURCES,
DOMAIN,
INIT_OPTIONS_DEFAULT,
)
from homeassistant.const import CONF_IP_ADDRESS
from tests.common import MockConfigEntry
from tests.components.ws66i.test_media_player import AttrDict
CONFIG = {CONF_IP_ADDRESS: "1.1.1.1"}
async def test_form(hass):
"""Test we get the form."""
await setup.async_setup_component(hass, "persistent_notification", {})
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["errors"] == {}
with patch(
"homeassistant.components.ws66i.config_flow.get_ws66i",
) as mock_ws66i, patch(
"homeassistant.components.ws66i.async_setup_entry",
return_value=True,
) as mock_setup_entry:
ws66i_instance = mock_ws66i.return_value
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], CONFIG
)
await hass.async_block_till_done()
ws66i_instance.open.assert_called_once()
ws66i_instance.close.assert_called_once()
assert result2["type"] == "create_entry"
assert result2["title"] == "WS66i Amp"
assert result2["data"] == {CONF_IP_ADDRESS: CONFIG[CONF_IP_ADDRESS]}
assert len(mock_setup_entry.mock_calls) == 1
async def test_form_cannot_connect(hass):
"""Test cannot connect error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with patch("homeassistant.components.ws66i.config_flow.get_ws66i") as mock_ws66i:
ws66i_instance = mock_ws66i.return_value
ws66i_instance.open.side_effect = ConnectionError
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], CONFIG
)
assert result2["type"] == "form"
assert result2["errors"] == {"base": "cannot_connect"}
async def test_form_wrong_ip(hass):
"""Test cannot connect error with bad IP."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with patch("homeassistant.components.ws66i.config_flow.get_ws66i") as mock_ws66i:
ws66i_instance = mock_ws66i.return_value
ws66i_instance.zone_status.return_value = None
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], CONFIG
)
assert result2["type"] == "form"
assert result2["errors"] == {"base": "cannot_connect"}
async def test_generic_exception(hass):
"""Test generic exception."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with patch("homeassistant.components.ws66i.config_flow.get_ws66i") as mock_ws66i:
ws66i_instance = mock_ws66i.return_value
ws66i_instance.open.side_effect = Exception
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], CONFIG
)
assert result2["type"] == "form"
assert result2["errors"] == {"base": "unknown"}
async def test_options_flow(hass):
"""Test config flow options."""
conf = {CONF_IP_ADDRESS: "1.1.1.1", CONF_SOURCES: INIT_OPTIONS_DEFAULT}
config_entry = MockConfigEntry(
domain=DOMAIN,
data=conf,
options={CONF_SOURCES: INIT_OPTIONS_DEFAULT},
)
config_entry.add_to_hass(hass)
with patch("homeassistant.components.ws66i.get_ws66i") as mock_ws66i:
ws66i_instance = mock_ws66i.return_value
ws66i_instance.zone_status.return_value = AttrDict(
power=True, volume=0, mute=True, source=1, treble=0, bass=0, balance=10
)
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
result = await hass.config_entries.options.async_init(config_entry.entry_id)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
CONF_SOURCE_1: "one",
CONF_SOURCE_2: "too",
CONF_SOURCE_3: "tree",
CONF_SOURCE_4: "for",
CONF_SOURCE_5: "feeve",
CONF_SOURCE_6: "roku",
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert config_entry.options[CONF_SOURCES] == {
"1": "one",
"2": "too",
"3": "tree",
"4": "for",
"5": "feeve",
"6": "roku",
}

View file

@ -0,0 +1,692 @@
"""The tests for WS66i Media player platform."""
from collections import defaultdict
from unittest.mock import patch
import pytest
from homeassistant.components.media_player.const import (
ATTR_INPUT_SOURCE,
ATTR_INPUT_SOURCE_LIST,
ATTR_MEDIA_VOLUME_LEVEL,
DOMAIN as MEDIA_PLAYER_DOMAIN,
SERVICE_SELECT_SOURCE,
SUPPORT_SELECT_SOURCE,
SUPPORT_TURN_OFF,
SUPPORT_TURN_ON,
SUPPORT_VOLUME_MUTE,
SUPPORT_VOLUME_SET,
SUPPORT_VOLUME_STEP,
)
from homeassistant.components.ws66i.const import (
CONF_SOURCES,
DOMAIN,
INIT_OPTIONS_DEFAULT,
SERVICE_RESTORE,
SERVICE_SNAPSHOT,
)
from homeassistant.const import (
CONF_IP_ADDRESS,
SERVICE_TURN_OFF,
SERVICE_TURN_ON,
SERVICE_VOLUME_DOWN,
SERVICE_VOLUME_MUTE,
SERVICE_VOLUME_SET,
SERVICE_VOLUME_UP,
STATE_ON,
STATE_UNAVAILABLE,
)
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import entity_registry as er
from tests.common import MockConfigEntry
MOCK_SOURCE_DIC = {
"1": "one",
"2": "two",
"3": "three",
"4": "four",
"5": "five",
"6": "six",
}
MOCK_CONFIG = {CONF_IP_ADDRESS: "fake ip"}
MOCK_OPTIONS = {CONF_SOURCES: MOCK_SOURCE_DIC}
MOCK_DEFAULT_OPTIONS = {CONF_SOURCES: INIT_OPTIONS_DEFAULT}
ZONE_1_ID = "media_player.zone_11"
ZONE_2_ID = "media_player.zone_12"
ZONE_7_ID = "media_player.zone_21"
class AttrDict(dict):
"""Helper class for mocking attributes."""
def __setattr__(self, name, value):
"""Set attribute."""
self[name] = value
def __getattr__(self, item):
"""Get attribute."""
try:
return self[item]
except KeyError as err:
# The reason for doing this is because of the deepcopy in my code
raise AttributeError(item) from err
class MockWs66i:
"""Mock for pyws66i object."""
def __init__(self, fail_open=False, fail_zone_check=None):
"""Init mock object."""
self.zones = defaultdict(
lambda: AttrDict(
power=True, volume=0, mute=True, source=1, treble=0, bass=0, balance=10
)
)
self.fail_open = fail_open
self.fail_zone_check = fail_zone_check
def open(self):
"""Open socket. Do nothing."""
if self.fail_open is True:
raise ConnectionError()
def close(self):
"""Close socket. Do nothing."""
def zone_status(self, zone_id):
"""Get zone status."""
if self.fail_zone_check is not None and zone_id in self.fail_zone_check:
return None
status = self.zones[zone_id]
status.zone = zone_id
return AttrDict(status)
def set_source(self, zone_id, source_idx):
"""Set source for zone."""
self.zones[zone_id].source = source_idx
def set_power(self, zone_id, power):
"""Turn zone on/off."""
self.zones[zone_id].power = power
def set_mute(self, zone_id, mute):
"""Mute/unmute zone."""
self.zones[zone_id].mute = mute
def set_volume(self, zone_id, volume):
"""Set volume for zone."""
self.zones[zone_id].volume = volume
def restore_zone(self, zone):
"""Restore zone status."""
self.zones[zone.zone] = AttrDict(zone)
async def test_setup_success(hass):
"""Test connection success."""
with patch(
"homeassistant.components.ws66i.get_ws66i",
new=lambda *a: MockWs66i(),
):
config_entry = MockConfigEntry(
domain=DOMAIN, data=MOCK_CONFIG, options=MOCK_OPTIONS
)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert hass.states.get(ZONE_1_ID) is not None
async def _setup_ws66i(hass, ws66i) -> MockConfigEntry:
with patch(
"homeassistant.components.ws66i.get_ws66i",
new=lambda *a: ws66i,
):
config_entry = MockConfigEntry(
domain=DOMAIN, data=MOCK_CONFIG, options=MOCK_DEFAULT_OPTIONS
)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
return config_entry
async def _setup_ws66i_with_options(hass, ws66i) -> MockConfigEntry:
with patch(
"homeassistant.components.ws66i.get_ws66i",
new=lambda *a: ws66i,
):
config_entry = MockConfigEntry(
domain=DOMAIN, data=MOCK_CONFIG, options=MOCK_OPTIONS
)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
return config_entry
async def _call_media_player_service(hass, name, data):
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, name, service_data=data, blocking=True
)
async def _call_ws66i_service(hass, name, data):
await hass.services.async_call(DOMAIN, name, service_data=data, blocking=True)
async def test_cannot_connect(hass):
"""Test connection error."""
with patch(
"homeassistant.components.ws66i.get_ws66i",
new=lambda *a: MockWs66i(fail_open=True),
):
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert hass.states.get(ZONE_1_ID) is None
async def test_cannot_connect_2(hass):
"""Test connection error pt 2."""
# Another way to test same case as test_cannot_connect
ws66i = MockWs66i()
with patch.object(MockWs66i, "open", side_effect=ConnectionError):
await _setup_ws66i(hass, ws66i)
assert hass.states.get(ZONE_1_ID) is None
async def test_service_calls_with_entity_id(hass):
"""Test snapshot save/restore service calls."""
_ = await _setup_ws66i_with_options(hass, MockWs66i())
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "one"}
)
# Saving existing values
await _call_ws66i_service(hass, SERVICE_SNAPSHOT, {"entity_id": ZONE_1_ID})
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 1.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "three"}
)
await hass.async_block_till_done()
# Restoring other media player to its previous state
# The zone should not be restored
with pytest.raises(HomeAssistantError):
await _call_ws66i_service(hass, SERVICE_RESTORE, {"entity_id": ZONE_2_ID})
await hass.async_block_till_done()
# Checking that values were not (!) restored
state = hass.states.get(ZONE_1_ID)
assert state.attributes[ATTR_MEDIA_VOLUME_LEVEL] == 1.0
assert state.attributes[ATTR_INPUT_SOURCE] == "three"
# Restoring media player to its previous state
await _call_ws66i_service(hass, SERVICE_RESTORE, {"entity_id": ZONE_1_ID})
await hass.async_block_till_done()
state = hass.states.get(ZONE_1_ID)
assert state.attributes[ATTR_MEDIA_VOLUME_LEVEL] == 0.0
assert state.attributes[ATTR_INPUT_SOURCE] == "one"
async def test_service_calls_with_all_entities(hass):
"""Test snapshot save/restore service calls with entity id all."""
_ = await _setup_ws66i_with_options(hass, MockWs66i())
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "one"}
)
# Saving existing values
await _call_ws66i_service(hass, SERVICE_SNAPSHOT, {"entity_id": "all"})
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 1.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "three"}
)
# await coordinator.async_refresh()
# await hass.async_block_till_done()
# Restoring media player to its previous state
await _call_ws66i_service(hass, SERVICE_RESTORE, {"entity_id": "all"})
await hass.async_block_till_done()
state = hass.states.get(ZONE_1_ID)
assert state.attributes[ATTR_MEDIA_VOLUME_LEVEL] == 0.0
assert state.attributes[ATTR_INPUT_SOURCE] == "one"
async def test_service_calls_without_relevant_entities(hass):
"""Test snapshot save/restore service calls with bad entity id."""
config_entry = await _setup_ws66i_with_options(hass, MockWs66i())
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "one"}
)
ws66i_data = hass.data[DOMAIN][config_entry.entry_id]
coordinator = ws66i_data.coordinator
await coordinator.async_refresh()
await hass.async_block_till_done()
# Saving existing values
await _call_ws66i_service(hass, SERVICE_SNAPSHOT, {"entity_id": "all"})
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 1.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "three"}
)
await coordinator.async_refresh()
await hass.async_block_till_done()
# Restoring media player to its previous state
await _call_ws66i_service(hass, SERVICE_RESTORE, {"entity_id": "light.demo"})
await hass.async_block_till_done()
state = hass.states.get(ZONE_1_ID)
assert state.attributes[ATTR_MEDIA_VOLUME_LEVEL] == 1.0
assert state.attributes[ATTR_INPUT_SOURCE] == "three"
async def test_restore_without_snapshot(hass):
"""Test restore when snapshot wasn't called."""
await _setup_ws66i(hass, MockWs66i())
with patch.object(MockWs66i, "restore_zone") as method_call:
with pytest.raises(HomeAssistantError):
await _call_ws66i_service(hass, SERVICE_RESTORE, {"entity_id": ZONE_1_ID})
await hass.async_block_till_done()
assert not method_call.called
async def test_update(hass):
"""Test updating values from ws66i."""
ws66i = MockWs66i()
config_entry = await _setup_ws66i_with_options(hass, ws66i)
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "one"}
)
ws66i.set_source(11, 3)
ws66i.set_volume(11, 38)
ws66i_data = hass.data[DOMAIN][config_entry.entry_id]
coordinator = ws66i_data.coordinator
with patch.object(MockWs66i, "open") as method_call:
await coordinator.async_refresh()
await hass.async_block_till_done()
assert not method_call.called
state = hass.states.get(ZONE_1_ID)
assert hass.states.is_state(ZONE_1_ID, STATE_ON)
assert state.attributes[ATTR_MEDIA_VOLUME_LEVEL] == 1.0
assert state.attributes[ATTR_INPUT_SOURCE] == "three"
async def test_failed_update(hass):
"""Test updating failure from ws66i."""
ws66i = MockWs66i()
config_entry = await _setup_ws66i_with_options(hass, ws66i)
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "one"}
)
ws66i.set_source(11, 3)
ws66i.set_volume(11, 38)
ws66i_data = hass.data[DOMAIN][config_entry.entry_id]
coordinator = ws66i_data.coordinator
await coordinator.async_refresh()
await hass.async_block_till_done()
# Failed update, close called
with patch.object(MockWs66i, "zone_status", return_value=None):
await coordinator.async_refresh()
await hass.async_block_till_done()
assert hass.states.is_state(ZONE_1_ID, STATE_UNAVAILABLE)
# A connection re-attempt fails
with patch.object(MockWs66i, "zone_status", return_value=None):
await coordinator.async_refresh()
await hass.async_block_till_done()
# A connection re-attempt succeeds
await coordinator.async_refresh()
await hass.async_block_till_done()
# confirm entity is back on
state = hass.states.get(ZONE_1_ID)
assert hass.states.is_state(ZONE_1_ID, STATE_ON)
assert state.attributes[ATTR_MEDIA_VOLUME_LEVEL] == 1.0
assert state.attributes[ATTR_INPUT_SOURCE] == "three"
async def test_supported_features(hass):
"""Test supported features property."""
await _setup_ws66i(hass, MockWs66i())
state = hass.states.get(ZONE_1_ID)
assert (
SUPPORT_VOLUME_MUTE
| SUPPORT_VOLUME_SET
| SUPPORT_VOLUME_STEP
| SUPPORT_TURN_ON
| SUPPORT_TURN_OFF
| SUPPORT_SELECT_SOURCE
== state.attributes["supported_features"]
)
async def test_source_list(hass):
"""Test source list property."""
await _setup_ws66i(hass, MockWs66i())
state = hass.states.get(ZONE_1_ID)
# Note, the list is sorted!
assert state.attributes[ATTR_INPUT_SOURCE_LIST] == list(
INIT_OPTIONS_DEFAULT.values()
)
async def test_source_list_with_options(hass):
"""Test source list property."""
await _setup_ws66i_with_options(hass, MockWs66i())
state = hass.states.get(ZONE_1_ID)
# Note, the list is sorted!
assert state.attributes[ATTR_INPUT_SOURCE_LIST] == list(MOCK_SOURCE_DIC.values())
async def test_select_source(hass):
"""Test source selection methods."""
ws66i = MockWs66i()
await _setup_ws66i_with_options(hass, ws66i)
await _call_media_player_service(
hass,
SERVICE_SELECT_SOURCE,
{"entity_id": ZONE_1_ID, ATTR_INPUT_SOURCE: "three"},
)
assert ws66i.zones[11].source == 3
async def test_source_select(hass):
"""Test behavior when device has unknown source."""
ws66i = MockWs66i()
config_entry = await _setup_ws66i_with_options(hass, ws66i)
ws66i.set_source(11, 5)
ws66i_data = hass.data[DOMAIN][config_entry.entry_id]
coordinator = ws66i_data.coordinator
await coordinator.async_refresh()
await hass.async_block_till_done()
state = hass.states.get(ZONE_1_ID)
assert state.attributes.get(ATTR_INPUT_SOURCE) == "five"
async def test_turn_on_off(hass):
"""Test turning on the zone."""
ws66i = MockWs66i()
await _setup_ws66i(hass, ws66i)
await _call_media_player_service(hass, SERVICE_TURN_OFF, {"entity_id": ZONE_1_ID})
assert not ws66i.zones[11].power
await _call_media_player_service(hass, SERVICE_TURN_ON, {"entity_id": ZONE_1_ID})
assert ws66i.zones[11].power
async def test_mute_volume(hass):
"""Test mute functionality."""
ws66i = MockWs66i()
await _setup_ws66i(hass, ws66i)
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.5}
)
await _call_media_player_service(
hass, SERVICE_VOLUME_MUTE, {"entity_id": ZONE_1_ID, "is_volume_muted": False}
)
assert not ws66i.zones[11].mute
await _call_media_player_service(
hass, SERVICE_VOLUME_MUTE, {"entity_id": ZONE_1_ID, "is_volume_muted": True}
)
assert ws66i.zones[11].mute
async def test_volume_up_down(hass):
"""Test increasing volume by one."""
ws66i = MockWs66i()
config_entry = await _setup_ws66i(hass, ws66i)
ws66i_data = hass.data[DOMAIN][config_entry.entry_id]
coordinator = ws66i_data.coordinator
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.0}
)
assert ws66i.zones[11].volume == 0
await _call_media_player_service(
hass, SERVICE_VOLUME_DOWN, {"entity_id": ZONE_1_ID}
)
await coordinator.async_refresh()
await hass.async_block_till_done()
# should not go below zero
assert ws66i.zones[11].volume == 0
await _call_media_player_service(hass, SERVICE_VOLUME_UP, {"entity_id": ZONE_1_ID})
await coordinator.async_refresh()
await hass.async_block_till_done()
assert ws66i.zones[11].volume == 1
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 1.0}
)
await coordinator.async_refresh()
await hass.async_block_till_done()
assert ws66i.zones[11].volume == 38
await _call_media_player_service(hass, SERVICE_VOLUME_UP, {"entity_id": ZONE_1_ID})
await coordinator.async_refresh()
await hass.async_block_till_done()
# should not go above 38
assert ws66i.zones[11].volume == 38
await _call_media_player_service(
hass, SERVICE_VOLUME_DOWN, {"entity_id": ZONE_1_ID}
)
assert ws66i.zones[11].volume == 37
async def test_first_run_with_available_zones(hass):
"""Test first run with all zones available."""
ws66i = MockWs66i()
await _setup_ws66i(hass, ws66i)
registry = er.async_get(hass)
entry = registry.async_get(ZONE_7_ID)
assert not entry.disabled
async def test_first_run_with_failing_zones(hass):
"""Test first run with failed zones."""
ws66i = MockWs66i()
with patch.object(MockWs66i, "zone_status", return_value=None):
await _setup_ws66i(hass, ws66i)
registry = er.async_get(hass)
entry = registry.async_get(ZONE_1_ID)
assert entry is None
entry = registry.async_get(ZONE_7_ID)
assert entry is None
async def test_register_all_entities(hass):
"""Test run with all entities registered."""
ws66i = MockWs66i()
await _setup_ws66i(hass, ws66i)
registry = er.async_get(hass)
entry = registry.async_get(ZONE_1_ID)
assert not entry.disabled
entry = registry.async_get(ZONE_7_ID)
assert not entry.disabled
async def test_register_entities_in_1_amp_only(hass):
"""Test run with only zones 11-16 registered."""
ws66i = MockWs66i(fail_zone_check=[21])
await _setup_ws66i(hass, ws66i)
registry = er.async_get(hass)
entry = registry.async_get(ZONE_1_ID)
assert not entry.disabled
entry = registry.async_get(ZONE_2_ID)
assert not entry.disabled
entry = registry.async_get(ZONE_7_ID)
assert entry is None
async def test_unload_config_entry(hass):
"""Test unloading config entry."""
with patch(
"homeassistant.components.ws66i.get_ws66i",
new=lambda *a: MockWs66i(),
):
config_entry = MockConfigEntry(
domain=DOMAIN, data=MOCK_CONFIG, options=MOCK_OPTIONS
)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert hass.data[DOMAIN][config_entry.entry_id]
with patch.object(MockWs66i, "close") as method_call:
await config_entry.async_unload(hass)
await hass.async_block_till_done()
assert method_call.called
assert not hass.data[DOMAIN]
async def test_restore_snapshot_on_reconnect(hass):
"""Test restoring a saved snapshot when reconnecting to amp."""
ws66i = MockWs66i()
config_entry = await _setup_ws66i_with_options(hass, ws66i)
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "one"}
)
# Save a snapshot
await _call_ws66i_service(hass, SERVICE_SNAPSHOT, {"entity_id": ZONE_1_ID})
ws66i_data = hass.data[DOMAIN][config_entry.entry_id]
coordinator = ws66i_data.coordinator
# Failed update,
with patch.object(MockWs66i, "zone_status", return_value=None):
await coordinator.async_refresh()
await hass.async_block_till_done()
assert hass.states.is_state(ZONE_1_ID, STATE_UNAVAILABLE)
# A connection re-attempt succeeds
await coordinator.async_refresh()
await hass.async_block_till_done()
# confirm entity is back on
state = hass.states.get(ZONE_1_ID)
assert hass.states.is_state(ZONE_1_ID, STATE_ON)
assert state.attributes[ATTR_MEDIA_VOLUME_LEVEL] == 0.0
assert state.attributes[ATTR_INPUT_SOURCE] == "one"
# Change states
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 1.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "six"}
)
# Now confirm that the snapshot before the disconnect works
await _call_ws66i_service(hass, SERVICE_RESTORE, {"entity_id": ZONE_1_ID})
await hass.async_block_till_done()
state = hass.states.get(ZONE_1_ID)
assert state.attributes[ATTR_MEDIA_VOLUME_LEVEL] == 0.0
assert state.attributes[ATTR_INPUT_SOURCE] == "one"