Raise if referenced entity does not support service (#68394)

This commit is contained in:
Paulus Schoutsen 2022-03-20 16:01:58 -07:00 committed by GitHub
parent 0cbc29caca
commit 8bbbd1947d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 105 additions and 90 deletions

View file

@ -220,7 +220,7 @@ class AlarmControlPanelTemplate(TemplateEntity, AlarmControlPanelEntity):
)
await super().async_added_to_hass()
async def _async_alarm_arm(self, state, script=None, code=None):
async def _async_alarm_arm(self, state, script, code):
"""Arm the panel to specified state with supplied script."""
optimistic_set = False
@ -228,10 +228,7 @@ class AlarmControlPanelTemplate(TemplateEntity, AlarmControlPanelEntity):
self._state = state
optimistic_set = True
if script is not None:
await script.async_run({ATTR_CODE: code}, context=self._context)
else:
_LOGGER.error("No script action defined for %s", state)
await script.async_run({ATTR_CODE: code}, context=self._context)
if optimistic_set:
self.async_write_ha_state()

View file

@ -527,7 +527,7 @@ def async_set_service_schema(
@bind_hass
async def entity_service_call(
async def entity_service_call( # noqa: C901
hass: HomeAssistant,
platforms: Iterable[EntityPlatform],
func: str | Callable[..., Any],
@ -646,6 +646,12 @@ async def entity_service_call(
for feature_set in required_features
)
):
# If entity explicitly referenced, raise an error
if referenced is not None and entity.entity_id in referenced.referenced:
raise HomeAssistantError(
f"Entity {entity.entity_id} does not support this service."
)
continue
entities.append(entity)

View file

@ -39,6 +39,7 @@ from homeassistant.const import (
EVENT_HOMEASSISTANT_STOP,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import device_registry as dr, entity_registry as er, network
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.setup import async_setup_component
@ -1225,15 +1226,18 @@ async def test_entity_control(hass: HomeAssistant):
chromecast.media_controller.pause.assert_called_once_with()
# Media previous
await common.async_media_previous_track(hass, entity_id)
with pytest.raises(HomeAssistantError):
await common.async_media_previous_track(hass, entity_id)
chromecast.media_controller.queue_prev.assert_not_called()
# Media next
await common.async_media_next_track(hass, entity_id)
with pytest.raises(HomeAssistantError):
await common.async_media_next_track(hass, entity_id)
chromecast.media_controller.queue_next.assert_not_called()
# Media seek
await common.async_media_seek(hass, 123, entity_id)
with pytest.raises(HomeAssistantError):
await common.async_media_seek(hass, 123, entity_id)
chromecast.media_controller.seek.assert_not_called()
# Enable support for queue and seek

View file

@ -3,6 +3,7 @@ from dynalite_devices_lib.cover import DynaliteTimeCoverWithTiltDevice
import pytest
from homeassistant.const import ATTR_DEVICE_CLASS, ATTR_FRIENDLY_NAME
from homeassistant.exceptions import HomeAssistantError
from .common import (
ATTR_ARGS,
@ -65,9 +66,10 @@ async def test_cover_without_tilt(hass, mock_device):
"""Test a cover with no tilt."""
mock_device.has_tilt = False
await create_entity_from_device(hass, mock_device)
await hass.services.async_call(
"cover", "open_cover_tilt", {"entity_id": "cover.name"}, blocking=True
)
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
"cover", "open_cover_tilt", {"entity_id": "cover.name"}, blocking=True
)
await hass.async_block_till_done()
mock_device.async_open_cover_tilt.assert_not_called()

View file

@ -5,6 +5,7 @@ import pytest
from homeassistant.components.rfxtrx import DOMAIN
from homeassistant.core import State
from homeassistant.exceptions import HomeAssistantError
from tests.common import MockConfigEntry, mock_restore_cache
from tests.components.rfxtrx.conftest import create_rfx_test_cfg
@ -181,19 +182,21 @@ async def test_rfy_cover(hass, rfxtrx):
blocking=True,
)
await hass.services.async_call(
"cover",
"open_cover_tilt",
{"entity_id": "cover.rfy_010203_1"},
blocking=True,
)
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
"cover",
"open_cover_tilt",
{"entity_id": "cover.rfy_010203_1"},
blocking=True,
)
await hass.services.async_call(
"cover",
"close_cover_tilt",
{"entity_id": "cover.rfy_010203_1"},
blocking=True,
)
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
"cover",
"close_cover_tilt",
{"entity_id": "cover.rfy_010203_1"},
blocking=True,
)
assert rfxtrx.transport.send.mock_calls == [
call(bytearray(b"\x08\x1a\x00\x00\x01\x02\x03\x01\x00")),

View file

@ -27,6 +27,7 @@ from homeassistant.const import (
STATE_ALARM_TRIGGERED,
STATE_UNKNOWN,
)
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.entity_component import async_update_entity
@ -337,17 +338,24 @@ async def test_sets_with_correct_code(hass, two_part_alarm):
await _test_service_call(
hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", SECOND_ENTITY_ID, 1, "C", **code
)
await _test_no_service_call(
hass, SERVICE_ALARM_ARM_CUSTOM_BYPASS, "partial_arm", FIRST_ENTITY_ID, 0, **code
)
await _test_no_service_call(
hass,
SERVICE_ALARM_ARM_CUSTOM_BYPASS,
"partial_arm",
SECOND_ENTITY_ID,
1,
**code,
)
with pytest.raises(HomeAssistantError):
await _test_no_service_call(
hass,
SERVICE_ALARM_ARM_CUSTOM_BYPASS,
"partial_arm",
FIRST_ENTITY_ID,
0,
**code,
)
with pytest.raises(HomeAssistantError):
await _test_no_service_call(
hass,
SERVICE_ALARM_ARM_CUSTOM_BYPASS,
"partial_arm",
SECOND_ENTITY_ID,
1,
**code,
)
async def test_sets_with_incorrect_code(hass, two_part_alarm):
@ -379,14 +387,21 @@ async def test_sets_with_incorrect_code(hass, two_part_alarm):
await _test_no_service_call(
hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", SECOND_ENTITY_ID, 1, **code
)
await _test_no_service_call(
hass, SERVICE_ALARM_ARM_CUSTOM_BYPASS, "partial_arm", FIRST_ENTITY_ID, 0, **code
)
await _test_no_service_call(
hass,
SERVICE_ALARM_ARM_CUSTOM_BYPASS,
"partial_arm",
SECOND_ENTITY_ID,
1,
**code,
)
with pytest.raises(HomeAssistantError):
await _test_no_service_call(
hass,
SERVICE_ALARM_ARM_CUSTOM_BYPASS,
"partial_arm",
FIRST_ENTITY_ID,
0,
**code,
)
with pytest.raises(HomeAssistantError):
await _test_no_service_call(
hass,
SERVICE_ALARM_ARM_CUSTOM_BYPASS,
"partial_arm",
SECOND_ENTITY_ID,
1,
**code,
)

View file

@ -60,6 +60,7 @@ from homeassistant.const import (
STATE_UNAVAILABLE,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.typing import ConfigType
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
@ -896,9 +897,10 @@ async def test_turn_on_wol(hass: HomeAssistant) -> None:
async def test_turn_on_without_turnon(hass: HomeAssistant, remote: Mock) -> None:
"""Test turn on."""
await setup_samsungtv(hass, MOCK_CONFIG_NOTURNON)
assert await hass.services.async_call(
DOMAIN, SERVICE_TURN_ON, {ATTR_ENTITY_ID: ENTITY_ID_NOTURNON}, True
)
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
DOMAIN, SERVICE_TURN_ON, {ATTR_ENTITY_ID: ENTITY_ID_NOTURNON}, True
)
# nothing called as not supported feature
assert remote.control.call_count == 0

View file

@ -129,38 +129,6 @@ async def test_optimistic_states(hass, start_ha):
assert hass.states.get(TEMPLATE_NAME).state == set_state
@pytest.mark.parametrize("count,domain", [(1, "alarm_control_panel")])
@pytest.mark.parametrize(
"config",
[
{
"alarm_control_panel": {
"platform": "template",
"panels": {
"test_template_panel": {
"value_template": "{{ states('alarm_control_panel.test') }}",
}
},
}
},
],
)
async def test_no_action_scripts(hass, start_ha):
"""Test no action scripts per state."""
hass.states.async_set("alarm_control_panel.test", STATE_ALARM_ARMED_AWAY)
await hass.async_block_till_done()
for func, set_state in [
(common.async_alarm_arm_away, STATE_ALARM_ARMED_AWAY),
(common.async_alarm_arm_home, STATE_ALARM_ARMED_AWAY),
(common.async_alarm_arm_night, STATE_ALARM_ARMED_AWAY),
(common.async_alarm_disarm, STATE_ALARM_ARMED_AWAY),
]:
await func(hass, entity_id=TEMPLATE_NAME)
await hass.async_block_till_done()
assert hass.states.get(TEMPLATE_NAME).state == set_state
@pytest.mark.parametrize("count,domain", [(0, "alarm_control_panel")])
@pytest.mark.parametrize(
"config,msg",

View file

@ -1,9 +1,12 @@
"""The tests for WebOS TV automation triggers."""
from unittest.mock import patch
import pytest
from homeassistant.components import automation
from homeassistant.components.webostv import DOMAIN
from homeassistant.const import SERVICE_RELOAD
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.device_registry import async_get as get_dev_reg
from homeassistant.setup import async_setup_component
@ -57,17 +60,18 @@ async def test_webostv_turn_on_trigger_device_id(hass, calls, client):
with patch("homeassistant.config.load_yaml", return_value={}):
await hass.services.async_call(automation.DOMAIN, SERVICE_RELOAD, blocking=True)
await hass.services.async_call(
"media_player",
"turn_on",
{"entity_id": ENTITY_ID},
blocking=True,
)
calls.clear()
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
"media_player",
"turn_on",
{"entity_id": ENTITY_ID},
blocking=True,
)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0].data["some"] == device.id
assert calls[0].data["id"] == 0
assert len(calls) == 0
async def test_webostv_turn_on_trigger_entity_id(hass, calls, client):

View file

@ -552,6 +552,20 @@ async def test_call_with_required_features(hass, mock_entities):
actual = [call[0][0] for call in test_service_mock.call_args_list]
assert all(entity in actual for entity in expected)
# Test we raise if we target entity ID that does not support the service
test_service_mock.reset_mock()
with pytest.raises(exceptions.HomeAssistantError):
await service.entity_service_call(
hass,
[Mock(entities=mock_entities)],
test_service_mock,
ha.ServiceCall(
"test_domain", "test_service", {"entity_id": "light.living_room"}
),
required_features=[SUPPORT_A],
)
assert test_service_mock.call_count == 0
async def test_call_with_both_required_features(hass, mock_entities):
"""Test service calls invoked only if entity has both features."""