Improve validation of device automations (#102766)

* Improve validation of device automations

* Improve comments

* Address review comment
This commit is contained in:
Erik Montnemery 2023-10-26 09:46:16 +02:00 committed by GitHub
parent 4838b2dee6
commit 087df10d27
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 133 additions and 52 deletions

View file

@ -5,9 +5,9 @@ from typing import cast
import voluptuous as vol import voluptuous as vol
from homeassistant.const import CONF_DEVICE_ID, CONF_DOMAIN, Platform from homeassistant.const import CONF_DEVICE_ID, CONF_DOMAIN, CONF_ENTITY_ID, Platform
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.typing import ConfigType from homeassistant.helpers.typing import ConfigType
from . import DeviceAutomationType, async_get_device_automation_platform from . import DeviceAutomationType, async_get_device_automation_platform
@ -55,31 +55,42 @@ async def async_validate_device_automation_config(
platform = await async_get_device_automation_platform( platform = await async_get_device_automation_platform(
hass, validated_config[CONF_DOMAIN], automation_type hass, validated_config[CONF_DOMAIN], automation_type
) )
# Make sure the referenced device and optional entity exist
device_registry = dr.async_get(hass)
if not (device := device_registry.async_get(validated_config[CONF_DEVICE_ID])):
# The device referenced by the device automation does not exist
raise InvalidDeviceAutomationConfig(
f"Unknown device '{validated_config[CONF_DEVICE_ID]}'"
)
if entity_id := validated_config.get(CONF_ENTITY_ID):
try:
er.async_validate_entity_id(er.async_get(hass), entity_id)
except vol.Invalid as err:
raise InvalidDeviceAutomationConfig(
f"Unknown entity '{entity_id}'"
) from err
if not hasattr(platform, DYNAMIC_VALIDATOR[automation_type]): if not hasattr(platform, DYNAMIC_VALIDATOR[automation_type]):
# Pass the unvalidated config to avoid mutating the raw config twice # Pass the unvalidated config to avoid mutating the raw config twice
return cast( return cast(
ConfigType, getattr(platform, STATIC_VALIDATOR[automation_type])(config) ConfigType, getattr(platform, STATIC_VALIDATOR[automation_type])(config)
) )
# Bypass checks for entity platforms # Devices are not linked to config entries from entity platform domains, skip
# the checks below which look for a config entry matching the device automation
# domain
if ( if (
automation_type == DeviceAutomationType.ACTION automation_type == DeviceAutomationType.ACTION
and validated_config[CONF_DOMAIN] in ENTITY_PLATFORMS and validated_config[CONF_DOMAIN] in ENTITY_PLATFORMS
): ):
# Pass the unvalidated config to avoid mutating the raw config twice
return cast( return cast(
ConfigType, ConfigType,
await getattr(platform, DYNAMIC_VALIDATOR[automation_type])(hass, config), await getattr(platform, DYNAMIC_VALIDATOR[automation_type])(hass, config),
) )
# Only call the dynamic validator if the referenced device exists and the relevant # Find a config entry with the same domain as the device automation
# config entry is loaded
registry = dr.async_get(hass)
if not (device := registry.async_get(validated_config[CONF_DEVICE_ID])):
# The device referenced by the device automation does not exist
raise InvalidDeviceAutomationConfig(
f"Unknown device '{validated_config[CONF_DEVICE_ID]}'"
)
device_config_entry = None device_config_entry = None
for entry_id in device.config_entries: for entry_id in device.config_entries:
if ( if (
@ -91,7 +102,7 @@ async def async_validate_device_automation_config(
break break
if not device_config_entry: if not device_config_entry:
# The config entry referenced by the device automation does not exist # There's no config entry with the same domain as the device automation
raise InvalidDeviceAutomationConfig( raise InvalidDeviceAutomationConfig(
f"Device '{validated_config[CONF_DEVICE_ID]}' has no config entry from " f"Device '{validated_config[CONF_DEVICE_ID]}' has no config entry from "
f"domain '{validated_config[CONF_DOMAIN]}'" f"domain '{validated_config[CONF_DOMAIN]}'"

View file

@ -1,6 +1,7 @@
"""The test for light device automation.""" """The test for light device automation."""
from unittest.mock import AsyncMock, Mock, patch from unittest.mock import AsyncMock, Mock, patch
import attr
import pytest import pytest
from pytest_unordered import unordered from pytest_unordered import unordered
import voluptuous as vol import voluptuous as vol
@ -31,6 +32,13 @@ from tests.common import (
from tests.typing import WebSocketGenerator from tests.typing import WebSocketGenerator
@attr.s(frozen=True)
class MockDeviceEntry(dr.DeviceEntry):
"""Device Registry Entry with fixed UUID."""
id: str = attr.ib(default="very_unique")
@pytest.fixture(autouse=True, name="stub_blueprint_populate") @pytest.fixture(autouse=True, name="stub_blueprint_populate")
def stub_blueprint_populate_autouse(stub_blueprint_populate: None) -> None: def stub_blueprint_populate_autouse(stub_blueprint_populate: None) -> None:
"""Stub copying the blueprints to the config folder.""" """Stub copying the blueprints to the config folder."""
@ -1240,17 +1248,56 @@ async def test_automation_with_integration_without_device_trigger(
) )
BAD_AUTOMATIONS = [
(
{"device_id": "very_unique", "domain": "light"},
"required key not provided @ data['entity_id']",
),
(
{"device_id": "wrong", "domain": "light"},
"Unknown device 'wrong'",
),
(
{"device_id": "wrong"},
"required key not provided @ data{path}['domain']",
),
(
{"device_id": "wrong", "domain": "light"},
"Unknown device 'wrong'",
),
(
{"device_id": "very_unique", "domain": "light"},
"required key not provided @ data['entity_id']",
),
(
{"device_id": "very_unique", "domain": "light", "entity_id": "wrong"},
"Unknown entity 'wrong'",
),
]
BAD_TRIGGERS = BAD_CONDITIONS = BAD_AUTOMATIONS + [
(
{"domain": "light"},
"required key not provided @ data{path}['device_id']",
)
]
@patch("homeassistant.helpers.device_registry.DeviceEntry", MockDeviceEntry)
@pytest.mark.parametrize(("action", "expected_error"), BAD_AUTOMATIONS)
async def test_automation_with_bad_action( async def test_automation_with_bad_action(
hass: HomeAssistant, hass: HomeAssistant,
caplog: pytest.LogCaptureFixture, caplog: pytest.LogCaptureFixture,
device_registry: dr.DeviceRegistry, device_registry: dr.DeviceRegistry,
entity_registry: er.EntityRegistry, entity_registry: er.EntityRegistry,
action: dict[str, str],
expected_error: str,
) -> None: ) -> None:
"""Test automation with bad device action.""" """Test automation with bad device action."""
config_entry = MockConfigEntry(domain="fake_integration", data={}) config_entry = MockConfigEntry(domain="fake_integration", data={})
config_entry.state = config_entries.ConfigEntryState.LOADED config_entry.state = config_entries.ConfigEntryState.LOADED
config_entry.add_to_hass(hass) config_entry.add_to_hass(hass)
device_entry = device_registry.async_get_or_create( device_registry.async_get_or_create(
config_entry_id=config_entry.entry_id, config_entry_id=config_entry.entry_id,
connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")},
) )
@ -1262,25 +1309,29 @@ async def test_automation_with_bad_action(
automation.DOMAIN: { automation.DOMAIN: {
"alias": "hello", "alias": "hello",
"trigger": {"platform": "event", "event_type": "test_event1"}, "trigger": {"platform": "event", "event_type": "test_event1"},
"action": {"device_id": device_entry.id, "domain": "light"}, "action": action,
} }
}, },
) )
assert "required key not provided" in caplog.text assert expected_error.format(path="['action'][0]") in caplog.text
@patch("homeassistant.helpers.device_registry.DeviceEntry", MockDeviceEntry)
@pytest.mark.parametrize(("condition", "expected_error"), BAD_CONDITIONS)
async def test_automation_with_bad_condition_action( async def test_automation_with_bad_condition_action(
hass: HomeAssistant, hass: HomeAssistant,
caplog: pytest.LogCaptureFixture, caplog: pytest.LogCaptureFixture,
device_registry: dr.DeviceRegistry, device_registry: dr.DeviceRegistry,
entity_registry: er.EntityRegistry, entity_registry: er.EntityRegistry,
condition: dict[str, str],
expected_error: str,
) -> None: ) -> None:
"""Test automation with bad device action.""" """Test automation with bad device action."""
config_entry = MockConfigEntry(domain="fake_integration", data={}) config_entry = MockConfigEntry(domain="fake_integration", data={})
config_entry.state = config_entries.ConfigEntryState.LOADED config_entry.state = config_entries.ConfigEntryState.LOADED
config_entry.add_to_hass(hass) config_entry.add_to_hass(hass)
device_entry = device_registry.async_get_or_create( device_registry.async_get_or_create(
config_entry_id=config_entry.entry_id, config_entry_id=config_entry.entry_id,
connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")},
) )
@ -1292,42 +1343,32 @@ async def test_automation_with_bad_condition_action(
automation.DOMAIN: { automation.DOMAIN: {
"alias": "hello", "alias": "hello",
"trigger": {"platform": "event", "event_type": "test_event1"}, "trigger": {"platform": "event", "event_type": "test_event1"},
"action": { "action": {"condition": "device"} | condition,
"condition": "device",
"device_id": device_entry.id,
"domain": "light",
},
} }
}, },
) )
assert "required key not provided" in caplog.text assert expected_error.format(path="['action'][0]") in caplog.text
async def test_automation_with_bad_condition_missing_domain(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
"""Test automation with bad device condition."""
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"alias": "hello",
"trigger": {"platform": "event", "event_type": "test_event1"},
"condition": {"condition": "device", "device_id": "hello.device"},
"action": {"service": "test.automation", "entity_id": "hello.world"},
}
},
)
assert "required key not provided @ data['condition'][0]['domain']" in caplog.text
@patch("homeassistant.helpers.device_registry.DeviceEntry", MockDeviceEntry)
@pytest.mark.parametrize(("condition", "expected_error"), BAD_CONDITIONS)
async def test_automation_with_bad_condition( async def test_automation_with_bad_condition(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
device_registry: dr.DeviceRegistry,
condition: dict[str, str],
expected_error: str,
) -> None: ) -> None:
"""Test automation with bad device condition.""" """Test automation with bad device condition."""
config_entry = MockConfigEntry(domain="fake_integration", data={})
config_entry.state = config_entries.ConfigEntryState.LOADED
config_entry.add_to_hass(hass)
device_registry.async_get_or_create(
config_entry_id=config_entry.entry_id,
connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")},
)
assert await async_setup_component( assert await async_setup_component(
hass, hass,
automation.DOMAIN, automation.DOMAIN,
@ -1335,13 +1376,13 @@ async def test_automation_with_bad_condition(
automation.DOMAIN: { automation.DOMAIN: {
"alias": "hello", "alias": "hello",
"trigger": {"platform": "event", "event_type": "test_event1"}, "trigger": {"platform": "event", "event_type": "test_event1"},
"condition": {"condition": "device", "domain": "light"}, "condition": {"condition": "device"} | condition,
"action": {"service": "test.automation", "entity_id": "hello.world"}, "action": {"service": "test.automation", "entity_id": "hello.world"},
} }
}, },
) )
assert "required key not provided" in caplog.text assert expected_error.format(path="['condition'][0]") in caplog.text
@pytest.fixture @pytest.fixture
@ -1475,10 +1516,24 @@ async def test_automation_with_sub_condition(
) )
@patch("homeassistant.helpers.device_registry.DeviceEntry", MockDeviceEntry)
@pytest.mark.parametrize(("condition", "expected_error"), BAD_CONDITIONS)
async def test_automation_with_bad_sub_condition( async def test_automation_with_bad_sub_condition(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
device_registry: dr.DeviceRegistry,
condition: dict[str, str],
expected_error: str,
) -> None: ) -> None:
"""Test automation with bad device condition under and/or conditions.""" """Test automation with bad device condition under and/or conditions."""
config_entry = MockConfigEntry(domain="fake_integration", data={})
config_entry.state = config_entries.ConfigEntryState.LOADED
config_entry.add_to_hass(hass)
device_registry.async_get_or_create(
config_entry_id=config_entry.entry_id,
connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")},
)
assert await async_setup_component( assert await async_setup_component(
hass, hass,
automation.DOMAIN, automation.DOMAIN,
@ -1488,33 +1543,48 @@ async def test_automation_with_bad_sub_condition(
"trigger": {"platform": "event", "event_type": "test_event1"}, "trigger": {"platform": "event", "event_type": "test_event1"},
"condition": { "condition": {
"condition": "and", "condition": "and",
"conditions": [{"condition": "device", "domain": "light"}], "conditions": [{"condition": "device"} | condition],
}, },
"action": {"service": "test.automation", "entity_id": "hello.world"}, "action": {"service": "test.automation", "entity_id": "hello.world"},
} }
}, },
) )
assert "required key not provided" in caplog.text path = "['condition'][0]['conditions'][0]"
assert expected_error.format(path=path) in caplog.text
@patch("homeassistant.helpers.device_registry.DeviceEntry", MockDeviceEntry)
@pytest.mark.parametrize(("trigger", "expected_error"), BAD_TRIGGERS)
async def test_automation_with_bad_trigger( async def test_automation_with_bad_trigger(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
device_registry: dr.DeviceRegistry,
trigger: dict[str, str],
expected_error: str,
) -> None: ) -> None:
"""Test automation with bad device trigger.""" """Test automation with bad device trigger."""
config_entry = MockConfigEntry(domain="fake_integration", data={})
config_entry.state = config_entries.ConfigEntryState.LOADED
config_entry.add_to_hass(hass)
device_registry.async_get_or_create(
config_entry_id=config_entry.entry_id,
connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")},
)
assert await async_setup_component( assert await async_setup_component(
hass, hass,
automation.DOMAIN, automation.DOMAIN,
{ {
automation.DOMAIN: { automation.DOMAIN: {
"alias": "hello", "alias": "hello",
"trigger": {"platform": "device", "domain": "light"}, "trigger": {"platform": "device"} | trigger,
"action": {"service": "test.automation", "entity_id": "hello.world"}, "action": {"service": "test.automation", "entity_id": "hello.world"},
} }
}, },
) )
assert "required key not provided" in caplog.text assert expected_error.format(path="") in caplog.text
async def test_websocket_device_not_found( async def test_websocket_device_not_found(