Intent target matching and media player enhancements (#115445)

* Working

* Tests are passing

* Fix climate

* Requested changes from review
This commit is contained in:
Michael Hansen 2024-05-07 21:01:03 -05:00 committed by GitHub
parent 8401b05d40
commit 7923471b94
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 1734 additions and 589 deletions

View file

@ -56,6 +56,7 @@ class GetTemperatureIntent(intent.IntentHandler):
if climate_state is None: if climate_state is None:
raise intent.NoStatesMatchedError( raise intent.NoStatesMatchedError(
reason=intent.MatchFailedReason.AREA,
name=entity_text or entity_name, name=entity_text or entity_name,
area=area_name or area_id, area=area_name or area_id,
floor=None, floor=None,
@ -74,6 +75,7 @@ class GetTemperatureIntent(intent.IntentHandler):
if climate_state is None: if climate_state is None:
raise intent.NoStatesMatchedError( raise intent.NoStatesMatchedError(
reason=intent.MatchFailedReason.NAME,
name=entity_name, name=entity_name,
area=None, area=None,
floor=None, floor=None,

View file

@ -351,10 +351,10 @@ class DefaultAgent(ConversationEntity):
language, language,
assistant=DOMAIN, assistant=DOMAIN,
) )
except intent.NoStatesMatchedError as no_states_error: except intent.MatchFailedError as match_error:
# Intent was valid, but no entities matched the constraints. # Intent was valid, but no entities matched the constraints.
error_response_type, error_response_args = _get_no_states_matched_response( error_response_type, error_response_args = _get_match_error_response(
no_states_error match_error
) )
return _make_error_result( return _make_error_result(
language, language,
@ -364,20 +364,6 @@ class DefaultAgent(ConversationEntity):
), ),
conversation_id, conversation_id,
) )
except intent.DuplicateNamesMatchedError as duplicate_names_error:
# Intent was valid, but two or more entities with the same name matched.
(
error_response_type,
error_response_args,
) = _get_duplicate_names_matched_response(duplicate_names_error)
return _make_error_result(
language,
intent.IntentResponseErrorCode.NO_VALID_TARGETS,
self._get_error_text(
error_response_type, lang_intents, **error_response_args
),
conversation_id,
)
except intent.IntentHandleError: except intent.IntentHandleError:
# Intent was valid and entities matched constraints, but an error # Intent was valid and entities matched constraints, but an error
# occurred during handling. # occurred during handling.
@ -804,34 +790,34 @@ class DefaultAgent(ConversationEntity):
_LOGGER.debug("Exposed entities: %s", entity_names) _LOGGER.debug("Exposed entities: %s", entity_names)
# Expose all areas. # Expose all areas.
#
# We pass in area id here with the expectation that no two areas will
# share the same name or alias.
areas = ar.async_get(self.hass) areas = ar.async_get(self.hass)
area_names = [] area_names = []
for area in areas.async_list_areas(): for area in areas.async_list_areas():
area_names.append((area.name, area.id)) area_names.append((area.name, area.name))
if area.aliases: if not area.aliases:
for alias in area.aliases: continue
if not alias.strip():
continue
area_names.append((alias, area.id)) for alias in area.aliases:
alias = alias.strip()
if not alias:
continue
area_names.append((alias, alias))
# Expose all floors. # Expose all floors.
#
# We pass in floor id here with the expectation that no two floors will
# share the same name or alias.
floors = fr.async_get(self.hass) floors = fr.async_get(self.hass)
floor_names = [] floor_names = []
for floor in floors.async_list_floors(): for floor in floors.async_list_floors():
floor_names.append((floor.name, floor.floor_id)) floor_names.append((floor.name, floor.name))
if floor.aliases: if not floor.aliases:
for alias in floor.aliases: continue
if not alias.strip():
continue
floor_names.append((alias, floor.floor_id)) for alias in floor.aliases:
alias = alias.strip()
if not alias:
continue
floor_names.append((alias, floor.name))
self._slot_lists = { self._slot_lists = {
"area": TextSlotList.from_tuples(area_names, allow_template=False), "area": TextSlotList.from_tuples(area_names, allow_template=False),
@ -1021,61 +1007,77 @@ def _get_unmatched_response(result: RecognizeResult) -> tuple[ErrorKey, dict[str
return ErrorKey.NO_INTENT, {} return ErrorKey.NO_INTENT, {}
def _get_no_states_matched_response( def _get_match_error_response(
no_states_error: intent.NoStatesMatchedError, match_error: intent.MatchFailedError,
) -> tuple[ErrorKey, dict[str, Any]]: ) -> tuple[ErrorKey, dict[str, Any]]:
"""Return key and template arguments for error when intent returns no matching states.""" """Return key and template arguments for error when target matching fails."""
# Device classes should be checked before domains constraints, result = match_error.constraints, match_error.result
if no_states_error.device_classes: reason = result.no_match_reason
device_class = next(iter(no_states_error.device_classes)) # first device class
if no_states_error.area: if (
reason
in (intent.MatchFailedReason.DEVICE_CLASS, intent.MatchFailedReason.DOMAIN)
) and constraints.device_classes:
device_class = next(iter(constraints.device_classes)) # first device class
if constraints.area_name:
# device_class in area # device_class in area
return ErrorKey.NO_DEVICE_CLASS_IN_AREA, { return ErrorKey.NO_DEVICE_CLASS_IN_AREA, {
"device_class": device_class, "device_class": device_class,
"area": no_states_error.area, "area": constraints.area_name,
} }
# device_class only # device_class only
return ErrorKey.NO_DEVICE_CLASS, {"device_class": device_class} return ErrorKey.NO_DEVICE_CLASS, {"device_class": device_class}
if no_states_error.domains: if (reason == intent.MatchFailedReason.DOMAIN) and constraints.domains:
domain = next(iter(no_states_error.domains)) # first domain domain = next(iter(constraints.domains)) # first domain
if no_states_error.area: if constraints.area_name:
# domain in area # domain in area
return ErrorKey.NO_DOMAIN_IN_AREA, { return ErrorKey.NO_DOMAIN_IN_AREA, {
"domain": domain, "domain": domain,
"area": no_states_error.area, "area": constraints.area_name,
} }
if no_states_error.floor: if constraints.floor_name:
# domain in floor # domain in floor
return ErrorKey.NO_DOMAIN_IN_FLOOR, { return ErrorKey.NO_DOMAIN_IN_FLOOR, {
"domain": domain, "domain": domain,
"floor": no_states_error.floor, "floor": constraints.floor_name,
} }
# domain only # domain only
return ErrorKey.NO_DOMAIN, {"domain": domain} return ErrorKey.NO_DOMAIN, {"domain": domain}
if reason == intent.MatchFailedReason.DUPLICATE_NAME:
if constraints.floor_name:
# duplicate on floor
return ErrorKey.DUPLICATE_ENTITIES_IN_FLOOR, {
"entity": result.no_match_name,
"floor": constraints.floor_name,
}
if constraints.area_name:
# duplicate on area
return ErrorKey.DUPLICATE_ENTITIES_IN_AREA, {
"entity": result.no_match_name,
"area": constraints.area_name,
}
return ErrorKey.DUPLICATE_ENTITIES, {"entity": result.no_match_name}
if reason == intent.MatchFailedReason.INVALID_AREA:
# Invalid area name
return ErrorKey.NO_AREA, {"area": result.no_match_name}
if reason == intent.MatchFailedReason.INVALID_FLOOR:
# Invalid floor name
return ErrorKey.NO_FLOOR, {"floor": result.no_match_name}
# Default error # Default error
return ErrorKey.NO_INTENT, {} return ErrorKey.NO_INTENT, {}
def _get_duplicate_names_matched_response(
duplicate_names_error: intent.DuplicateNamesMatchedError,
) -> tuple[ErrorKey, dict[str, Any]]:
"""Return key and template arguments for error when intent returns duplicate matches."""
if duplicate_names_error.area:
return ErrorKey.DUPLICATE_ENTITIES_IN_AREA, {
"entity": duplicate_names_error.name,
"area": duplicate_names_error.area,
}
return ErrorKey.DUPLICATE_ENTITIES, {"entity": duplicate_names_error.name}
def _collect_list_references(expression: Expression, list_names: set[str]) -> None: def _collect_list_references(expression: Expression, list_names: set[str]) -> None:
"""Collect list reference names recursively.""" """Collect list reference names recursively."""
if isinstance(expression, Sequence): if isinstance(expression, Sequence):

View file

@ -35,12 +35,7 @@ from homeassistant.const import (
SERVICE_TURN_ON, SERVICE_TURN_ON,
) )
from homeassistant.core import DOMAIN as HA_DOMAIN, HomeAssistant, State from homeassistant.core import DOMAIN as HA_DOMAIN, HomeAssistant, State
from homeassistant.helpers import ( from homeassistant.helpers import config_validation as cv, integration_platform, intent
area_registry as ar,
config_validation as cv,
integration_platform,
intent,
)
from homeassistant.helpers.typing import ConfigType from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN from .const import DOMAIN
@ -176,7 +171,7 @@ class GetStateIntentHandler(intent.IntentHandler):
intent_type = intent.INTENT_GET_STATE intent_type = intent.INTENT_GET_STATE
slot_schema = { slot_schema = {
vol.Any("name", "area"): cv.string, vol.Any("name", "area", "floor"): cv.string,
vol.Optional("domain"): vol.All(cv.ensure_list, [cv.string]), vol.Optional("domain"): vol.All(cv.ensure_list, [cv.string]),
vol.Optional("device_class"): vol.All(cv.ensure_list, [cv.string]), vol.Optional("device_class"): vol.All(cv.ensure_list, [cv.string]),
vol.Optional("state"): vol.All(cv.ensure_list, [cv.string]), vol.Optional("state"): vol.All(cv.ensure_list, [cv.string]),
@ -190,18 +185,13 @@ class GetStateIntentHandler(intent.IntentHandler):
# Entity name to match # Entity name to match
name_slot = slots.get("name", {}) name_slot = slots.get("name", {})
entity_name: str | None = name_slot.get("value") entity_name: str | None = name_slot.get("value")
entity_text: str | None = name_slot.get("text")
# Look up area first to fail early # Get area/floor info
area_slot = slots.get("area", {}) area_slot = slots.get("area", {})
area_id = area_slot.get("value") area_id = area_slot.get("value")
area_name = area_slot.get("text")
area: ar.AreaEntry | None = None floor_slot = slots.get("floor", {})
if area_id is not None: floor_id = floor_slot.get("value")
areas = ar.async_get(hass)
area = areas.async_get_area(area_id)
if area is None:
raise intent.IntentHandleError(f"No area named {area_name}")
# Optional domain/device class filters. # Optional domain/device class filters.
# Convert to sets for speed. # Convert to sets for speed.
@ -218,32 +208,24 @@ class GetStateIntentHandler(intent.IntentHandler):
if "state" in slots: if "state" in slots:
state_names = set(slots["state"]["value"]) state_names = set(slots["state"]["value"])
states = list( match_constraints = intent.MatchTargetsConstraints(
intent.async_match_states( name=entity_name,
hass, area_name=area_id,
name=entity_name, floor_name=floor_id,
area=area, domains=domains,
domains=domains, device_classes=device_classes,
device_classes=device_classes, assistant=intent_obj.assistant,
assistant=intent_obj.assistant,
)
) )
match_result = intent.async_match_targets(hass, match_constraints)
_LOGGER.debug( if (
"Found %s state(s) that matched: name=%s, area=%s, domains=%s, device_classes=%s, assistant=%s", (not match_result.is_match)
len(states), and (match_result.no_match_reason is not None)
entity_name, and (not match_result.no_match_reason.is_no_entities_reason())
area, ):
domains, # Don't try to answer questions for certain errors.
device_classes, # Other match failure reasons are OK.
intent_obj.assistant, raise intent.MatchFailedError(
) result=match_result, constraints=match_constraints
if entity_name and (len(states) > 1):
# Multiple entities matched for the same name
raise intent.DuplicateNamesMatchedError(
name=entity_text or entity_name,
area=area_name or area_id,
) )
# Create response # Create response
@ -251,13 +233,24 @@ class GetStateIntentHandler(intent.IntentHandler):
response.response_type = intent.IntentResponseType.QUERY_ANSWER response.response_type = intent.IntentResponseType.QUERY_ANSWER
success_results: list[intent.IntentResponseTarget] = [] success_results: list[intent.IntentResponseTarget] = []
if area is not None: if match_result.areas:
success_results.append( success_results.extend(
intent.IntentResponseTarget( intent.IntentResponseTarget(
type=intent.IntentResponseTargetType.AREA, type=intent.IntentResponseTargetType.AREA,
name=area.name, name=area.name,
id=area.id, id=area.id,
) )
for area in match_result.areas
)
if match_result.floors:
success_results.extend(
intent.IntentResponseTarget(
type=intent.IntentResponseTargetType.FLOOR,
name=floor.name,
id=floor.floor_id,
)
for floor in match_result.floors
) )
# If we are matching a state name (e.g., "which lights are on?"), then # If we are matching a state name (e.g., "which lights are on?"), then
@ -271,7 +264,7 @@ class GetStateIntentHandler(intent.IntentHandler):
matched_states: list[State] = [] matched_states: list[State] = []
unmatched_states: list[State] = [] unmatched_states: list[State] = []
for state in states: for state in match_result.states:
success_results.append( success_results.append(
intent.IntentResponseTarget( intent.IntentResponseTarget(
type=intent.IntentResponseTargetType.ENTITY, type=intent.IntentResponseTargetType.ENTITY,
@ -309,7 +302,7 @@ class SetPositionIntentHandler(intent.DynamicServiceIntentHandler):
"""Create set position handler.""" """Create set position handler."""
super().__init__( super().__init__(
intent.INTENT_SET_POSITION, intent.INTENT_SET_POSITION,
extra_slots={ATTR_POSITION: vol.All(vol.Range(min=0, max=100))}, required_slots={ATTR_POSITION: vol.All(vol.Range(min=0, max=100))},
) )
def get_domain_and_service( def get_domain_and_service(

View file

@ -2,25 +2,16 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import logging import logging
from typing import Any
import voluptuous as vol import voluptuous as vol
from homeassistant.const import ATTR_ENTITY_ID, SERVICE_TURN_ON from homeassistant.const import SERVICE_TURN_ON
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers import area_registry as ar, config_validation as cv, intent from homeassistant.helpers import intent
import homeassistant.util.color as color_util import homeassistant.util.color as color_util
from . import ( from . import ATTR_BRIGHTNESS_PCT, ATTR_RGB_COLOR, DOMAIN
ATTR_BRIGHTNESS_PCT,
ATTR_RGB_COLOR,
ATTR_SUPPORTED_COLOR_MODES,
DOMAIN,
brightness_supported,
color_supported,
)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -29,120 +20,17 @@ INTENT_SET = "HassLightSet"
async def async_setup_intents(hass: HomeAssistant) -> None: async def async_setup_intents(hass: HomeAssistant) -> None:
"""Set up the light intents.""" """Set up the light intents."""
intent.async_register(hass, SetIntentHandler()) intent.async_register(
hass,
intent.ServiceIntentHandler(
class SetIntentHandler(intent.IntentHandler): INTENT_SET,
"""Handle set color intents.""" DOMAIN,
SERVICE_TURN_ON,
intent_type = INTENT_SET optional_slots={
slot_schema = { ("color", ATTR_RGB_COLOR): color_util.color_name_to_rgb,
vol.Any("name", "area"): cv.string, ("brightness", ATTR_BRIGHTNESS_PCT): vol.All(
vol.Optional("domain"): vol.All(cv.ensure_list, [cv.string]), vol.Coerce(int), vol.Range(0, 100)
vol.Optional("device_class"): vol.All(cv.ensure_list, [cv.string]), ),
vol.Optional("color"): color_util.color_name_to_rgb, },
vol.Optional("brightness"): vol.All(vol.Coerce(int), vol.Range(0, 100)), ),
} )
async def async_handle(self, intent_obj: intent.Intent) -> intent.IntentResponse:
"""Handle the hass intent."""
hass = intent_obj.hass
service_data: dict[str, Any] = {}
slots = self.async_validate_slots(intent_obj.slots)
name: str | None = slots.get("name", {}).get("value")
if name == "all":
# Don't match on name if targeting all entities
name = None
# Look up area first to fail early
area_name = slots.get("area", {}).get("value")
area: ar.AreaEntry | None = None
if area_name is not None:
areas = ar.async_get(hass)
area = areas.async_get_area(area_name) or areas.async_get_area_by_name(
area_name
)
if area is None:
raise intent.IntentHandleError(f"No area named {area_name}")
# Optional domain/device class filters.
# Convert to sets for speed.
domains: set[str] | None = None
device_classes: set[str] | None = None
if "domain" in slots:
domains = set(slots["domain"]["value"])
if "device_class" in slots:
device_classes = set(slots["device_class"]["value"])
states = list(
intent.async_match_states(
hass,
name=name,
area=area,
domains=domains,
device_classes=device_classes,
)
)
if not states:
raise intent.IntentHandleError("No entities matched")
if "color" in slots:
service_data[ATTR_RGB_COLOR] = slots["color"]["value"]
if "brightness" in slots:
service_data[ATTR_BRIGHTNESS_PCT] = slots["brightness"]["value"]
response = intent_obj.create_response()
needs_brightness = ATTR_BRIGHTNESS_PCT in service_data
needs_color = ATTR_RGB_COLOR in service_data
success_results: list[intent.IntentResponseTarget] = []
failed_results: list[intent.IntentResponseTarget] = []
service_coros = []
if area is not None:
success_results.append(
intent.IntentResponseTarget(
type=intent.IntentResponseTargetType.AREA,
name=area.name,
id=area.id,
)
)
for state in states:
target = intent.IntentResponseTarget(
type=intent.IntentResponseTargetType.ENTITY,
name=state.name,
id=state.entity_id,
)
# Test brightness/color
supported_color_modes = state.attributes.get(ATTR_SUPPORTED_COLOR_MODES)
if (needs_color and not color_supported(supported_color_modes)) or (
needs_brightness and not brightness_supported(supported_color_modes)
):
failed_results.append(target)
continue
service_coros.append(
hass.services.async_call(
DOMAIN,
SERVICE_TURN_ON,
{**service_data, ATTR_ENTITY_ID: state.entity_id},
context=intent_obj.context,
)
)
success_results.append(target)
# Handle service calls in parallel.
await asyncio.gather(*service_coros)
response.async_set_results(
success_results=success_results, failed_results=failed_results
)
return response

View file

@ -12,27 +12,29 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers import intent from homeassistant.helpers import intent
from . import ATTR_MEDIA_VOLUME_LEVEL, DOMAIN from . import ATTR_MEDIA_VOLUME_LEVEL, DOMAIN
from .const import MediaPlayerEntityFeature, MediaPlayerState
INTENT_MEDIA_PAUSE = "HassMediaPause" INTENT_MEDIA_PAUSE = "HassMediaPause"
INTENT_MEDIA_UNPAUSE = "HassMediaUnpause" INTENT_MEDIA_UNPAUSE = "HassMediaUnpause"
INTENT_MEDIA_NEXT = "HassMediaNext" INTENT_MEDIA_NEXT = "HassMediaNext"
INTENT_SET_VOLUME = "HassSetVolume" INTENT_SET_VOLUME = "HassSetVolume"
DATA_LAST_PAUSED = f"{DOMAIN}.last_paused"
async def async_setup_intents(hass: HomeAssistant) -> None: async def async_setup_intents(hass: HomeAssistant) -> None:
"""Set up the media_player intents.""" """Set up the media_player intents."""
intent.async_register( intent.async_register(hass, MediaUnpauseHandler())
hass, intent.async_register(hass, MediaPauseHandler())
intent.ServiceIntentHandler(INTENT_MEDIA_UNPAUSE, DOMAIN, SERVICE_MEDIA_PLAY),
)
intent.async_register(
hass,
intent.ServiceIntentHandler(INTENT_MEDIA_PAUSE, DOMAIN, SERVICE_MEDIA_PAUSE),
)
intent.async_register( intent.async_register(
hass, hass,
intent.ServiceIntentHandler( intent.ServiceIntentHandler(
INTENT_MEDIA_NEXT, DOMAIN, SERVICE_MEDIA_NEXT_TRACK INTENT_MEDIA_NEXT,
DOMAIN,
SERVICE_MEDIA_NEXT_TRACK,
required_domains={DOMAIN},
required_features=MediaPlayerEntityFeature.NEXT_TRACK,
required_states={MediaPlayerState.PLAYING},
), ),
) )
intent.async_register( intent.async_register(
@ -41,10 +43,88 @@ async def async_setup_intents(hass: HomeAssistant) -> None:
INTENT_SET_VOLUME, INTENT_SET_VOLUME,
DOMAIN, DOMAIN,
SERVICE_VOLUME_SET, SERVICE_VOLUME_SET,
extra_slots={ required_domains={DOMAIN},
required_states={MediaPlayerState.PLAYING},
required_features=MediaPlayerEntityFeature.VOLUME_SET,
required_slots={
ATTR_MEDIA_VOLUME_LEVEL: vol.All( ATTR_MEDIA_VOLUME_LEVEL: vol.All(
vol.Range(min=0, max=100), lambda val: val / 100 vol.Range(min=0, max=100), lambda val: val / 100
) )
}, },
), ),
) )
class MediaPauseHandler(intent.ServiceIntentHandler):
"""Handler for pause intent. Records last paused media players."""
def __init__(self) -> None:
"""Initialize handler."""
super().__init__(
INTENT_MEDIA_PAUSE,
DOMAIN,
SERVICE_MEDIA_PAUSE,
required_domains={DOMAIN},
required_features=MediaPlayerEntityFeature.PAUSE,
required_states={MediaPlayerState.PLAYING},
)
async def async_handle_states(
self,
intent_obj: intent.Intent,
match_result: intent.MatchTargetsResult,
match_constraints: intent.MatchTargetsConstraints,
match_preferences: intent.MatchTargetsPreferences | None = None,
) -> intent.IntentResponse:
"""Record last paused media players."""
hass = intent_obj.hass
if match_result.is_match:
# Save entity ids of paused media players
hass.data[DATA_LAST_PAUSED] = {s.entity_id for s in match_result.states}
return await super().async_handle_states(
intent_obj, match_result, match_constraints
)
class MediaUnpauseHandler(intent.ServiceIntentHandler):
"""Handler for unpause/resume intent. Uses last paused media players."""
def __init__(self) -> None:
"""Initialize handler."""
super().__init__(
INTENT_MEDIA_UNPAUSE,
DOMAIN,
SERVICE_MEDIA_PLAY,
required_domains={DOMAIN},
required_states={MediaPlayerState.PAUSED},
)
async def async_handle_states(
self,
intent_obj: intent.Intent,
match_result: intent.MatchTargetsResult,
match_constraints: intent.MatchTargetsConstraints,
match_preferences: intent.MatchTargetsPreferences | None = None,
) -> intent.IntentResponse:
"""Unpause last paused media players."""
hass = intent_obj.hass
if (
match_result.is_match
and (not match_constraints.name)
and (last_paused := hass.data.get(DATA_LAST_PAUSED))
):
# Resume only the previously paused media players if they are in the
# targeted set.
targeted_ids = {s.entity_id for s in match_result.states}
overlapping_ids = targeted_ids.intersection(last_paused)
if overlapping_ids:
match_result.states = [
s for s in match_result.states if s.entity_id in overlapping_ids
]
return await super().async_handle_states(
intent_obj, match_result, match_constraints
)

File diff suppressed because it is too large Load diff

View file

@ -183,7 +183,7 @@ async def test_get_temperature(
assert state.attributes["current_temperature"] == 22.0 assert state.attributes["current_temperature"] == 22.0
# Check area with no climate entities # Check area with no climate entities
with pytest.raises(intent.NoStatesMatchedError) as error: with pytest.raises(intent.MatchFailedError) as error:
response = await intent.async_handle( response = await intent.async_handle(
hass, hass,
"test", "test",
@ -192,14 +192,16 @@ async def test_get_temperature(
) )
# Exception should contain details of what we tried to match # Exception should contain details of what we tried to match
assert isinstance(error.value, intent.NoStatesMatchedError) assert isinstance(error.value, intent.MatchFailedError)
assert error.value.name is None assert error.value.result.no_match_reason == intent.MatchFailedReason.AREA
assert error.value.area == office_area.name constraints = error.value.constraints
assert error.value.domains == {DOMAIN} assert constraints.name is None
assert error.value.device_classes is None assert constraints.area_name == office_area.name
assert constraints.domains == {DOMAIN}
assert constraints.device_classes is None
# Check wrong name # Check wrong name
with pytest.raises(intent.NoStatesMatchedError) as error: with pytest.raises(intent.MatchFailedError) as error:
response = await intent.async_handle( response = await intent.async_handle(
hass, hass,
"test", "test",
@ -207,14 +209,16 @@ async def test_get_temperature(
{"name": {"value": "Does not exist"}}, {"name": {"value": "Does not exist"}},
) )
assert isinstance(error.value, intent.NoStatesMatchedError) assert isinstance(error.value, intent.MatchFailedError)
assert error.value.name == "Does not exist" assert error.value.result.no_match_reason == intent.MatchFailedReason.NAME
assert error.value.area is None constraints = error.value.constraints
assert error.value.domains == {DOMAIN} assert constraints.name == "Does not exist"
assert error.value.device_classes is None assert constraints.area_name is None
assert constraints.domains == {DOMAIN}
assert constraints.device_classes is None
# Check wrong name with area # Check wrong name with area
with pytest.raises(intent.NoStatesMatchedError) as error: with pytest.raises(intent.MatchFailedError) as error:
response = await intent.async_handle( response = await intent.async_handle(
hass, hass,
"test", "test",
@ -222,11 +226,13 @@ async def test_get_temperature(
{"name": {"value": "Climate 1"}, "area": {"value": bedroom_area.name}}, {"name": {"value": "Climate 1"}, "area": {"value": bedroom_area.name}},
) )
assert isinstance(error.value, intent.NoStatesMatchedError) assert isinstance(error.value, intent.MatchFailedError)
assert error.value.name == "Climate 1" assert error.value.result.no_match_reason == intent.MatchFailedReason.AREA
assert error.value.area == bedroom_area.name constraints = error.value.constraints
assert error.value.domains == {DOMAIN} assert constraints.name == "Climate 1"
assert error.value.device_classes is None assert constraints.area_name == bedroom_area.name
assert constraints.domains == {DOMAIN}
assert constraints.device_classes is None
async def test_get_temperature_no_entities( async def test_get_temperature_no_entities(
@ -275,7 +281,7 @@ async def test_get_temperature_no_state(
with ( with (
patch("homeassistant.core.StateMachine.async_all", return_value=[]), patch("homeassistant.core.StateMachine.async_all", return_value=[]),
pytest.raises(intent.NoStatesMatchedError) as error, pytest.raises(intent.MatchFailedError) as error,
): ):
await intent.async_handle( await intent.async_handle(
hass, hass,
@ -285,8 +291,10 @@ async def test_get_temperature_no_state(
) )
# Exception should contain details of what we tried to match # Exception should contain details of what we tried to match
assert isinstance(error.value, intent.NoStatesMatchedError) assert isinstance(error.value, intent.MatchFailedError)
assert error.value.name is None assert error.value.result.no_match_reason == intent.MatchFailedReason.AREA
assert error.value.area == "Living Room" constraints = error.value.constraints
assert error.value.domains == {DOMAIN} assert constraints.name is None
assert error.value.device_classes is None assert constraints.area_name == "Living Room"
assert constraints.domains == {DOMAIN}
assert constraints.device_classes is None

View file

@ -6,12 +6,12 @@ from unittest.mock import AsyncMock, patch
from hassil.recognize import Intent, IntentData, MatchEntity, RecognizeResult from hassil.recognize import Intent, IntentData, MatchEntity, RecognizeResult
import pytest import pytest
from homeassistant.components import conversation from homeassistant.components import conversation, cover
from homeassistant.components.conversation import default_agent from homeassistant.components.conversation import default_agent
from homeassistant.components.homeassistant.exposed_entities import ( from homeassistant.components.homeassistant.exposed_entities import (
async_get_assistant_settings, async_get_assistant_settings,
) )
from homeassistant.const import ATTR_FRIENDLY_NAME from homeassistant.const import ATTR_DEVICE_CLASS, ATTR_FRIENDLY_NAME, STATE_CLOSED
from homeassistant.core import DOMAIN as HASS_DOMAIN, Context, HomeAssistant from homeassistant.core import DOMAIN as HASS_DOMAIN, Context, HomeAssistant
from homeassistant.helpers import ( from homeassistant.helpers import (
area_registry as ar, area_registry as ar,
@ -607,14 +607,23 @@ async def test_error_no_domain_in_floor(
async def test_error_no_device_class(hass: HomeAssistant, init_components) -> None: async def test_error_no_device_class(hass: HomeAssistant, init_components) -> None:
"""Test error message when no entities of a device class exist.""" """Test error message when no entities of a device class exist."""
# Create a cover entity that is not a window.
# This ensures that the filtering below won't exit early because there are
# no entities in the cover domain.
hass.states.async_set(
"cover.garage_door",
STATE_CLOSED,
attributes={ATTR_DEVICE_CLASS: cover.CoverDeviceClass.GARAGE},
)
# We don't have a sentence for opening all windows # We don't have a sentence for opening all windows
cover_domain = MatchEntity(name="domain", value="cover", text="cover")
window_class = MatchEntity(name="device_class", value="window", text="windows") window_class = MatchEntity(name="device_class", value="window", text="windows")
recognize_result = RecognizeResult( recognize_result = RecognizeResult(
intent=Intent("HassTurnOn"), intent=Intent("HassTurnOn"),
intent_data=IntentData([]), intent_data=IntentData([]),
entities={"device_class": window_class}, entities={"domain": cover_domain, "device_class": window_class},
entities_list=[window_class], entities_list=[cover_domain, window_class],
) )
with patch( with patch(
@ -792,7 +801,9 @@ async def test_no_states_matched_default_error(
with patch( with patch(
"homeassistant.components.conversation.default_agent.intent.async_handle", "homeassistant.components.conversation.default_agent.intent.async_handle",
side_effect=intent.NoStatesMatchedError(), side_effect=intent.MatchFailedError(
intent.MatchTargetsResult(False), intent.MatchTargetsConstraints()
),
): ):
result = await conversation.async_converse( result = await conversation.async_converse(
hass, "turn on lights in the kitchen", None, Context(), None hass, "turn on lights in the kitchen", None, Context(), None
@ -863,17 +874,14 @@ async def test_empty_aliases(
assert slot_lists.keys() == {"area", "name", "floor"} assert slot_lists.keys() == {"area", "name", "floor"}
areas = slot_lists["area"] areas = slot_lists["area"]
assert len(areas.values) == 1 assert len(areas.values) == 1
assert areas.values[0].value_out == area_kitchen.id
assert areas.values[0].text_in.text == area_kitchen.normalized_name assert areas.values[0].text_in.text == area_kitchen.normalized_name
names = slot_lists["name"] names = slot_lists["name"]
assert len(names.values) == 1 assert len(names.values) == 1
assert names.values[0].value_out == kitchen_light.name
assert names.values[0].text_in.text == kitchen_light.name assert names.values[0].text_in.text == kitchen_light.name
floors = slot_lists["floor"] floors = slot_lists["floor"]
assert len(floors.values) == 1 assert len(floors.values) == 1
assert floors.values[0].value_out == floor_1.floor_id
assert floors.values[0].text_in.text == floor_1.name assert floors.values[0].text_in.text == floor_1.name

View file

@ -12,9 +12,17 @@ from homeassistant.components import (
) )
from homeassistant.components.cover import intent as cover_intent from homeassistant.components.cover import intent as cover_intent
from homeassistant.components.homeassistant.exposed_entities import async_expose_entity from homeassistant.components.homeassistant.exposed_entities import async_expose_entity
from homeassistant.components.media_player import intent as media_player_intent from homeassistant.components.media_player import (
MediaPlayerEntityFeature,
intent as media_player_intent,
)
from homeassistant.components.vacuum import intent as vaccum_intent from homeassistant.components.vacuum import intent as vaccum_intent
from homeassistant.const import STATE_CLOSED from homeassistant.const import (
ATTR_SUPPORTED_FEATURES,
STATE_CLOSED,
STATE_PAUSED,
STATE_PLAYING,
)
from homeassistant.core import Context, HomeAssistant from homeassistant.core import Context, HomeAssistant
from homeassistant.helpers import ( from homeassistant.helpers import (
area_registry as ar, area_registry as ar,
@ -189,7 +197,13 @@ async def test_media_player_intents(
await media_player_intent.async_setup_intents(hass) await media_player_intent.async_setup_intents(hass)
entity_id = f"{media_player.DOMAIN}.tv" entity_id = f"{media_player.DOMAIN}.tv"
hass.states.async_set(entity_id, media_player.STATE_PLAYING) attributes = {
ATTR_SUPPORTED_FEATURES: MediaPlayerEntityFeature.PAUSE
| MediaPlayerEntityFeature.NEXT_TRACK
| MediaPlayerEntityFeature.VOLUME_SET
}
hass.states.async_set(entity_id, STATE_PLAYING, attributes=attributes)
async_expose_entity(hass, conversation.DOMAIN, entity_id, True) async_expose_entity(hass, conversation.DOMAIN, entity_id, True)
# pause # pause
@ -206,6 +220,9 @@ async def test_media_player_intents(
call = calls[0] call = calls[0]
assert call.data == {"entity_id": entity_id} assert call.data == {"entity_id": entity_id}
# Unpause requires paused state
hass.states.async_set(entity_id, STATE_PAUSED, attributes=attributes)
# unpause # unpause
calls = async_mock_service( calls = async_mock_service(
hass, media_player.DOMAIN, media_player.SERVICE_MEDIA_PLAY hass, media_player.DOMAIN, media_player.SERVICE_MEDIA_PLAY
@ -222,6 +239,9 @@ async def test_media_player_intents(
call = calls[0] call = calls[0]
assert call.data == {"entity_id": entity_id} assert call.data == {"entity_id": entity_id}
# Next track requires playing state
hass.states.async_set(entity_id, STATE_PLAYING, attributes=attributes)
# next # next
calls = async_mock_service( calls = async_mock_service(
hass, media_player.DOMAIN, media_player.SERVICE_MEDIA_NEXT_TRACK hass, media_player.DOMAIN, media_player.SERVICE_MEDIA_NEXT_TRACK

View file

@ -422,7 +422,7 @@ async def test_get_state_intent(
assert not result.matched_states and not result.unmatched_states assert not result.matched_states and not result.unmatched_states
# Test unknown area failure # Test unknown area failure
with pytest.raises(intent.IntentHandleError): with pytest.raises(intent.MatchFailedError):
await intent.async_handle( await intent.async_handle(
hass, hass,
"test", "test",

View file

@ -34,25 +34,6 @@ async def test_intent_set_color(hass: HomeAssistant) -> None:
assert call.data.get(light.ATTR_RGB_COLOR) == (0, 0, 255) assert call.data.get(light.ATTR_RGB_COLOR) == (0, 0, 255)
async def test_intent_set_color_tests_feature(hass: HomeAssistant) -> None:
"""Test the set color intent."""
hass.states.async_set("light.hello", "off")
calls = async_mock_service(hass, light.DOMAIN, light.SERVICE_TURN_ON)
await intent.async_setup_intents(hass)
response = await async_handle(
hass,
"test",
intent.INTENT_SET,
{"name": {"value": "Hello"}, "color": {"value": "blue"}},
)
# Response should contain one failed target
assert len(response.success_results) == 0
assert len(response.failed_results) == 1
assert len(calls) == 0
async def test_intent_set_color_and_brightness(hass: HomeAssistant) -> None: async def test_intent_set_color_and_brightness(hass: HomeAssistant) -> None:
"""Test the set color intent.""" """Test the set color intent."""
hass.states.async_set( hass.states.async_set(

View file

@ -1,5 +1,7 @@
"""The tests for the media_player platform.""" """The tests for the media_player platform."""
import pytest
from homeassistant.components.media_player import ( from homeassistant.components.media_player import (
DOMAIN, DOMAIN,
SERVICE_MEDIA_NEXT_TRACK, SERVICE_MEDIA_NEXT_TRACK,
@ -8,9 +10,20 @@ from homeassistant.components.media_player import (
SERVICE_VOLUME_SET, SERVICE_VOLUME_SET,
intent as media_player_intent, intent as media_player_intent,
) )
from homeassistant.const import STATE_IDLE from homeassistant.components.media_player.const import MediaPlayerEntityFeature
from homeassistant.const import (
ATTR_SUPPORTED_FEATURES,
STATE_IDLE,
STATE_PAUSED,
STATE_PLAYING,
)
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers import intent from homeassistant.helpers import (
area_registry as ar,
entity_registry as er,
floor_registry as fr,
intent,
)
from tests.common import async_mock_service from tests.common import async_mock_service
@ -20,14 +33,19 @@ async def test_pause_media_player_intent(hass: HomeAssistant) -> None:
await media_player_intent.async_setup_intents(hass) await media_player_intent.async_setup_intents(hass)
entity_id = f"{DOMAIN}.test_media_player" entity_id = f"{DOMAIN}.test_media_player"
hass.states.async_set(entity_id, STATE_IDLE) attributes = {ATTR_SUPPORTED_FEATURES: MediaPlayerEntityFeature.PAUSE}
calls = async_mock_service(hass, DOMAIN, SERVICE_MEDIA_PAUSE)
hass.states.async_set(entity_id, STATE_PLAYING, attributes=attributes)
calls = async_mock_service(
hass,
DOMAIN,
SERVICE_MEDIA_PAUSE,
)
response = await intent.async_handle( response = await intent.async_handle(
hass, hass,
"test", "test",
media_player_intent.INTENT_MEDIA_PAUSE, media_player_intent.INTENT_MEDIA_PAUSE,
{"name": {"value": "test media player"}},
) )
await hass.async_block_till_done() await hass.async_block_till_done()
@ -38,20 +56,45 @@ async def test_pause_media_player_intent(hass: HomeAssistant) -> None:
assert call.service == SERVICE_MEDIA_PAUSE assert call.service == SERVICE_MEDIA_PAUSE
assert call.data == {"entity_id": entity_id} assert call.data == {"entity_id": entity_id}
# Test if not playing
hass.states.async_set(entity_id, STATE_IDLE, attributes=attributes)
with pytest.raises(intent.MatchFailedError):
response = await intent.async_handle(
hass,
"test",
media_player_intent.INTENT_MEDIA_PAUSE,
)
await hass.async_block_till_done()
# Test feature not supported
hass.states.async_set(
entity_id,
STATE_PLAYING,
attributes={ATTR_SUPPORTED_FEATURES: MediaPlayerEntityFeature(0)},
)
with pytest.raises(intent.MatchFailedError):
response = await intent.async_handle(
hass,
"test",
media_player_intent.INTENT_MEDIA_PAUSE,
)
await hass.async_block_till_done()
async def test_unpause_media_player_intent(hass: HomeAssistant) -> None: async def test_unpause_media_player_intent(hass: HomeAssistant) -> None:
"""Test HassMediaUnpause intent for media players.""" """Test HassMediaUnpause intent for media players."""
await media_player_intent.async_setup_intents(hass) await media_player_intent.async_setup_intents(hass)
entity_id = f"{DOMAIN}.test_media_player" entity_id = f"{DOMAIN}.test_media_player"
hass.states.async_set(entity_id, STATE_IDLE) hass.states.async_set(entity_id, STATE_PAUSED)
calls = async_mock_service(hass, DOMAIN, SERVICE_MEDIA_PLAY) calls = async_mock_service(hass, DOMAIN, SERVICE_MEDIA_PLAY)
response = await intent.async_handle( response = await intent.async_handle(
hass, hass,
"test", "test",
media_player_intent.INTENT_MEDIA_UNPAUSE, media_player_intent.INTENT_MEDIA_UNPAUSE,
{"name": {"value": "test media player"}},
) )
await hass.async_block_till_done() await hass.async_block_till_done()
@ -62,20 +105,36 @@ async def test_unpause_media_player_intent(hass: HomeAssistant) -> None:
assert call.service == SERVICE_MEDIA_PLAY assert call.service == SERVICE_MEDIA_PLAY
assert call.data == {"entity_id": entity_id} assert call.data == {"entity_id": entity_id}
# Test if not paused
hass.states.async_set(
entity_id,
STATE_PLAYING,
)
with pytest.raises(intent.MatchFailedError):
response = await intent.async_handle(
hass,
"test",
media_player_intent.INTENT_MEDIA_UNPAUSE,
)
await hass.async_block_till_done()
async def test_next_media_player_intent(hass: HomeAssistant) -> None: async def test_next_media_player_intent(hass: HomeAssistant) -> None:
"""Test HassMediaNext intent for media players.""" """Test HassMediaNext intent for media players."""
await media_player_intent.async_setup_intents(hass) await media_player_intent.async_setup_intents(hass)
entity_id = f"{DOMAIN}.test_media_player" entity_id = f"{DOMAIN}.test_media_player"
hass.states.async_set(entity_id, STATE_IDLE) attributes = {ATTR_SUPPORTED_FEATURES: MediaPlayerEntityFeature.NEXT_TRACK}
hass.states.async_set(entity_id, STATE_PLAYING, attributes=attributes)
calls = async_mock_service(hass, DOMAIN, SERVICE_MEDIA_NEXT_TRACK) calls = async_mock_service(hass, DOMAIN, SERVICE_MEDIA_NEXT_TRACK)
response = await intent.async_handle( response = await intent.async_handle(
hass, hass,
"test", "test",
media_player_intent.INTENT_MEDIA_NEXT, media_player_intent.INTENT_MEDIA_NEXT,
{"name": {"value": "test media player"}},
) )
await hass.async_block_till_done() await hass.async_block_till_done()
@ -86,20 +145,49 @@ async def test_next_media_player_intent(hass: HomeAssistant) -> None:
assert call.service == SERVICE_MEDIA_NEXT_TRACK assert call.service == SERVICE_MEDIA_NEXT_TRACK
assert call.data == {"entity_id": entity_id} assert call.data == {"entity_id": entity_id}
# Test if not playing
hass.states.async_set(entity_id, STATE_IDLE, attributes=attributes)
with pytest.raises(intent.MatchFailedError):
response = await intent.async_handle(
hass,
"test",
media_player_intent.INTENT_MEDIA_NEXT,
)
await hass.async_block_till_done()
# Test feature not supported
hass.states.async_set(
entity_id,
STATE_PLAYING,
attributes={ATTR_SUPPORTED_FEATURES: MediaPlayerEntityFeature(0)},
)
with pytest.raises(intent.MatchFailedError):
response = await intent.async_handle(
hass,
"test",
media_player_intent.INTENT_MEDIA_NEXT,
{"name": {"value": "test media player"}},
)
await hass.async_block_till_done()
async def test_volume_media_player_intent(hass: HomeAssistant) -> None: async def test_volume_media_player_intent(hass: HomeAssistant) -> None:
"""Test HassSetVolume intent for media players.""" """Test HassSetVolume intent for media players."""
await media_player_intent.async_setup_intents(hass) await media_player_intent.async_setup_intents(hass)
entity_id = f"{DOMAIN}.test_media_player" entity_id = f"{DOMAIN}.test_media_player"
hass.states.async_set(entity_id, STATE_IDLE) attributes = {ATTR_SUPPORTED_FEATURES: MediaPlayerEntityFeature.VOLUME_SET}
hass.states.async_set(entity_id, STATE_PLAYING, attributes=attributes)
calls = async_mock_service(hass, DOMAIN, SERVICE_VOLUME_SET) calls = async_mock_service(hass, DOMAIN, SERVICE_VOLUME_SET)
response = await intent.async_handle( response = await intent.async_handle(
hass, hass,
"test", "test",
media_player_intent.INTENT_SET_VOLUME, media_player_intent.INTENT_SET_VOLUME,
{"name": {"value": "test media player"}, "volume_level": {"value": 50}}, {"volume_level": {"value": 50}},
) )
await hass.async_block_till_done() await hass.async_block_till_done()
@ -109,3 +197,321 @@ async def test_volume_media_player_intent(hass: HomeAssistant) -> None:
assert call.domain == DOMAIN assert call.domain == DOMAIN
assert call.service == SERVICE_VOLUME_SET assert call.service == SERVICE_VOLUME_SET
assert call.data == {"entity_id": entity_id, "volume_level": 0.5} assert call.data == {"entity_id": entity_id, "volume_level": 0.5}
# Test if not playing
hass.states.async_set(entity_id, STATE_IDLE, attributes=attributes)
with pytest.raises(intent.MatchFailedError):
response = await intent.async_handle(
hass,
"test",
media_player_intent.INTENT_SET_VOLUME,
{"volume_level": {"value": 50}},
)
await hass.async_block_till_done()
# Test feature not supported
hass.states.async_set(
entity_id,
STATE_PLAYING,
attributes={ATTR_SUPPORTED_FEATURES: MediaPlayerEntityFeature(0)},
)
with pytest.raises(intent.MatchFailedError):
response = await intent.async_handle(
hass,
"test",
media_player_intent.INTENT_SET_VOLUME,
{"volume_level": {"value": 50}},
)
await hass.async_block_till_done()
async def test_multiple_media_players(
hass: HomeAssistant,
area_registry: ar.AreaRegistry,
entity_registry: er.EntityRegistry,
floor_registry: fr.FloorRegistry,
) -> None:
"""Test HassMedia* intents with multiple media players."""
await media_player_intent.async_setup_intents(hass)
attributes = {
ATTR_SUPPORTED_FEATURES: MediaPlayerEntityFeature.PAUSE
| MediaPlayerEntityFeature.NEXT_TRACK
| MediaPlayerEntityFeature.VOLUME_SET
}
# House layout
# Floor 1 (ground):
# - Kitchen
# - Smart speaker
# - Living room
# - TV
# - Smart speaker
# Floor 2 (upstairs):
# - Bedroom
# - TV
# - Smart speaker
# - Bathroom
# - Smart speaker
# Floor 1
floor_1 = floor_registry.async_create("first floor", aliases={"ground"})
area_kitchen = area_registry.async_get_or_create("kitchen")
area_kitchen = area_registry.async_update(
area_kitchen.id, floor_id=floor_1.floor_id
)
area_living_room = area_registry.async_get_or_create("living room")
area_living_room = area_registry.async_update(
area_living_room.id, floor_id=floor_1.floor_id
)
kitchen_smart_speaker = entity_registry.async_get_or_create(
"media_player", "test", "kitchen_smart_speaker"
)
kitchen_smart_speaker = entity_registry.async_update_entity(
kitchen_smart_speaker.entity_id, name="smart speaker", area_id=area_kitchen.id
)
hass.states.async_set(
kitchen_smart_speaker.entity_id, STATE_PAUSED, attributes=attributes
)
living_room_smart_speaker = entity_registry.async_get_or_create(
"media_player", "test", "living_room_smart_speaker"
)
living_room_smart_speaker = entity_registry.async_update_entity(
living_room_smart_speaker.entity_id,
name="smart speaker",
area_id=area_living_room.id,
)
hass.states.async_set(
living_room_smart_speaker.entity_id, STATE_PAUSED, attributes=attributes
)
living_room_tv = entity_registry.async_get_or_create(
"media_player", "test", "living_room_tv"
)
living_room_tv = entity_registry.async_update_entity(
living_room_tv.entity_id, name="TV", area_id=area_living_room.id
)
hass.states.async_set(
living_room_tv.entity_id, STATE_PLAYING, attributes=attributes
)
# Floor 2
floor_2 = floor_registry.async_create("second floor", aliases={"upstairs"})
area_bedroom = area_registry.async_get_or_create("bedroom")
area_bedroom = area_registry.async_update(
area_bedroom.id, floor_id=floor_2.floor_id
)
area_bathroom = area_registry.async_get_or_create("bathroom")
area_bathroom = area_registry.async_update(
area_bathroom.id, floor_id=floor_2.floor_id
)
bedroom_tv = entity_registry.async_get_or_create(
"media_player", "test", "bedroom_tv"
)
bedroom_tv = entity_registry.async_update_entity(
bedroom_tv.entity_id, name="TV", area_id=area_bedroom.id
)
hass.states.async_set(bedroom_tv.entity_id, STATE_PLAYING, attributes=attributes)
bedroom_smart_speaker = entity_registry.async_get_or_create(
"media_player", "test", "bedroom_smart_speaker"
)
bedroom_smart_speaker = entity_registry.async_update_entity(
bedroom_smart_speaker.entity_id, name="smart speaker", area_id=area_bedroom.id
)
hass.states.async_set(
bedroom_smart_speaker.entity_id, STATE_PAUSED, attributes=attributes
)
bathroom_smart_speaker = entity_registry.async_get_or_create(
"media_player", "test", "bathroom_smart_speaker"
)
bathroom_smart_speaker = entity_registry.async_update_entity(
bathroom_smart_speaker.entity_id, name="smart speaker", area_id=area_bathroom.id
)
hass.states.async_set(
bathroom_smart_speaker.entity_id, STATE_PAUSED, attributes=attributes
)
# -----
# There are multiple TV's currently playing
with pytest.raises(intent.MatchFailedError):
response = await intent.async_handle(
hass,
"test",
media_player_intent.INTENT_MEDIA_PAUSE,
{"name": {"value": "TV"}},
)
await hass.async_block_till_done()
# Pause the upstairs TV
calls = async_mock_service(hass, DOMAIN, SERVICE_MEDIA_PAUSE)
response = await intent.async_handle(
hass,
"test",
media_player_intent.INTENT_MEDIA_PAUSE,
{"name": {"value": "TV"}, "floor": {"value": "upstairs"}},
)
await hass.async_block_till_done()
assert response.response_type == intent.IntentResponseType.ACTION_DONE
assert len(calls) == 1
assert calls[0].data == {"entity_id": bedroom_tv.entity_id}
hass.states.async_set(bedroom_tv.entity_id, STATE_PAUSED, attributes=attributes)
# Now we can pause the only playing TV (living room)
calls = async_mock_service(hass, DOMAIN, SERVICE_MEDIA_PAUSE)
response = await intent.async_handle(
hass,
"test",
media_player_intent.INTENT_MEDIA_PAUSE,
{"name": {"value": "TV"}},
)
await hass.async_block_till_done()
assert response.response_type == intent.IntentResponseType.ACTION_DONE
assert len(calls) == 1
assert calls[0].data == {"entity_id": living_room_tv.entity_id}
hass.states.async_set(living_room_tv.entity_id, STATE_PAUSED, attributes=attributes)
# Unpause the kitchen smart speaker (explicit area)
calls = async_mock_service(hass, DOMAIN, SERVICE_MEDIA_PLAY)
response = await intent.async_handle(
hass,
"test",
media_player_intent.INTENT_MEDIA_UNPAUSE,
{"name": {"value": "smart speaker"}, "area": {"value": "kitchen"}},
)
await hass.async_block_till_done()
assert response.response_type == intent.IntentResponseType.ACTION_DONE
assert len(calls) == 1
assert calls[0].data == {"entity_id": kitchen_smart_speaker.entity_id}
hass.states.async_set(
kitchen_smart_speaker.entity_id, STATE_PLAYING, attributes=attributes
)
# Unpause living room smart speaker (context area)
calls = async_mock_service(hass, DOMAIN, SERVICE_MEDIA_PLAY)
response = await intent.async_handle(
hass,
"test",
media_player_intent.INTENT_MEDIA_UNPAUSE,
{
"name": {"value": "smart speaker"},
"preferred_area_id": {"value": area_living_room.id},
},
)
await hass.async_block_till_done()
assert response.response_type == intent.IntentResponseType.ACTION_DONE
assert len(calls) == 1
assert calls[0].data == {"entity_id": living_room_smart_speaker.entity_id}
hass.states.async_set(
living_room_smart_speaker.entity_id, STATE_PLAYING, attributes=attributes
)
# Unpause all of the upstairs media players
calls = async_mock_service(hass, DOMAIN, SERVICE_MEDIA_PLAY)
response = await intent.async_handle(
hass,
"test",
media_player_intent.INTENT_MEDIA_UNPAUSE,
{"floor": {"value": "upstairs"}},
)
await hass.async_block_till_done()
assert response.response_type == intent.IntentResponseType.ACTION_DONE
assert len(calls) == 3
assert {call.data["entity_id"] for call in calls} == {
bedroom_tv.entity_id,
bedroom_smart_speaker.entity_id,
bathroom_smart_speaker.entity_id,
}
for entity in (bedroom_tv, bedroom_smart_speaker, bathroom_smart_speaker):
hass.states.async_set(entity.entity_id, STATE_PLAYING, attributes=attributes)
# Pause bedroom TV (context floor)
calls = async_mock_service(hass, DOMAIN, SERVICE_MEDIA_PAUSE)
response = await intent.async_handle(
hass,
"test",
media_player_intent.INTENT_MEDIA_PAUSE,
{
"name": {"value": "TV"},
"preferred_floor_id": {"value": floor_2.floor_id},
},
)
await hass.async_block_till_done()
assert response.response_type == intent.IntentResponseType.ACTION_DONE
assert len(calls) == 1
assert calls[0].data == {"entity_id": bedroom_tv.entity_id}
hass.states.async_set(bedroom_tv.entity_id, STATE_PAUSED, attributes=attributes)
# Set volume in the bathroom
calls = async_mock_service(hass, DOMAIN, SERVICE_VOLUME_SET)
response = await intent.async_handle(
hass,
"test",
media_player_intent.INTENT_SET_VOLUME,
{"area": {"value": "bathroom"}, "volume_level": {"value": 50}},
)
await hass.async_block_till_done()
assert response.response_type == intent.IntentResponseType.ACTION_DONE
assert len(calls) == 1
assert calls[0].data == {
"entity_id": bathroom_smart_speaker.entity_id,
"volume_level": 0.5,
}
# Next track in the kitchen (only media player that is playing on ground floor)
hass.states.async_set(
living_room_smart_speaker.entity_id, STATE_PAUSED, attributes=attributes
)
calls = async_mock_service(hass, DOMAIN, SERVICE_MEDIA_NEXT_TRACK)
response = await intent.async_handle(
hass,
"test",
media_player_intent.INTENT_MEDIA_NEXT,
{"floor": {"value": "ground"}},
)
await hass.async_block_till_done()
assert response.response_type == intent.IntentResponseType.ACTION_DONE
assert len(calls) == 1
assert calls[0].data == {"entity_id": kitchen_smart_speaker.entity_id}
# Pause the kitchen smart speaker (all ground floor media players are now paused)
calls = async_mock_service(hass, DOMAIN, SERVICE_MEDIA_PAUSE)
response = await intent.async_handle(
hass,
"test",
media_player_intent.INTENT_MEDIA_PAUSE,
{"area": {"value": "kitchen"}},
)
await hass.async_block_till_done()
assert response.response_type == intent.IntentResponseType.ACTION_DONE
assert len(calls) == 1
assert calls[0].data == {"entity_id": kitchen_smart_speaker.entity_id}
hass.states.async_set(
kitchen_smart_speaker.entity_id, STATE_PAUSED, attributes=attributes
)
# Unpause with no context (only kitchen should be resumed)
calls = async_mock_service(hass, DOMAIN, SERVICE_MEDIA_PLAY)
response = await intent.async_handle(
hass,
"test",
media_player_intent.INTENT_MEDIA_UNPAUSE,
)
await hass.async_block_till_done()
assert response.response_type == intent.IntentResponseType.ACTION_DONE
assert len(calls) == 1
assert calls[0].data == {"entity_id": kitchen_smart_speaker.entity_id}
hass.states.async_set(
kitchen_smart_speaker.entity_id, STATE_PLAYING, attributes=attributes
)

View file

@ -6,9 +6,13 @@ from unittest.mock import MagicMock, patch
import pytest import pytest
import voluptuous as vol import voluptuous as vol
from homeassistant.components import conversation from homeassistant.components import conversation, light, switch
from homeassistant.components.switch import SwitchDeviceClass from homeassistant.components.homeassistant.exposed_entities import async_expose_entity
from homeassistant.const import ATTR_FRIENDLY_NAME from homeassistant.const import (
ATTR_DEVICE_CLASS,
ATTR_FRIENDLY_NAME,
ATTR_SUPPORTED_FEATURES,
)
from homeassistant.core import Context, HomeAssistant, State from homeassistant.core import Context, HomeAssistant, State
from homeassistant.helpers import ( from homeassistant.helpers import (
area_registry as ar, area_registry as ar,
@ -20,13 +24,13 @@ from homeassistant.helpers import (
) )
from homeassistant.setup import async_setup_component from homeassistant.setup import async_setup_component
from tests.common import MockConfigEntry from tests.common import MockConfigEntry, async_mock_service
class MockIntentHandler(intent.IntentHandler): class MockIntentHandler(intent.IntentHandler):
"""Provide a mock intent handler.""" """Provide a mock intent handler."""
def __init__(self, slot_schema): def __init__(self, slot_schema) -> None:
"""Initialize the mock handler.""" """Initialize the mock handler."""
self.slot_schema = slot_schema self.slot_schema = slot_schema
@ -73,7 +77,7 @@ async def test_async_match_states(
entity_registry.async_update_entity( entity_registry.async_update_entity(
state2.entity_id, state2.entity_id,
area_id=area_bedroom.id, area_id=area_bedroom.id,
device_class=SwitchDeviceClass.OUTLET, device_class=switch.SwitchDeviceClass.OUTLET,
aliases={"kill switch"}, aliases={"kill switch"},
) )
@ -126,7 +130,7 @@ async def test_async_match_states(
assert list( assert list(
intent.async_match_states( intent.async_match_states(
hass, hass,
device_classes={SwitchDeviceClass.OUTLET}, device_classes={switch.SwitchDeviceClass.OUTLET},
area_name="bedroom", area_name="bedroom",
states=[state1, state2], states=[state1, state2],
) )
@ -162,6 +166,346 @@ async def test_async_match_states(
) )
async def test_async_match_targets(
hass: HomeAssistant,
area_registry: ar.AreaRegistry,
entity_registry: er.EntityRegistry,
floor_registry: fr.FloorRegistry,
device_registry: dr.DeviceRegistry,
) -> None:
"""Tests for async_match_targets function."""
# Needed for exposure
assert await async_setup_component(hass, "homeassistant", {})
# House layout
# Floor 1 (ground):
# - Kitchen
# - Outlet
# - Bathroom
# - Light
# Floor 2 (upstairs)
# - Bedroom
# - Switch
# - Bathroom
# - Light
# Floor 3 (also upstairs)
# - Bedroom
# - Switch
# - Bathroom
# - Light
# Floor 1
floor_1 = floor_registry.async_create("first floor", aliases={"ground"})
area_kitchen = area_registry.async_get_or_create("kitchen")
area_kitchen = area_registry.async_update(
area_kitchen.id, floor_id=floor_1.floor_id
)
area_bathroom_1 = area_registry.async_get_or_create("first floor bathroom")
area_bathroom_1 = area_registry.async_update(
area_bathroom_1.id, aliases={"bathroom"}, floor_id=floor_1.floor_id
)
kitchen_outlet = entity_registry.async_get_or_create(
"switch", "test", "kitchen_outlet"
)
kitchen_outlet = entity_registry.async_update_entity(
kitchen_outlet.entity_id,
name="kitchen outlet",
device_class=switch.SwitchDeviceClass.OUTLET,
area_id=area_kitchen.id,
)
state_kitchen_outlet = State(kitchen_outlet.entity_id, "on")
bathroom_light_1 = entity_registry.async_get_or_create(
"light", "test", "bathroom_light_1"
)
bathroom_light_1 = entity_registry.async_update_entity(
bathroom_light_1.entity_id,
name="bathroom light",
aliases={"overhead light"},
area_id=area_bathroom_1.id,
)
state_bathroom_light_1 = State(bathroom_light_1.entity_id, "off")
# Floor 2
floor_2 = floor_registry.async_create("second floor", aliases={"upstairs"})
area_bedroom_2 = area_registry.async_get_or_create("bedroom")
area_bedroom_2 = area_registry.async_update(
area_bedroom_2.id, floor_id=floor_2.floor_id
)
area_bathroom_2 = area_registry.async_get_or_create("second floor bathroom")
area_bathroom_2 = area_registry.async_update(
area_bathroom_2.id, aliases={"bathroom"}, floor_id=floor_2.floor_id
)
bedroom_switch_2 = entity_registry.async_get_or_create(
"switch", "test", "bedroom_switch_2"
)
bedroom_switch_2 = entity_registry.async_update_entity(
bedroom_switch_2.entity_id,
name="second floor bedroom switch",
area_id=area_bedroom_2.id,
)
state_bedroom_switch_2 = State(
bedroom_switch_2.entity_id,
"off",
)
bathroom_light_2 = entity_registry.async_get_or_create(
"light", "test", "bathroom_light_2"
)
bathroom_light_2 = entity_registry.async_update_entity(
bathroom_light_2.entity_id,
aliases={"bathroom light", "overhead light"},
area_id=area_bathroom_2.id,
supported_features=light.LightEntityFeature.EFFECT,
)
state_bathroom_light_2 = State(bathroom_light_2.entity_id, "off")
# Floor 3
floor_3 = floor_registry.async_create("third floor", aliases={"upstairs"})
area_bedroom_3 = area_registry.async_get_or_create("bedroom")
area_bedroom_3 = area_registry.async_update(
area_bedroom_3.id, floor_id=floor_3.floor_id
)
area_bathroom_3 = area_registry.async_get_or_create("third floor bathroom")
area_bathroom_3 = area_registry.async_update(
area_bathroom_3.id, aliases={"bathroom"}, floor_id=floor_3.floor_id
)
bedroom_switch_3 = entity_registry.async_get_or_create(
"switch", "test", "bedroom_switch_3"
)
bedroom_switch_3 = entity_registry.async_update_entity(
bedroom_switch_3.entity_id,
name="third floor bedroom switch",
area_id=area_bedroom_3.id,
)
state_bedroom_switch_3 = State(
bedroom_switch_3.entity_id,
"off",
attributes={ATTR_DEVICE_CLASS: switch.SwitchDeviceClass.OUTLET},
)
bathroom_light_3 = entity_registry.async_get_or_create(
"light", "test", "bathroom_light_3"
)
bathroom_light_3 = entity_registry.async_update_entity(
bathroom_light_3.entity_id,
name="overhead light",
area_id=area_bathroom_3.id,
)
state_bathroom_light_3 = State(
bathroom_light_3.entity_id,
"on",
attributes={
ATTR_FRIENDLY_NAME: "bathroom light",
ATTR_SUPPORTED_FEATURES: light.LightEntityFeature.EFFECT,
},
)
# -----
bathroom_light_states = [
state_bathroom_light_1,
state_bathroom_light_2,
state_bathroom_light_3,
]
states = [
*bathroom_light_states,
state_kitchen_outlet,
state_bedroom_switch_2,
state_bedroom_switch_3,
]
# Not a unique name
result = intent.async_match_targets(
hass,
intent.MatchTargetsConstraints(name="bathroom light"),
states=states,
)
assert not result.is_match
assert result.no_match_reason == intent.MatchFailedReason.DUPLICATE_NAME
assert result.no_match_name == "bathroom light"
# Works with duplicate names allowed
result = intent.async_match_targets(
hass,
intent.MatchTargetsConstraints(
name="bathroom light", allow_duplicate_names=True
),
states=states,
)
assert result.is_match
assert {s.entity_id for s in result.states} == {
s.entity_id for s in bathroom_light_states
}
# Also works when name is not a constraint
result = intent.async_match_targets(
hass,
intent.MatchTargetsConstraints(domains={"light"}),
states=states,
)
assert result.is_match
assert {s.entity_id for s in result.states} == {
s.entity_id for s in bathroom_light_states
}
# We can disambiguate by preferred floor (from context)
result = intent.async_match_targets(
hass,
intent.MatchTargetsConstraints(name="bathroom light"),
intent.MatchTargetsPreferences(floor_id=floor_3.floor_id),
states=states,
)
assert result.is_match
assert len(result.states) == 1
assert result.states[0].entity_id == bathroom_light_3.entity_id
# Also disambiguate by preferred area (from context)
result = intent.async_match_targets(
hass,
intent.MatchTargetsConstraints(name="bathroom light"),
intent.MatchTargetsPreferences(area_id=area_bathroom_2.id),
states=states,
)
assert result.is_match
assert len(result.states) == 1
assert result.states[0].entity_id == bathroom_light_2.entity_id
# Disambiguate by floor name, if unique
result = intent.async_match_targets(
hass,
intent.MatchTargetsConstraints(name="bathroom light", floor_name="ground"),
states=states,
)
assert result.is_match
assert len(result.states) == 1
assert result.states[0].entity_id == bathroom_light_1.entity_id
# Doesn't work if floor name/alias is not unique
result = intent.async_match_targets(
hass,
intent.MatchTargetsConstraints(name="bathroom light", floor_name="upstairs"),
states=states,
)
assert not result.is_match
assert result.no_match_reason == intent.MatchFailedReason.DUPLICATE_NAME
# Disambiguate by area name, if unique
result = intent.async_match_targets(
hass,
intent.MatchTargetsConstraints(
name="bathroom light", area_name="first floor bathroom"
),
states=states,
)
assert result.is_match
assert len(result.states) == 1
assert result.states[0].entity_id == bathroom_light_1.entity_id
# Doesn't work if area name/alias is not unique
result = intent.async_match_targets(
hass,
intent.MatchTargetsConstraints(name="bathroom light", area_name="bathroom"),
states=states,
)
assert not result.is_match
assert result.no_match_reason == intent.MatchFailedReason.DUPLICATE_NAME
# Does work if floor/area name combo is unique
result = intent.async_match_targets(
hass,
intent.MatchTargetsConstraints(
name="bathroom light", area_name="bathroom", floor_name="ground"
),
states=states,
)
assert result.is_match
assert len(result.states) == 1
assert result.states[0].entity_id == bathroom_light_1.entity_id
# Doesn't work if area is not part of the floor
result = intent.async_match_targets(
hass,
intent.MatchTargetsConstraints(
name="bathroom light",
area_name="second floor bathroom",
floor_name="ground",
),
states=states,
)
assert not result.is_match
assert result.no_match_reason == intent.MatchFailedReason.AREA
# Check state constraint (only third floor bathroom light is on)
result = intent.async_match_targets(
hass,
intent.MatchTargetsConstraints(domains={"light"}, states={"on"}),
states=states,
)
assert result.is_match
assert len(result.states) == 1
assert result.states[0].entity_id == bathroom_light_3.entity_id
result = intent.async_match_targets(
hass,
intent.MatchTargetsConstraints(
domains={"light"}, states={"on"}, floor_name="ground"
),
states=states,
)
assert not result.is_match
# Check assistant constraint (exposure)
result = intent.async_match_targets(
hass,
intent.MatchTargetsConstraints(assistant="test"),
states=states,
)
assert not result.is_match
async_expose_entity(hass, "test", bathroom_light_1.entity_id, True)
result = intent.async_match_targets(
hass,
intent.MatchTargetsConstraints(assistant="test"),
states=states,
)
assert result.is_match
assert len(result.states) == 1
assert result.states[0].entity_id == bathroom_light_1.entity_id
# Check device class constraint
result = intent.async_match_targets(
hass,
intent.MatchTargetsConstraints(
domains={"switch"}, device_classes={switch.SwitchDeviceClass.OUTLET}
),
states=states,
)
assert result.is_match
assert len(result.states) == 2
assert {s.entity_id for s in result.states} == {
kitchen_outlet.entity_id,
bedroom_switch_3.entity_id,
}
# Check features constraint (second and third floor bathroom lights have effects)
result = intent.async_match_targets(
hass,
intent.MatchTargetsConstraints(
domains={"light"}, features=light.LightEntityFeature.EFFECT
),
states=states,
)
assert result.is_match
assert len(result.states) == 2
assert {s.entity_id for s in result.states} == {
bathroom_light_2.entity_id,
bathroom_light_3.entity_id,
}
async def test_match_device_area( async def test_match_device_area(
hass: HomeAssistant, hass: HomeAssistant,
area_registry: ar.AreaRegistry, area_registry: ar.AreaRegistry,
@ -353,24 +697,72 @@ async def test_validate_then_run_in_background(hass: HomeAssistant) -> None:
async def test_invalid_area_floor_names(hass: HomeAssistant) -> None: async def test_invalid_area_floor_names(hass: HomeAssistant) -> None:
"""Test that we throw an intent handle error with invalid area/floor names.""" """Test that we throw an appropriate errors with invalid area/floor names."""
handler = intent.ServiceIntentHandler( handler = intent.ServiceIntentHandler(
"TestType", "light", "turn_on", "Turned {} on" "TestType", "light", "turn_on", "Turned {} on"
) )
intent.async_register(hass, handler) intent.async_register(hass, handler)
with pytest.raises(intent.IntentHandleError): with pytest.raises(intent.MatchFailedError) as err:
await intent.async_handle( await intent.async_handle(
hass, hass,
"test", "test",
"TestType", "TestType",
slots={"area": {"value": "invalid area"}}, slots={"area": {"value": "invalid area"}},
) )
assert err.value.result.no_match_reason == intent.MatchFailedReason.INVALID_AREA
with pytest.raises(intent.IntentHandleError): with pytest.raises(intent.MatchFailedError) as err:
await intent.async_handle( await intent.async_handle(
hass, hass,
"test", "test",
"TestType", "TestType",
slots={"floor": {"value": "invalid floor"}}, slots={"floor": {"value": "invalid floor"}},
) )
assert (
err.value.result.no_match_reason == intent.MatchFailedReason.INVALID_FLOOR
)
async def test_service_intent_handler_required_domains(hass: HomeAssistant) -> None:
"""Test that required_domains restricts the domain of a ServiceIntentHandler."""
hass.states.async_set("light.kitchen", "off")
hass.states.async_set("switch.bedroom", "off")
calls = async_mock_service(hass, "homeassistant", "turn_on")
handler = intent.ServiceIntentHandler(
"TestType",
"homeassistant",
"turn_on",
"Turned {} on",
required_domains={"light"},
)
intent.async_register(hass, handler)
# Should work fine
result = await intent.async_handle(
hass,
"test",
"TestType",
slots={"name": {"value": "kitchen"}, "domain": {"value": "light"}},
)
assert result.response_type == intent.IntentResponseType.ACTION_DONE
assert len(calls) == 1
# Fails because the intent handler is restricted to lights only
with pytest.raises(intent.MatchFailedError):
await intent.async_handle(
hass,
"test",
"TestType",
slots={"name": {"value": "bedroom"}},
)
# Still fails even if we provide the domain
with pytest.raises(intent.MatchFailedError):
await intent.async_handle(
hass,
"test",
"TestType",
slots={"name": {"value": "bedroom"}, "domain": {"value": "switch"}},
)