Config Flow and Entity registry support for Monoprice (#30337)

* Entity registry support for monoprice

* Add test for unique_id

* Add unique id namespace to monoprice

* Config Flow for Monoprice

* Update monoprice tests

* Remove TODOs

* Handle entity unloading

* Fix update test

* Streamline entity handling in monoprice services

* Increase coverage

* Remove devices cache

* Async validation in monoprice config flow
This commit is contained in:
On Freund 2020-03-22 03:12:32 +02:00 committed by GitHub
parent 99d732b974
commit e8bd1b9216
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 614 additions and 454 deletions

View file

@ -1 +1,36 @@
"""The monoprice component."""
"""The Monoprice 6-Zone Amplifier integration."""
import asyncio
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
PLATFORMS = ["media_player"]
async def async_setup(hass: HomeAssistant, config: dict):
"""Set up the Monoprice 6-Zone Amplifier component."""
return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
"""Set up Monoprice 6-Zone Amplifier from a config entry."""
for component in PLATFORMS:
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(entry, component)
)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry):
"""Unload a config entry."""
unload_ok = all(
await asyncio.gather(
*[
hass.config_entries.async_forward_entry_unload(entry, component)
for component in PLATFORMS
]
)
)
return unload_ok

View file

@ -0,0 +1,95 @@
"""Config flow for Monoprice 6-Zone Amplifier integration."""
import logging
from pymonoprice import get_async_monoprice
from serial import SerialException
import voluptuous as vol
from homeassistant import config_entries, core, exceptions
from homeassistant.const import CONF_PORT
from .const import (
CONF_SOURCE_1,
CONF_SOURCE_2,
CONF_SOURCE_3,
CONF_SOURCE_4,
CONF_SOURCE_5,
CONF_SOURCE_6,
CONF_SOURCES,
)
from .const import DOMAIN # pylint:disable=unused-import
_LOGGER = logging.getLogger(__name__)
DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_PORT): str,
vol.Optional(CONF_SOURCE_1): str,
vol.Optional(CONF_SOURCE_2): str,
vol.Optional(CONF_SOURCE_3): str,
vol.Optional(CONF_SOURCE_4): str,
vol.Optional(CONF_SOURCE_5): str,
vol.Optional(CONF_SOURCE_6): str,
}
)
async def validate_input(hass: core.HomeAssistant, data):
"""Validate the user input allows us to connect.
Data has the keys from DATA_SCHEMA with values provided by the user.
"""
try:
await get_async_monoprice(data[CONF_PORT], hass.loop)
except SerialException:
_LOGGER.error("Error connecting to Monoprice controller")
raise CannotConnect
sources_config = {
1: data.get(CONF_SOURCE_1),
2: data.get(CONF_SOURCE_2),
3: data.get(CONF_SOURCE_3),
4: data.get(CONF_SOURCE_4),
5: data.get(CONF_SOURCE_5),
6: data.get(CONF_SOURCE_6),
}
sources = {
index: name.strip()
for index, name in sources_config.items()
if (name is not None and name.strip() != "")
}
# Return info that you want to store in the config entry.
return {CONF_PORT: data[CONF_PORT], CONF_SOURCES: sources}
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Monoprice 6-Zone Amplifier."""
VERSION = 1
CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_POLL
async def async_step_user(self, user_input=None):
"""Handle the initial step."""
errors = {}
if user_input is not None:
try:
info = await validate_input(self.hass, user_input)
return self.async_create_entry(title=user_input[CONF_PORT], data=info)
except CannotConnect:
errors["base"] = "cannot_connect"
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
return self.async_show_form(
step_id="user", data_schema=DATA_SCHEMA, errors=errors
)
class CannotConnect(exceptions.HomeAssistantError):
"""Error to indicate we cannot connect."""
class InvalidAuth(exceptions.HomeAssistantError):
"""Error to indicate there is invalid auth."""

View file

@ -1,5 +1,15 @@
"""Constants for the Monoprice 6-Zone Amplifier Media Player component."""
DOMAIN = "monoprice"
CONF_SOURCES = "sources"
CONF_SOURCE_1 = "source_1"
CONF_SOURCE_2 = "source_2"
CONF_SOURCE_3 = "source_3"
CONF_SOURCE_4 = "source_4"
CONF_SOURCE_5 = "source_5"
CONF_SOURCE_6 = "source_6"
SERVICE_SNAPSHOT = "snapshot"
SERVICE_RESTORE = "restore"

View file

@ -4,5 +4,6 @@
"documentation": "https://www.home-assistant.io/integrations/monoprice",
"requirements": ["pymonoprice==0.3"],
"dependencies": [],
"codeowners": ["@etsinko"]
"codeowners": ["@etsinko"],
"config_flow": true
}

View file

@ -3,9 +3,8 @@ import logging
from pymonoprice import get_monoprice
from serial import SerialException
import voluptuous as vol
from homeassistant.components.media_player import PLATFORM_SCHEMA, MediaPlayerDevice
from homeassistant.components.media_player import MediaPlayerDevice
from homeassistant.components.media_player.const import (
SUPPORT_SELECT_SOURCE,
SUPPORT_TURN_OFF,
@ -14,16 +13,10 @@ from homeassistant.components.media_player.const import (
SUPPORT_VOLUME_SET,
SUPPORT_VOLUME_STEP,
)
from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_NAME,
CONF_PORT,
STATE_OFF,
STATE_ON,
)
import homeassistant.helpers.config_validation as cv
from homeassistant.const import CONF_PORT, STATE_OFF, STATE_ON
from homeassistant.helpers import config_validation as cv, entity_platform, service
from .const import DOMAIN, SERVICE_RESTORE, SERVICE_SNAPSHOT
from .const import CONF_SOURCES, DOMAIN, SERVICE_RESTORE, SERVICE_SNAPSHOT
_LOGGER = logging.getLogger(__name__)
@ -36,104 +29,89 @@ SUPPORT_MONOPRICE = (
| SUPPORT_SELECT_SOURCE
)
ZONE_SCHEMA = vol.Schema({vol.Required(CONF_NAME): cv.string})
SOURCE_SCHEMA = vol.Schema({vol.Required(CONF_NAME): cv.string})
def _get_sources(sources_config):
source_id_name = {int(index): name for index, name in sources_config.items()}
CONF_ZONES = "zones"
CONF_SOURCES = "sources"
source_name_id = {v: k for k, v in source_id_name.items()}
DATA_MONOPRICE = "monoprice"
source_names = sorted(source_name_id.keys(), key=lambda v: source_name_id[v])
# Valid zone ids: 11-16 or 21-26 or 31-36
ZONE_IDS = vol.All(
vol.Coerce(int),
vol.Any(
vol.Range(min=11, max=16), vol.Range(min=21, max=26), vol.Range(min=31, max=36)
),
)
# Valid source ids: 1-6
SOURCE_IDS = vol.All(vol.Coerce(int), vol.Range(min=1, max=6))
MEDIA_PLAYER_SCHEMA = vol.Schema({ATTR_ENTITY_ID: cv.comp_entity_ids})
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_PORT): cv.string,
vol.Required(CONF_ZONES): vol.Schema({ZONE_IDS: ZONE_SCHEMA}),
vol.Required(CONF_SOURCES): vol.Schema({SOURCE_IDS: SOURCE_SCHEMA}),
}
)
return [source_id_name, source_name_id, source_names]
def setup_platform(hass, config, add_entities, discovery_info=None):
async def async_setup_entry(hass, config_entry, async_add_devices):
"""Set up the Monoprice 6-zone amplifier platform."""
port = config.get(CONF_PORT)
port = config_entry.data.get(CONF_PORT)
try:
monoprice = get_monoprice(port)
monoprice = await hass.async_add_executor_job(get_monoprice, port)
except SerialException:
_LOGGER.error("Error connecting to Monoprice controller")
return
sources = {
source_id: extra[CONF_NAME] for source_id, extra in config[CONF_SOURCES].items()
}
sources = _get_sources(config_entry.data.get(CONF_SOURCES))
hass.data[DATA_MONOPRICE] = []
for zone_id, extra in config[CONF_ZONES].items():
_LOGGER.info("Adding zone %d - %s", zone_id, extra[CONF_NAME])
hass.data[DATA_MONOPRICE].append(
MonopriceZone(monoprice, sources, zone_id, extra[CONF_NAME])
devices = []
for i in range(1, 4):
for j in range(1, 7):
zone_id = (i * 10) + j
_LOGGER.info("Adding zone %d for port %s", zone_id, port)
devices.append(
MonopriceZone(monoprice, sources, config_entry.entry_id, zone_id)
)
add_entities(hass.data[DATA_MONOPRICE], True)
async_add_devices(devices, True)
def service_handle(service):
platform = entity_platform.current_platform.get()
def _call_service(entities, service_call):
for entity in entities:
if service_call.service == SERVICE_SNAPSHOT:
entity.snapshot()
elif service_call.service == SERVICE_RESTORE:
entity.restore()
@service.verify_domain_control(hass, DOMAIN)
async def async_service_handle(service_call):
"""Handle for services."""
entity_ids = service.data.get(ATTR_ENTITY_ID)
entities = await platform.async_extract_from_service(service_call)
if entity_ids:
devices = [
device
for device in hass.data[DATA_MONOPRICE]
if device.entity_id in entity_ids
]
else:
devices = hass.data[DATA_MONOPRICE]
if not entities:
return
for device in devices:
if service.service == SERVICE_SNAPSHOT:
device.snapshot()
elif service.service == SERVICE_RESTORE:
device.restore()
hass.async_add_executor_job(_call_service, entities, service_call)
hass.services.register(
DOMAIN, SERVICE_SNAPSHOT, service_handle, schema=MEDIA_PLAYER_SCHEMA
hass.services.async_register(
DOMAIN,
SERVICE_SNAPSHOT,
async_service_handle,
schema=cv.make_entity_service_schema({}),
)
hass.services.register(
DOMAIN, SERVICE_RESTORE, service_handle, schema=MEDIA_PLAYER_SCHEMA
hass.services.async_register(
DOMAIN,
SERVICE_RESTORE,
async_service_handle,
schema=cv.make_entity_service_schema({}),
)
class MonopriceZone(MediaPlayerDevice):
"""Representation of a Monoprice amplifier zone."""
def __init__(self, monoprice, sources, zone_id, zone_name):
def __init__(self, monoprice, sources, namespace, zone_id):
"""Initialize new zone."""
self._monoprice = monoprice
# dict source_id -> source name
self._source_id_name = sources
self._source_id_name = sources[0]
# dict source name -> source_id
self._source_name_id = {v: k for k, v in sources.items()}
self._source_name_id = sources[1]
# ordered list of all source names
self._source_names = sorted(
self._source_name_id.keys(), key=lambda v: self._source_name_id[v]
)
self._source_names = sources[2]
self._zone_id = zone_id
self._name = zone_name
self._unique_id = f"{namespace}_{self._zone_id}"
self._name = f"Zone {self._zone_id}"
self._snapshot = None
self._state = None
@ -156,6 +134,26 @@ class MonopriceZone(MediaPlayerDevice):
self._source = None
return True
@property
def entity_registry_enabled_default(self):
"""Return if the entity should be enabled when first added to the entity registry."""
return self._zone_id < 20
@property
def device_info(self):
"""Return device info for this device."""
return {
"identifiers": {(DOMAIN, self.unique_id)},
"name": self.name,
"manufacturer": "Monoprice",
"model": "6-Zone Amplifier",
}
@property
def unique_id(self):
"""Return unique ID for this device."""
return self._unique_id
@property
def name(self):
"""Return the name of the zone."""

View file

@ -0,0 +1,26 @@
{
"config": {
"title": "Monoprice 6-Zone Amplifier",
"step": {
"user": {
"title": "Connect to the device",
"data": {
"port": "Serial port",
"source_1": "Name of source #1",
"source_2": "Name of source #2",
"source_3": "Name of source #3",
"source_4": "Name of source #4",
"source_5": "Name of source #5",
"source_6": "Name of source #6"
}
}
},
"error": {
"cannot_connect": "Failed to connect, please try again",
"unknown": "Unexpected error"
},
"abort": {
"already_configured": "Device is already configured"
}
}
}

View file

@ -69,6 +69,7 @@ FLOWS = [
"mikrotik",
"minecraft_server",
"mobile_app",
"monoprice",
"mqtt",
"myq",
"neato",

View file

@ -0,0 +1,88 @@
"""Test the Monoprice 6-Zone Amplifier config flow."""
from asynctest import patch
from serial import SerialException
from homeassistant import config_entries, setup
from homeassistant.components.monoprice.const import (
CONF_SOURCE_1,
CONF_SOURCE_4,
CONF_SOURCE_5,
CONF_SOURCES,
DOMAIN,
)
from homeassistant.const import CONF_PORT
CONFIG = {
CONF_PORT: "/test/port",
CONF_SOURCE_1: "one",
CONF_SOURCE_4: "four",
CONF_SOURCE_5: " ",
}
async def test_form(hass):
"""Test we get the form."""
await setup.async_setup_component(hass, "persistent_notification", {})
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["errors"] == {}
with patch(
"homeassistant.components.monoprice.config_flow.get_async_monoprice",
return_value=True,
), patch(
"homeassistant.components.monoprice.async_setup", return_value=True
) as mock_setup, patch(
"homeassistant.components.monoprice.async_setup_entry", return_value=True,
) as mock_setup_entry:
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], CONFIG
)
assert result2["type"] == "create_entry"
assert result2["title"] == CONFIG[CONF_PORT]
assert result2["data"] == {
CONF_PORT: CONFIG[CONF_PORT],
CONF_SOURCES: {1: CONFIG[CONF_SOURCE_1], 4: CONFIG[CONF_SOURCE_4]},
}
await hass.async_block_till_done()
assert len(mock_setup.mock_calls) == 1
assert len(mock_setup_entry.mock_calls) == 1
async def test_form_cannot_connect(hass):
"""Test we handle cannot connect error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with patch(
"homeassistant.components.monoprice.config_flow.get_async_monoprice",
side_effect=SerialException,
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], CONFIG
)
assert result2["type"] == "form"
assert result2["errors"] == {"base": "cannot_connect"}
async def test_generic_exception(hass):
"""Test we handle cannot generic exception."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with patch(
"homeassistant.components.monoprice.config_flow.get_async_monoprice",
side_effect=Exception,
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], CONFIG
)
assert result2["type"] == "form"
assert result2["errors"] == {"base": "unknown"}

View file

@ -1,12 +1,15 @@
"""The tests for Monoprice Media player platform."""
from collections import defaultdict
import unittest
from unittest import mock
import pytest
import voluptuous as vol
from asynctest import patch
from serial import SerialException
from homeassistant.components.media_player.const import (
ATTR_INPUT_SOURCE,
ATTR_INPUT_SOURCE_LIST,
ATTR_MEDIA_VOLUME_LEVEL,
DOMAIN as MEDIA_PLAYER_DOMAIN,
SERVICE_SELECT_SOURCE,
SUPPORT_SELECT_SOURCE,
SUPPORT_TURN_OFF,
SUPPORT_TURN_ON,
@ -15,18 +18,28 @@ from homeassistant.components.media_player.const import (
SUPPORT_VOLUME_STEP,
)
from homeassistant.components.monoprice.const import (
CONF_SOURCES,
DOMAIN,
SERVICE_RESTORE,
SERVICE_SNAPSHOT,
)
from homeassistant.components.monoprice.media_player import (
DATA_MONOPRICE,
PLATFORM_SCHEMA,
setup_platform,
from homeassistant.const import (
CONF_PORT,
SERVICE_TURN_OFF,
SERVICE_TURN_ON,
SERVICE_VOLUME_DOWN,
SERVICE_VOLUME_MUTE,
SERVICE_VOLUME_SET,
SERVICE_VOLUME_UP,
)
from homeassistant.const import STATE_OFF, STATE_ON
from homeassistant.helpers.entity_component import async_update_entity
import tests.common
from tests.common import MockConfigEntry
MOCK_CONFIG = {CONF_PORT: "fake port", CONF_SOURCES: {"1": "one", "3": "three"}}
ZONE_1_ID = "media_player.zone_11"
ZONE_2_ID = "media_player.zone_12"
class AttrDict(dict):
@ -77,306 +90,203 @@ class MockMonoprice:
self.zones[zone.zone] = AttrDict(zone)
class TestMonopriceSchema(unittest.TestCase):
"""Test Monoprice schema."""
async def test_cannot_connect(hass):
"""Test connection error."""
def test_valid_schema(self):
"""Test valid schema."""
valid_schema = {
"platform": "monoprice",
"port": "/dev/ttyUSB0",
"zones": {
11: {"name": "a"},
12: {"name": "a"},
13: {"name": "a"},
14: {"name": "a"},
15: {"name": "a"},
16: {"name": "a"},
21: {"name": "a"},
22: {"name": "a"},
23: {"name": "a"},
24: {"name": "a"},
25: {"name": "a"},
26: {"name": "a"},
31: {"name": "a"},
32: {"name": "a"},
33: {"name": "a"},
34: {"name": "a"},
35: {"name": "a"},
36: {"name": "a"},
},
"sources": {
1: {"name": "a"},
2: {"name": "a"},
3: {"name": "a"},
4: {"name": "a"},
5: {"name": "a"},
6: {"name": "a"},
},
}
PLATFORM_SCHEMA(valid_schema)
def test_invalid_schemas(self):
"""Test invalid schemas."""
schemas = (
{}, # Empty
None, # None
# Missing port
{
"platform": "monoprice",
"name": "Name",
"zones": {11: {"name": "a"}},
"sources": {1: {"name": "b"}},
},
# Invalid zone number
{
"platform": "monoprice",
"port": "aaa",
"name": "Name",
"zones": {10: {"name": "a"}},
"sources": {1: {"name": "b"}},
},
# Invalid source number
{
"platform": "monoprice",
"port": "aaa",
"name": "Name",
"zones": {11: {"name": "a"}},
"sources": {0: {"name": "b"}},
},
# Zone missing name
{
"platform": "monoprice",
"port": "aaa",
"name": "Name",
"zones": {11: {}},
"sources": {1: {"name": "b"}},
},
# Source missing name
{
"platform": "monoprice",
"port": "aaa",
"name": "Name",
"zones": {11: {"name": "a"}},
"sources": {1: {}},
},
)
for value in schemas:
with pytest.raises(vol.MultipleInvalid):
PLATFORM_SCHEMA(value)
class TestMonopriceMediaPlayer(unittest.TestCase):
"""Test the media_player module."""
def setUp(self):
"""Set up the test case."""
self.monoprice = MockMonoprice()
self.hass = tests.common.get_test_home_assistant()
self.hass.start()
# Note, source dictionary is unsorted!
with mock.patch(
with patch(
"homeassistant.components.monoprice.media_player.get_monoprice",
new=lambda *a: self.monoprice,
side_effect=SerialException,
):
setup_platform(
self.hass,
{
"platform": "monoprice",
"port": "/dev/ttyS0",
"name": "Name",
"zones": {12: {"name": "Zone name"}},
"sources": {
1: {"name": "one"},
3: {"name": "three"},
2: {"name": "two"},
},
},
lambda *args, **kwargs: None,
{},
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
# setup_component(self.hass, DOMAIN, MOCK_CONFIG)
# self.hass.async_block_till_done()
await hass.async_block_till_done()
assert hass.states.get(ZONE_1_ID) is None
async def _setup_monoprice(hass, monoprice):
with patch(
"homeassistant.components.monoprice.media_player.get_monoprice",
new=lambda *a: monoprice,
):
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
# setup_component(self.hass, DOMAIN, MOCK_CONFIG)
# self.hass.async_block_till_done()
await hass.async_block_till_done()
async def _call_media_player_service(hass, name, data):
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, name, service_data=data, blocking=True
)
self.hass.block_till_done()
self.media_player = self.hass.data[DATA_MONOPRICE][0]
self.media_player.hass = self.hass
self.media_player.entity_id = "media_player.zone_1"
def tearDown(self):
"""Tear down the test case."""
self.hass.stop()
def test_setup_platform(self, *args):
"""Test setting up platform."""
# Two services must be registered
assert self.hass.services.has_service(DOMAIN, SERVICE_RESTORE)
assert self.hass.services.has_service(DOMAIN, SERVICE_SNAPSHOT)
assert len(self.hass.data[DATA_MONOPRICE]) == 1
assert self.hass.data[DATA_MONOPRICE][0].name == "Zone name"
async def _call_homeassistant_service(hass, name, data):
await hass.services.async_call(
"homeassistant", name, service_data=data, blocking=True
)
def test_service_calls_with_entity_id(self):
async def _call_monoprice_service(hass, name, data):
await hass.services.async_call(DOMAIN, name, service_data=data, blocking=True)
async def test_service_calls_with_entity_id(hass):
"""Test snapshot save/restore service calls."""
self.media_player.update()
assert "Zone name" == self.media_player.name
assert STATE_ON == self.media_player.state
assert 0.0 == self.media_player.volume_level, 0.0001
assert self.media_player.is_volume_muted
assert "one" == self.media_player.source
# Saving default values
self.hass.services.call(
DOMAIN,
SERVICE_SNAPSHOT,
{"entity_id": "media_player.zone_1"},
blocking=True,
)
# self.hass.block_till_done()
await _setup_monoprice(hass, MockMonoprice())
# Changing media player to new state
self.media_player.set_volume_level(1)
self.media_player.select_source("two")
self.media_player.mute_volume(False)
self.media_player.turn_off()
# Checking that values were indeed changed
self.media_player.update()
assert "Zone name" == self.media_player.name
assert STATE_OFF == self.media_player.state
assert 1.0 == self.media_player.volume_level, 0.0001
assert not self.media_player.is_volume_muted
assert "two" == self.media_player.source
# Restoring wrong media player to its previous state
# Nothing should be done
self.hass.services.call(
DOMAIN, SERVICE_RESTORE, {"entity_id": "media.not_existing"}, blocking=True
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.0}
)
# self.hass.block_till_done()
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "one"}
)
# Saving existing values
await _call_monoprice_service(hass, SERVICE_SNAPSHOT, {"entity_id": ZONE_1_ID})
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 1.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "three"}
)
# Restoring other media player to its previous state
# The zone should not be restored
await _call_monoprice_service(hass, SERVICE_RESTORE, {"entity_id": ZONE_2_ID})
await hass.async_block_till_done()
# Checking that values were not (!) restored
self.media_player.update()
assert "Zone name" == self.media_player.name
assert STATE_OFF == self.media_player.state
assert 1.0 == self.media_player.volume_level, 0.0001
assert not self.media_player.is_volume_muted
assert "two" == self.media_player.source
state = hass.states.get(ZONE_1_ID)
assert 1.0 == state.attributes[ATTR_MEDIA_VOLUME_LEVEL]
assert "three" == state.attributes[ATTR_INPUT_SOURCE]
# Restoring media player to its previous state
self.hass.services.call(
DOMAIN, SERVICE_RESTORE, {"entity_id": "media_player.zone_1"}, blocking=True
)
self.hass.block_till_done()
await _call_monoprice_service(hass, SERVICE_RESTORE, {"entity_id": ZONE_1_ID})
await hass.async_block_till_done()
# Checking that values were restored
assert "Zone name" == self.media_player.name
assert STATE_ON == self.media_player.state
assert 0.0 == self.media_player.volume_level, 0.0001
assert self.media_player.is_volume_muted
assert "one" == self.media_player.source
state = hass.states.get(ZONE_1_ID)
def test_service_calls_without_entity_id(self):
assert 0.0 == state.attributes[ATTR_MEDIA_VOLUME_LEVEL]
assert "one" == state.attributes[ATTR_INPUT_SOURCE]
async def test_service_calls_with_all_entities(hass):
"""Test snapshot save/restore service calls."""
self.media_player.update()
assert "Zone name" == self.media_player.name
assert STATE_ON == self.media_player.state
assert 0.0 == self.media_player.volume_level, 0.0001
assert self.media_player.is_volume_muted
assert "one" == self.media_player.source
# Restoring media player
# since there is no snapshot, nothing should be done
self.hass.services.call(DOMAIN, SERVICE_RESTORE, blocking=True)
self.hass.block_till_done()
self.media_player.update()
assert "Zone name" == self.media_player.name
assert STATE_ON == self.media_player.state
assert 0.0 == self.media_player.volume_level, 0.0001
assert self.media_player.is_volume_muted
assert "one" == self.media_player.source
# Saving default values
self.hass.services.call(DOMAIN, SERVICE_SNAPSHOT, blocking=True)
self.hass.block_till_done()
await _setup_monoprice(hass, MockMonoprice())
# Changing media player to new state
self.media_player.set_volume_level(1)
self.media_player.select_source("two")
self.media_player.mute_volume(False)
self.media_player.turn_off()
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "one"}
)
# Checking that values were indeed changed
self.media_player.update()
assert "Zone name" == self.media_player.name
assert STATE_OFF == self.media_player.state
assert 1.0 == self.media_player.volume_level, 0.0001
assert not self.media_player.is_volume_muted
assert "two" == self.media_player.source
# Saving existing values
await _call_monoprice_service(hass, SERVICE_SNAPSHOT, {"entity_id": "all"})
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 1.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "three"}
)
# Restoring media player to its previous state
self.hass.services.call(DOMAIN, SERVICE_RESTORE, blocking=True)
self.hass.block_till_done()
await _call_monoprice_service(hass, SERVICE_RESTORE, {"entity_id": "all"})
await hass.async_block_till_done()
# Checking that values were restored
assert "Zone name" == self.media_player.name
assert STATE_ON == self.media_player.state
assert 0.0 == self.media_player.volume_level, 0.0001
assert self.media_player.is_volume_muted
assert "one" == self.media_player.source
state = hass.states.get(ZONE_1_ID)
def test_update(self):
assert 0.0 == state.attributes[ATTR_MEDIA_VOLUME_LEVEL]
assert "one" == state.attributes[ATTR_INPUT_SOURCE]
async def test_service_calls_without_relevant_entities(hass):
"""Test snapshot save/restore service calls."""
await _setup_monoprice(hass, MockMonoprice())
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "one"}
)
# Saving existing values
await _call_monoprice_service(hass, SERVICE_SNAPSHOT, {"entity_id": "all"})
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 1.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "three"}
)
# Restoring media player to its previous state
await _call_monoprice_service(hass, SERVICE_RESTORE, {"entity_id": "light.demo"})
await hass.async_block_till_done()
state = hass.states.get(ZONE_1_ID)
assert 1.0 == state.attributes[ATTR_MEDIA_VOLUME_LEVEL]
assert "three" == state.attributes[ATTR_INPUT_SOURCE]
async def test_restore_without_snapshort(hass):
"""Test restore when snapshot wasn't called."""
await _setup_monoprice(hass, MockMonoprice())
with patch.object(MockMonoprice, "restore_zone") as method_call:
await _call_monoprice_service(hass, SERVICE_RESTORE, {"entity_id": ZONE_1_ID})
await hass.async_block_till_done()
assert not method_call.called
async def test_update(hass):
"""Test updating values from monoprice."""
assert self.media_player.state is None
assert self.media_player.volume_level is None
assert self.media_player.is_volume_muted is None
assert self.media_player.source is None
"""Test snapshot save/restore service calls."""
monoprice = MockMonoprice()
await _setup_monoprice(hass, monoprice)
self.media_player.update()
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "one"}
)
assert STATE_ON == self.media_player.state
assert 0.0 == self.media_player.volume_level, 0.0001
assert self.media_player.is_volume_muted
assert "one" == self.media_player.source
monoprice.set_source(11, 3)
monoprice.set_volume(11, 38)
def test_name(self):
"""Test name property."""
assert "Zone name" == self.media_player.name
await async_update_entity(hass, ZONE_1_ID)
await hass.async_block_till_done()
def test_state(self):
"""Test state property."""
assert self.media_player.state is None
state = hass.states.get(ZONE_1_ID)
self.media_player.update()
assert STATE_ON == self.media_player.state
assert 1.0 == state.attributes[ATTR_MEDIA_VOLUME_LEVEL]
assert "three" == state.attributes[ATTR_INPUT_SOURCE]
self.monoprice.zones[12].power = False
self.media_player.update()
assert STATE_OFF == self.media_player.state
def test_volume_level(self):
"""Test volume level property."""
assert self.media_player.volume_level is None
self.media_player.update()
assert 0.0 == self.media_player.volume_level, 0.0001
self.monoprice.zones[12].volume = 38
self.media_player.update()
assert 1.0 == self.media_player.volume_level, 0.0001
self.monoprice.zones[12].volume = 19
self.media_player.update()
assert 0.5 == self.media_player.volume_level, 0.0001
def test_is_volume_muted(self):
"""Test volume muted property."""
assert self.media_player.is_volume_muted is None
self.media_player.update()
assert self.media_player.is_volume_muted
self.monoprice.zones[12].mute = False
self.media_player.update()
assert not self.media_player.is_volume_muted
def test_supported_features(self):
async def test_supported_features(hass):
"""Test supported features property."""
await _setup_monoprice(hass, MockMonoprice())
state = hass.states.get(ZONE_1_ID)
assert (
SUPPORT_VOLUME_MUTE
| SUPPORT_VOLUME_SET
@ -384,119 +294,115 @@ class TestMonopriceMediaPlayer(unittest.TestCase):
| SUPPORT_TURN_ON
| SUPPORT_TURN_OFF
| SUPPORT_SELECT_SOURCE
== self.media_player.supported_features
== state.attributes["supported_features"]
)
def test_source(self):
"""Test source property."""
assert self.media_player.source is None
self.media_player.update()
assert "one" == self.media_player.source
def test_media_title(self):
"""Test media title property."""
assert self.media_player.media_title is None
self.media_player.update()
assert "one" == self.media_player.media_title
def test_source_list(self):
async def test_source_list(hass):
"""Test source list property."""
await _setup_monoprice(hass, MockMonoprice())
state = hass.states.get(ZONE_1_ID)
# Note, the list is sorted!
assert ["one", "two", "three"] == self.media_player.source_list
assert ["one", "three"] == state.attributes[ATTR_INPUT_SOURCE_LIST]
def test_select_source(self):
async def test_select_source(hass):
"""Test source selection methods."""
self.media_player.update()
monoprice = MockMonoprice()
await _setup_monoprice(hass, monoprice)
assert "one" == self.media_player.source
self.media_player.select_source("two")
assert 2 == self.monoprice.zones[12].source
self.media_player.update()
assert "two" == self.media_player.source
await _call_media_player_service(
hass,
SERVICE_SELECT_SOURCE,
{"entity_id": ZONE_1_ID, ATTR_INPUT_SOURCE: "three"},
)
assert 3 == monoprice.zones[11].source
# Trying to set unknown source
self.media_player.select_source("no name")
assert 2 == self.monoprice.zones[12].source
self.media_player.update()
assert "two" == self.media_player.source
await _call_media_player_service(
hass,
SERVICE_SELECT_SOURCE,
{"entity_id": ZONE_1_ID, ATTR_INPUT_SOURCE: "no name"},
)
assert 3 == monoprice.zones[11].source
def test_turn_on(self):
async def test_unknown_source(hass):
"""Test behavior when device has unknown source."""
monoprice = MockMonoprice()
await _setup_monoprice(hass, monoprice)
monoprice.set_source(11, 5)
await async_update_entity(hass, ZONE_1_ID)
await hass.async_block_till_done()
state = hass.states.get(ZONE_1_ID)
assert state.attributes.get(ATTR_INPUT_SOURCE) is None
async def test_turn_on_off(hass):
"""Test turning on the zone."""
self.monoprice.zones[12].power = False
self.media_player.update()
assert STATE_OFF == self.media_player.state
monoprice = MockMonoprice()
await _setup_monoprice(hass, monoprice)
self.media_player.turn_on()
assert self.monoprice.zones[12].power
self.media_player.update()
assert STATE_ON == self.media_player.state
await _call_media_player_service(hass, SERVICE_TURN_OFF, {"entity_id": ZONE_1_ID})
assert not monoprice.zones[11].power
def test_turn_off(self):
"""Test turning off the zone."""
self.monoprice.zones[12].power = True
self.media_player.update()
assert STATE_ON == self.media_player.state
await _call_media_player_service(hass, SERVICE_TURN_ON, {"entity_id": ZONE_1_ID})
assert monoprice.zones[11].power
self.media_player.turn_off()
assert not self.monoprice.zones[12].power
self.media_player.update()
assert STATE_OFF == self.media_player.state
def test_mute_volume(self):
async def test_mute_volume(hass):
"""Test mute functionality."""
self.monoprice.zones[12].mute = True
self.media_player.update()
assert self.media_player.is_volume_muted
monoprice = MockMonoprice()
await _setup_monoprice(hass, monoprice)
self.media_player.mute_volume(False)
assert not self.monoprice.zones[12].mute
self.media_player.update()
assert not self.media_player.is_volume_muted
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.5}
)
await _call_media_player_service(
hass, SERVICE_VOLUME_MUTE, {"entity_id": ZONE_1_ID, "is_volume_muted": False}
)
assert not monoprice.zones[11].mute
self.media_player.mute_volume(True)
assert self.monoprice.zones[12].mute
self.media_player.update()
assert self.media_player.is_volume_muted
await _call_media_player_service(
hass, SERVICE_VOLUME_MUTE, {"entity_id": ZONE_1_ID, "is_volume_muted": True}
)
assert monoprice.zones[11].mute
def test_set_volume_level(self):
"""Test set volume level."""
self.media_player.set_volume_level(1.0)
assert 38 == self.monoprice.zones[12].volume
assert isinstance(self.monoprice.zones[12].volume, int)
self.media_player.set_volume_level(0.0)
assert 0 == self.monoprice.zones[12].volume
assert isinstance(self.monoprice.zones[12].volume, int)
self.media_player.set_volume_level(0.5)
assert 19 == self.monoprice.zones[12].volume
assert isinstance(self.monoprice.zones[12].volume, int)
def test_volume_up(self):
async def test_volume_up_down(hass):
"""Test increasing volume by one."""
self.monoprice.zones[12].volume = 37
self.media_player.update()
self.media_player.volume_up()
assert 38 == self.monoprice.zones[12].volume
assert isinstance(self.monoprice.zones[12].volume, int)
monoprice = MockMonoprice()
await _setup_monoprice(hass, monoprice)
# Try to raise value beyond max
self.media_player.update()
self.media_player.volume_up()
assert 38 == self.monoprice.zones[12].volume
assert isinstance(self.monoprice.zones[12].volume, int)
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.0}
)
assert 0 == monoprice.zones[11].volume
def test_volume_down(self):
"""Test decreasing volume by one."""
self.monoprice.zones[12].volume = 1
self.media_player.update()
self.media_player.volume_down()
assert 0 == self.monoprice.zones[12].volume
assert isinstance(self.monoprice.zones[12].volume, int)
await _call_media_player_service(
hass, SERVICE_VOLUME_DOWN, {"entity_id": ZONE_1_ID}
)
# should not go below zero
assert 0 == monoprice.zones[11].volume
# Try to lower value beyond minimum
self.media_player.update()
self.media_player.volume_down()
assert 0 == self.monoprice.zones[12].volume
assert isinstance(self.monoprice.zones[12].volume, int)
await _call_media_player_service(hass, SERVICE_VOLUME_UP, {"entity_id": ZONE_1_ID})
assert 1 == monoprice.zones[11].volume
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 1.0}
)
assert 38 == monoprice.zones[11].volume
await _call_media_player_service(hass, SERVICE_VOLUME_UP, {"entity_id": ZONE_1_ID})
# should not go above 38
assert 38 == monoprice.zones[11].volume
await _call_media_player_service(
hass, SERVICE_VOLUME_DOWN, {"entity_id": ZONE_1_ID}
)
assert 37 == monoprice.zones[11].volume