Add support for validating and serializing selectors (#66565)

Co-authored-by: Paulus Schoutsen <balloob@gmail.com>
This commit is contained in:
Erik Montnemery 2022-02-18 23:24:08 +01:00 committed by GitHub
parent 2ca6ec0290
commit ec67dcb620
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 342 additions and 131 deletions

View file

@ -939,6 +939,8 @@ def key_dependency(
def custom_serializer(schema: Any) -> Any: def custom_serializer(schema: Any) -> Any:
"""Serialize additional types for voluptuous_serialize.""" """Serialize additional types for voluptuous_serialize."""
from . import selector # pylint: disable=import-outside-toplevel
if schema is positive_time_period_dict: if schema is positive_time_period_dict:
return {"type": "positive_time_period_dict"} return {"type": "positive_time_period_dict"}
@ -951,6 +953,9 @@ def custom_serializer(schema: Any) -> Any:
if isinstance(schema, multi_select): if isinstance(schema, multi_select):
return {"type": "multi_select", "options": schema.options} return {"type": "multi_select", "options": schema.options}
if isinstance(schema, selector.Selector):
return schema.serialize()
return voluptuous_serialize.UNSUPPORTED return voluptuous_serialize.UNSUPPORTED

View file

@ -2,18 +2,22 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import Callable from collections.abc import Callable
from datetime import time as time_sys
from typing import Any, cast from typing import Any, cast
import voluptuous as vol import voluptuous as vol
from homeassistant.const import CONF_MODE, CONF_UNIT_OF_MEASUREMENT from homeassistant.const import CONF_MODE, CONF_UNIT_OF_MEASUREMENT
from homeassistant.core import split_entity_id
from homeassistant.util import decorator from homeassistant.util import decorator
from . import config_validation as cv
SELECTORS = decorator.Registry() SELECTORS = decorator.Registry()
def validate_selector(config: Any) -> dict: def _get_selector_class(config: Any) -> type[Selector]:
"""Validate a selector.""" """Get selector class type."""
if not isinstance(config, dict): if not isinstance(config, dict):
raise vol.Invalid("Expected a dictionary") raise vol.Invalid("Expected a dictionary")
@ -25,6 +29,26 @@ def validate_selector(config: Any) -> dict:
if (selector_class := SELECTORS.get(selector_type)) is None: if (selector_class := SELECTORS.get(selector_type)) is None:
raise vol.Invalid(f"Unknown selector type {selector_type} found") raise vol.Invalid(f"Unknown selector type {selector_type} found")
return cast(type[Selector], selector_class)
def selector(config: Any) -> Selector:
"""Instantiate a selector."""
selector_class = _get_selector_class(config)
selector_type = list(config)[0]
# Selectors can be empty
if config[selector_type] is None:
return selector_class({selector_type: {}})
return selector_class(config)
def validate_selector(config: Any) -> dict:
"""Validate a selector."""
selector_class = _get_selector_class(config)
selector_type = list(config)[0]
# Selectors can be empty # Selectors can be empty
if config[selector_type] is None: if config[selector_type] is None:
return {selector_type: {}} return {selector_type: {}}
@ -38,12 +62,24 @@ class Selector:
"""Base class for selectors.""" """Base class for selectors."""
CONFIG_SCHEMA: Callable CONFIG_SCHEMA: Callable
config: Any
selector_type: str
def __init__(self, config: Any) -> None:
"""Instantiate a selector."""
self.config = self.CONFIG_SCHEMA(config[self.selector_type])
def serialize(self) -> Any:
"""Serialize Selector for voluptuous_serialize."""
return {"selector": {self.selector_type: self.config}}
@SELECTORS.register("entity") @SELECTORS.register("entity")
class EntitySelector(Selector): class EntitySelector(Selector):
"""Selector of a single entity.""" """Selector of a single entity."""
selector_type = "entity"
CONFIG_SCHEMA = vol.Schema( CONFIG_SCHEMA = vol.Schema(
{ {
# Integration that provided the entity # Integration that provided the entity
@ -55,11 +91,30 @@ class EntitySelector(Selector):
} }
) )
def __call__(self, data: Any) -> str:
"""Validate the passed selection."""
try:
entity_id = cv.entity_id(data)
domain = split_entity_id(entity_id)[0]
except vol.Invalid:
# Not a valid entity_id, maybe it's an entity entry id
return cv.entity_id_or_uuid(cv.string(data))
else:
if "domain" in self.config and domain != self.config["domain"]:
raise vol.Invalid(
f"Entity {entity_id} belongs to domain {domain}, "
f"expected {self.config['domain']}"
)
return entity_id
@SELECTORS.register("device") @SELECTORS.register("device")
class DeviceSelector(Selector): class DeviceSelector(Selector):
"""Selector of a single device.""" """Selector of a single device."""
selector_type = "device"
CONFIG_SCHEMA = vol.Schema( CONFIG_SCHEMA = vol.Schema(
{ {
# Integration linked to it with a config entry # Integration linked to it with a config entry
@ -73,35 +128,35 @@ class DeviceSelector(Selector):
} }
) )
def __call__(self, data: Any) -> str:
"""Validate the passed selection."""
return cv.string(data)
@SELECTORS.register("area") @SELECTORS.register("area")
class AreaSelector(Selector): class AreaSelector(Selector):
"""Selector of a single area.""" """Selector of a single area."""
selector_type = "area"
CONFIG_SCHEMA = vol.Schema( CONFIG_SCHEMA = vol.Schema(
{ {
vol.Optional("entity"): vol.Schema( vol.Optional("entity"): EntitySelector.CONFIG_SCHEMA,
{ vol.Optional("device"): DeviceSelector.CONFIG_SCHEMA,
vol.Optional("domain"): str,
vol.Optional("device_class"): str,
vol.Optional("integration"): str,
}
),
vol.Optional("device"): vol.Schema(
{
vol.Optional("integration"): str,
vol.Optional("manufacturer"): str,
vol.Optional("model"): str,
}
),
} }
) )
def __call__(self, data: Any) -> str:
"""Validate the passed selection."""
return cv.string(data)
@SELECTORS.register("number") @SELECTORS.register("number")
class NumberSelector(Selector): class NumberSelector(Selector):
"""Selector of a numeric value.""" """Selector of a numeric value."""
selector_type = "number"
CONFIG_SCHEMA = vol.Schema( CONFIG_SCHEMA = vol.Schema(
{ {
vol.Required("min"): vol.Coerce(float), vol.Required("min"): vol.Coerce(float),
@ -114,80 +169,131 @@ class NumberSelector(Selector):
} }
) )
def __call__(self, data: Any) -> float:
"""Validate the passed selection."""
value: float = vol.Coerce(float)(data)
if not self.config["min"] <= value <= self.config["max"]:
raise vol.Invalid(f"Value {value} is too small or too large")
return value
@SELECTORS.register("addon") @SELECTORS.register("addon")
class AddonSelector(Selector): class AddonSelector(Selector):
"""Selector of a add-on.""" """Selector of a add-on."""
selector_type = "addon"
CONFIG_SCHEMA = vol.Schema({}) CONFIG_SCHEMA = vol.Schema({})
def __call__(self, data: Any) -> str:
"""Validate the passed selection."""
return cv.string(data)
@SELECTORS.register("boolean") @SELECTORS.register("boolean")
class BooleanSelector(Selector): class BooleanSelector(Selector):
"""Selector of a boolean value.""" """Selector of a boolean value."""
selector_type = "boolean"
CONFIG_SCHEMA = vol.Schema({}) CONFIG_SCHEMA = vol.Schema({})
def __call__(self, data: Any) -> bool:
"""Validate the passed selection."""
value: bool = vol.Coerce(bool)(data)
return value
@SELECTORS.register("time") @SELECTORS.register("time")
class TimeSelector(Selector): class TimeSelector(Selector):
"""Selector of a time value.""" """Selector of a time value."""
selector_type = "time"
CONFIG_SCHEMA = vol.Schema({}) CONFIG_SCHEMA = vol.Schema({})
def __call__(self, data: Any) -> time_sys:
"""Validate the passed selection."""
return cv.time(data)
@SELECTORS.register("target") @SELECTORS.register("target")
class TargetSelector(Selector): class TargetSelector(Selector):
"""Selector of a target value (area ID, device ID, entity ID etc). """Selector of a target value (area ID, device ID, entity ID etc).
Value should follow cv.ENTITY_SERVICE_FIELDS format. Value should follow cv.TARGET_SERVICE_FIELDS format.
""" """
selector_type = "target"
CONFIG_SCHEMA = vol.Schema( CONFIG_SCHEMA = vol.Schema(
{ {
vol.Optional("entity"): vol.Schema( vol.Optional("entity"): EntitySelector.CONFIG_SCHEMA,
{ vol.Optional("device"): DeviceSelector.CONFIG_SCHEMA,
vol.Optional("domain"): str,
vol.Optional("device_class"): str,
vol.Optional("integration"): str,
}
),
vol.Optional("device"): vol.Schema(
{
vol.Optional("integration"): str,
vol.Optional("manufacturer"): str,
vol.Optional("model"): str,
}
),
} }
) )
TARGET_SELECTION_SCHEMA = vol.Schema(cv.TARGET_SERVICE_FIELDS)
def __call__(self, data: Any) -> dict[str, list[str]]:
"""Validate the passed selection."""
target: dict[str, list[str]] = self.TARGET_SELECTION_SCHEMA(data)
return target
@SELECTORS.register("action") @SELECTORS.register("action")
class ActionSelector(Selector): class ActionSelector(Selector):
"""Selector of an action sequence (script syntax).""" """Selector of an action sequence (script syntax)."""
selector_type = "action"
CONFIG_SCHEMA = vol.Schema({}) CONFIG_SCHEMA = vol.Schema({})
def __call__(self, data: Any) -> Any:
"""Validate the passed selection."""
return data
@SELECTORS.register("object") @SELECTORS.register("object")
class ObjectSelector(Selector): class ObjectSelector(Selector):
"""Selector for an arbitrary object.""" """Selector for an arbitrary object."""
selector_type = "object"
CONFIG_SCHEMA = vol.Schema({}) CONFIG_SCHEMA = vol.Schema({})
def __call__(self, data: Any) -> Any:
"""Validate the passed selection."""
return data
@SELECTORS.register("text") @SELECTORS.register("text")
class StringSelector(Selector): class StringSelector(Selector):
"""Selector for a multi-line text string.""" """Selector for a multi-line text string."""
selector_type = "text"
CONFIG_SCHEMA = vol.Schema({vol.Optional("multiline", default=False): bool}) CONFIG_SCHEMA = vol.Schema({vol.Optional("multiline", default=False): bool})
def __call__(self, data: Any) -> str:
"""Validate the passed selection."""
text = cv.string(data)
return text
@SELECTORS.register("select") @SELECTORS.register("select")
class SelectSelector(Selector): class SelectSelector(Selector):
"""Selector for an single-choice input select.""" """Selector for an single-choice input select."""
selector_type = "select"
CONFIG_SCHEMA = vol.Schema( CONFIG_SCHEMA = vol.Schema(
{vol.Required("options"): vol.All([str], vol.Length(min=1))} {vol.Required("options"): vol.All([str], vol.Length(min=1))}
) )
def __call__(self, data: Any) -> Any:
"""Validate the passed selection."""
selected_option = vol.In(self.config["options"])(cv.string(data))
return selected_option

View file

@ -2,7 +2,10 @@
import pytest import pytest
import voluptuous as vol import voluptuous as vol
from homeassistant.helpers import selector from homeassistant.helpers import config_validation as cv, selector
from homeassistant.util import dt as dt_util
FAKE_UUID = "a266a680b608c32770e6c45bfe6b8411"
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -20,6 +23,8 @@ def test_valid_base_schema(schema):
@pytest.mark.parametrize( @pytest.mark.parametrize(
"schema", "schema",
( (
None,
"not_a_dict",
{}, {},
{"non_existing": {}}, {"non_existing": {}},
# Two keys # Two keys
@ -38,173 +43,268 @@ def test_validate_selector():
assert schema == selector.validate_selector(schema) assert schema == selector.validate_selector(schema)
def _test_selector(
selector_type, schema, valid_selections, invalid_selections, converter=None
):
"""Help test a selector."""
def default_converter(x):
return x
if converter is None:
converter = default_converter
# Validate selector configuration
selector.validate_selector({selector_type: schema})
# Use selector in schema and validate
vol_schema = vol.Schema({"selection": selector.selector({selector_type: schema})})
for selection in valid_selections:
assert vol_schema({"selection": selection}) == {
"selection": converter(selection)
}
for selection in invalid_selections:
with pytest.raises(vol.Invalid):
vol_schema({"selection": selection})
# Serialize selector
selector_instance = selector.selector({selector_type: schema})
assert cv.custom_serializer(selector_instance) == {
"selector": {selector_type: selector_instance.config}
}
@pytest.mark.parametrize( @pytest.mark.parametrize(
"schema", "schema,valid_selections,invalid_selections",
( (
{}, (None, ("abc123",), (None,)),
{"integration": "zha"}, ({}, ("abc123",), (None,)),
{"manufacturer": "mock-manuf"}, ({"integration": "zha"}, ("abc123",), (None,)),
{"model": "mock-model"}, ({"manufacturer": "mock-manuf"}, ("abc123",), (None,)),
{"manufacturer": "mock-manuf", "model": "mock-model"}, ({"model": "mock-model"}, ("abc123",), (None,)),
{"integration": "zha", "manufacturer": "mock-manuf", "model": "mock-model"}, ({"manufacturer": "mock-manuf", "model": "mock-model"}, ("abc123",), (None,)),
{"entity": {"device_class": "motion"}}, (
{ {"integration": "zha", "manufacturer": "mock-manuf", "model": "mock-model"},
"integration": "zha", ("abc123",),
"manufacturer": "mock-manuf", (None,),
"model": "mock-model", ),
"entity": {"domain": "binary_sensor", "device_class": "motion"}, ({"entity": {"device_class": "motion"}}, ("abc123",), (None,)),
}, (
{
"integration": "zha",
"manufacturer": "mock-manuf",
"model": "mock-model",
"entity": {"domain": "binary_sensor", "device_class": "motion"},
},
("abc123",),
(None,),
),
), ),
) )
def test_device_selector_schema(schema): def test_device_selector_schema(schema, valid_selections, invalid_selections):
"""Test device selector.""" """Test device selector."""
selector.validate_selector({"device": schema}) _test_selector("device", schema, valid_selections, invalid_selections)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"schema", "schema,valid_selections,invalid_selections",
( (
{}, ({}, ("sensor.abc123", FAKE_UUID), (None, "abc123")),
{"integration": "zha"}, ({"integration": "zha"}, ("sensor.abc123", FAKE_UUID), (None, "abc123")),
{"domain": "light"}, ({"domain": "light"}, ("light.abc123", FAKE_UUID), (None, "sensor.abc123")),
{"device_class": "motion"}, ({"device_class": "motion"}, ("sensor.abc123", FAKE_UUID), (None, "abc123")),
{"integration": "zha", "domain": "light"}, (
{"integration": "zha", "domain": "binary_sensor", "device_class": "motion"}, {"integration": "zha", "domain": "light"},
("light.abc123", FAKE_UUID),
(None, "sensor.abc123"),
),
(
{"integration": "zha", "domain": "binary_sensor", "device_class": "motion"},
("binary_sensor.abc123", FAKE_UUID),
(None, "sensor.abc123"),
),
), ),
) )
def test_entity_selector_schema(schema): def test_entity_selector_schema(schema, valid_selections, invalid_selections):
"""Test entity selector.""" """Test entity selector."""
selector.validate_selector({"entity": schema}) _test_selector("entity", schema, valid_selections, invalid_selections)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"schema", "schema,valid_selections,invalid_selections",
( (
{}, ({}, ("abc123",), (None,)),
{"entity": {}}, ({"entity": {}}, ("abc123",), (None,)),
{"entity": {"domain": "light"}}, ({"entity": {"domain": "light"}}, ("abc123",), (None,)),
{"entity": {"domain": "binary_sensor", "device_class": "motion"}}, (
{ {"entity": {"domain": "binary_sensor", "device_class": "motion"}},
"entity": { ("abc123",),
"domain": "binary_sensor", (None,),
"device_class": "motion", ),
"integration": "demo", (
} {
}, "entity": {
{"device": {"integration": "demo", "model": "mock-model"}}, "domain": "binary_sensor",
{ "device_class": "motion",
"entity": {"domain": "binary_sensor", "device_class": "motion"}, "integration": "demo",
"device": {"integration": "demo", "model": "mock-model"}, }
}, },
("abc123",),
(None,),
),
(
{"device": {"integration": "demo", "model": "mock-model"}},
("abc123",),
(None,),
),
(
{
"entity": {"domain": "binary_sensor", "device_class": "motion"},
"device": {"integration": "demo", "model": "mock-model"},
},
("abc123",),
(None,),
),
), ),
) )
def test_area_selector_schema(schema): def test_area_selector_schema(schema, valid_selections, invalid_selections):
"""Test area selector.""" """Test area selector."""
selector.validate_selector({"area": schema}) _test_selector("area", schema, valid_selections, invalid_selections)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"schema", "schema,valid_selections,invalid_selections",
( (
{"min": 10, "max": 50}, (
{"min": -100, "max": 100, "step": 5}, {"min": 10, "max": 50},
{"min": -20, "max": -10, "mode": "box"}, (
{"min": 0, "max": 100, "unit_of_measurement": "seconds", "mode": "slider"}, 10,
{"min": 10, "max": 1000, "mode": "slider", "step": 0.5}, 50,
),
(9, 51),
),
({"min": -100, "max": 100, "step": 5}, (), ()),
({"min": -20, "max": -10, "mode": "box"}, (), ()),
(
{"min": 0, "max": 100, "unit_of_measurement": "seconds", "mode": "slider"},
(),
(),
),
({"min": 10, "max": 1000, "mode": "slider", "step": 0.5}, (), ()),
), ),
) )
def test_number_selector_schema(schema): def test_number_selector_schema(schema, valid_selections, invalid_selections):
"""Test number selector.""" """Test number selector."""
selector.validate_selector({"number": schema}) _test_selector("number", schema, valid_selections, invalid_selections)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"schema", "schema,valid_selections,invalid_selections",
({},), (({}, ("abc123",), (None,)),),
) )
def test_addon_selector_schema(schema): def test_addon_selector_schema(schema, valid_selections, invalid_selections):
"""Test add-on selector.""" """Test add-on selector."""
selector.validate_selector({"addon": schema}) _test_selector("addon", schema, valid_selections, invalid_selections)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"schema", "schema,valid_selections,invalid_selections",
({},), (({}, (1, "one", None), ()),), # Everything can be coarced to bool
) )
def test_boolean_selector_schema(schema): def test_boolean_selector_schema(schema, valid_selections, invalid_selections):
"""Test boolean selector.""" """Test boolean selector."""
selector.validate_selector({"boolean": schema}) _test_selector("boolean", schema, valid_selections, invalid_selections, bool)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"schema", "schema,valid_selections,invalid_selections",
({},), (({}, ("00:00:00",), ("blah", None)),),
) )
def test_time_selector_schema(schema): def test_time_selector_schema(schema, valid_selections, invalid_selections):
"""Test time selector.""" """Test time selector."""
selector.validate_selector({"time": schema}) _test_selector(
"time", schema, valid_selections, invalid_selections, dt_util.parse_time
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"schema", "schema,valid_selections,invalid_selections",
( (
{}, ({}, ({"entity_id": ["sensor.abc123"]},), ("abc123", None)),
{"entity": {}}, ({"entity": {}}, (), ()),
{"entity": {"domain": "light"}}, ({"entity": {"domain": "light"}}, (), ()),
{"entity": {"domain": "binary_sensor", "device_class": "motion"}}, ({"entity": {"domain": "binary_sensor", "device_class": "motion"}}, (), ()),
{ (
"entity": { {
"domain": "binary_sensor", "entity": {
"device_class": "motion", "domain": "binary_sensor",
"integration": "demo", "device_class": "motion",
} "integration": "demo",
}, }
{"device": {"integration": "demo", "model": "mock-model"}}, },
{ (),
"entity": {"domain": "binary_sensor", "device_class": "motion"}, (),
"device": {"integration": "demo", "model": "mock-model"}, ),
}, ({"device": {"integration": "demo", "model": "mock-model"}}, (), ()),
(
{
"entity": {"domain": "binary_sensor", "device_class": "motion"},
"device": {"integration": "demo", "model": "mock-model"},
},
(),
(),
),
), ),
) )
def test_target_selector_schema(schema): def test_target_selector_schema(schema, valid_selections, invalid_selections):
"""Test target selector.""" """Test target selector."""
selector.validate_selector({"target": schema}) _test_selector("target", schema, valid_selections, invalid_selections)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"schema", "schema,valid_selections,invalid_selections",
({},), (({}, ("abc123",), ()),),
) )
def test_action_selector_schema(schema): def test_action_selector_schema(schema, valid_selections, invalid_selections):
"""Test action sequence selector.""" """Test action sequence selector."""
selector.validate_selector({"action": schema}) _test_selector("action", schema, valid_selections, invalid_selections)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"schema", "schema,valid_selections,invalid_selections",
({},), (({}, ("abc123",), ()),),
) )
def test_object_selector_schema(schema): def test_object_selector_schema(schema, valid_selections, invalid_selections):
"""Test object selector.""" """Test object selector."""
selector.validate_selector({"object": schema}) _test_selector("object", schema, valid_selections, invalid_selections)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"schema", "schema,valid_selections,invalid_selections",
({}, {"multiline": True}, {"multiline": False}), (
({}, ("abc123",), (None,)),
({"multiline": True}, (), ()),
({"multiline": False}, (), ()),
),
) )
def test_text_selector_schema(schema): def test_text_selector_schema(schema, valid_selections, invalid_selections):
"""Test text selector.""" """Test text selector."""
selector.validate_selector({"text": schema}) _test_selector("text", schema, valid_selections, invalid_selections)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"schema", "schema,valid_selections,invalid_selections",
({"options": ["red", "green", "blue"]},), (
(
{"options": ["red", "green", "blue"]},
("red", "green", "blue"),
("cat", 0, None),
),
),
) )
def test_select_selector_schema(schema): def test_select_selector_schema(schema, valid_selections, invalid_selections):
"""Test select selector.""" """Test select selector."""
selector.validate_selector({"select": schema}) _test_selector("select", schema, valid_selections, invalid_selections)
@pytest.mark.parametrize( @pytest.mark.parametrize(