diff --git a/homeassistant/components/zha/api.py b/homeassistant/components/zha/api.py index 6cbcdf50983..c68136c23da 100644 --- a/homeassistant/components/zha/api.py +++ b/homeassistant/components/zha/api.py @@ -3,7 +3,7 @@ from __future__ import annotations import asyncio import logging -from typing import TYPE_CHECKING, Any, NamedTuple +from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar, cast import voluptuous as vol import zigpy.backups @@ -31,6 +31,7 @@ from .core.const import ( ATTR_LEVEL, ATTR_MANUFACTURER, ATTR_MEMBERS, + ATTR_PARAMS, ATTR_TYPE, ATTR_VALUE, ATTR_WARNING_DEVICE_DURATION, @@ -69,6 +70,7 @@ from .core.group import GroupMember from .core.helpers import ( async_cluster_exists, async_is_bindable_target, + cluster_command_schema_to_vol_schema, convert_install_code, get_matched_clusters, qr_to_install_code, @@ -110,6 +112,17 @@ IEEE_SERVICE = "ieee_based_service" IEEE_SCHEMA = vol.All(cv.string, EUI64.convert) +# typing typevar +_T = TypeVar("_T") + + +def _ensure_list_if_present(value: _T | None) -> list[_T] | list[Any] | None: + """Wrap value in list if it is provided and not one.""" + if value is None: + return None + return cast("list[_T]", value) if isinstance(value, list) else [value] + + SERVICE_PERMIT_PARAMS = { vol.Optional(ATTR_IEEE): IEEE_SCHEMA, vol.Optional(ATTR_DURATION, default=60): vol.All( @@ -181,17 +194,22 @@ SERVICE_SCHEMAS = { ): cv.positive_int, } ), - SERVICE_ISSUE_ZIGBEE_CLUSTER_COMMAND: vol.Schema( - { - vol.Required(ATTR_IEEE): IEEE_SCHEMA, - vol.Required(ATTR_ENDPOINT_ID): cv.positive_int, - vol.Required(ATTR_CLUSTER_ID): cv.positive_int, - vol.Optional(ATTR_CLUSTER_TYPE, default=CLUSTER_TYPE_IN): cv.string, - vol.Required(ATTR_COMMAND): cv.positive_int, - vol.Required(ATTR_COMMAND_TYPE): cv.string, - vol.Optional(ATTR_ARGS, default=[]): cv.ensure_list, - vol.Optional(ATTR_MANUFACTURER): cv.positive_int, - } + SERVICE_ISSUE_ZIGBEE_CLUSTER_COMMAND: vol.All( + vol.Schema( + { + vol.Required(ATTR_IEEE): IEEE_SCHEMA, + vol.Required(ATTR_ENDPOINT_ID): cv.positive_int, + vol.Required(ATTR_CLUSTER_ID): cv.positive_int, + vol.Optional(ATTR_CLUSTER_TYPE, default=CLUSTER_TYPE_IN): cv.string, + vol.Required(ATTR_COMMAND): cv.positive_int, + vol.Required(ATTR_COMMAND_TYPE): cv.string, + vol.Exclusive(ATTR_ARGS, "attrs_params"): _ensure_list_if_present, + vol.Exclusive(ATTR_PARAMS, "attrs_params"): dict, + vol.Optional(ATTR_MANUFACTURER): cv.positive_int, + } + ), + cv.deprecated(ATTR_ARGS), + cv.has_at_least_one_key(ATTR_ARGS, ATTR_PARAMS), ), SERVICE_ISSUE_ZIGBEE_GROUP_COMMAND: vol.Schema( { @@ -711,6 +729,8 @@ async def websocket_device_cluster_commands( hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] ) -> None: """Return a list of cluster commands.""" + import voluptuous_serialize # pylint: disable=import-outside-toplevel + zha_gateway: ZHAGateway = hass.data[DATA_ZHA][DATA_ZHA_GATEWAY] ieee: EUI64 = msg[ATTR_IEEE] endpoint_id: int = msg[ATTR_ENDPOINT_ID] @@ -731,6 +751,10 @@ async def websocket_device_cluster_commands( TYPE: CLIENT, ID: cmd_id, ATTR_NAME: cmd.name, + "schema": voluptuous_serialize.convert( + cluster_command_schema_to_vol_schema(cmd.schema), + custom_serializer=cv.custom_serializer, + ), } ) for cmd_id, cmd in commands[CLUSTER_COMMANDS_SERVER].items(): @@ -739,6 +763,10 @@ async def websocket_device_cluster_commands( TYPE: CLUSTER_COMMAND_SERVER, ID: cmd_id, ATTR_NAME: cmd.name, + "schema": voluptuous_serialize.convert( + cluster_command_schema_to_vol_schema(cmd.schema), + custom_serializer=cv.custom_serializer, + ), } ) _LOGGER.debug( @@ -1285,41 +1313,45 @@ def async_load_api(hass: HomeAssistant) -> None: cluster_type: str = service.data[ATTR_CLUSTER_TYPE] command: int = service.data[ATTR_COMMAND] command_type: str = service.data[ATTR_COMMAND_TYPE] - args: list = service.data[ATTR_ARGS] + args: list | None = service.data.get(ATTR_ARGS) + params: dict | None = service.data.get(ATTR_PARAMS) manufacturer: int | None = service.data.get(ATTR_MANUFACTURER) zha_device = zha_gateway.get_device(ieee) - response = None if zha_device is not None: if cluster_id >= MFG_CLUSTER_ID_START and manufacturer is None: manufacturer = zha_device.manufacturer_code - response = await zha_device.issue_cluster_command( + + await zha_device.issue_cluster_command( endpoint_id, cluster_id, command, command_type, - *args, + args, + params, cluster_type=cluster_type, manufacturer=manufacturer, ) - _LOGGER.debug( - "Issued command for: %s: [%s] %s: [%s] %s: [%s] %s: [%s] %s: [%s] %s: %s %s: [%s] %s: %s", - ATTR_CLUSTER_ID, - cluster_id, - ATTR_CLUSTER_TYPE, - cluster_type, - ATTR_ENDPOINT_ID, - endpoint_id, - ATTR_COMMAND, - command, - ATTR_COMMAND_TYPE, - command_type, - ATTR_ARGS, - args, - ATTR_MANUFACTURER, - manufacturer, - RESPONSE, - response, - ) + _LOGGER.debug( + "Issued command for: %s: [%s] %s: [%s] %s: [%s] %s: [%s] %s: [%s] %s: [%s] %s: [%s] %s: [%s]", + ATTR_CLUSTER_ID, + cluster_id, + ATTR_CLUSTER_TYPE, + cluster_type, + ATTR_ENDPOINT_ID, + endpoint_id, + ATTR_COMMAND, + command, + ATTR_COMMAND_TYPE, + command_type, + ATTR_ARGS, + args, + ATTR_PARAMS, + params, + ATTR_MANUFACTURER, + manufacturer, + ) + else: + raise ValueError(f"Device with IEEE {str(ieee)} not found") async_register_admin_service( hass, diff --git a/homeassistant/components/zha/core/device.py b/homeassistant/components/zha/core/device.py index a0a4521e19d..5eb436cbe53 100644 --- a/homeassistant/components/zha/core/device.py +++ b/homeassistant/components/zha/core/device.py @@ -17,11 +17,14 @@ import zigpy.exceptions from zigpy.profiles import PROFILES import zigpy.quirks from zigpy.types.named import EUI64, NWK +from zigpy.zcl.clusters import Cluster from zigpy.zcl.clusters.general import Groups, Identify +from zigpy.zcl.foundation import Status as ZclStatus, ZCLCommandDef import zigpy.zdo.types as zdo_types from homeassistant.const import ATTR_COMMAND, ATTR_NAME from homeassistant.core import HomeAssistant, callback +from homeassistant.exceptions import HomeAssistantError from homeassistant.helpers.dispatcher import ( async_dispatcher_connect, async_dispatcher_send, @@ -35,6 +38,7 @@ from .const import ( ATTR_ATTRIBUTE, ATTR_AVAILABLE, ATTR_CLUSTER_ID, + ATTR_CLUSTER_TYPE, ATTR_COMMAND_TYPE, ATTR_DEVICE_TYPE, ATTR_ENDPOINT_ID, @@ -49,6 +53,7 @@ from .const import ( ATTR_NEIGHBORS, ATTR_NODE_DESCRIPTOR, ATTR_NWK, + ATTR_PARAMS, ATTR_POWER_SOURCE, ATTR_QUIRK_APPLIED, ATTR_QUIRK_CLASS, @@ -74,7 +79,7 @@ from .const import ( UNKNOWN_MODEL, ZHA_OPTIONS, ) -from .helpers import LogMixin, async_get_zha_config_value +from .helpers import LogMixin, async_get_zha_config_value, convert_to_zcl_values if TYPE_CHECKING: from ..api import ClusterBinding @@ -558,7 +563,7 @@ class ZHADevice(LogMixin): return device_info @callback - def async_get_clusters(self): + def async_get_clusters(self) -> dict[int, dict[str, dict[int, Cluster]]]: """Get all clusters for this device.""" return { ep_id: { @@ -592,9 +597,11 @@ class ZHADevice(LogMixin): } @callback - def async_get_cluster(self, endpoint_id, cluster_id, cluster_type=CLUSTER_TYPE_IN): + def async_get_cluster( + self, endpoint_id: int, cluster_id: int, cluster_type: str = CLUSTER_TYPE_IN + ) -> Cluster: """Get zigbee cluster from this entity.""" - clusters = self.async_get_clusters() + clusters: dict[int, dict[str, dict[int, Cluster]]] = self.async_get_clusters() return clusters[endpoint_id][cluster_type][cluster_id] @callback @@ -660,36 +667,62 @@ class ZHADevice(LogMixin): async def issue_cluster_command( self, - endpoint_id, - cluster_id, - command, - command_type, - *args, - cluster_type=CLUSTER_TYPE_IN, - manufacturer=None, - ): - """Issue a command against specified zigbee cluster on this entity.""" - cluster = self.async_get_cluster(endpoint_id, cluster_id, cluster_type) - if cluster is None: - return None - if command_type == CLUSTER_COMMAND_SERVER: - response = await cluster.command( - command, *args, manufacturer=manufacturer, expect_reply=True + endpoint_id: int, + cluster_id: int, + command: int, + command_type: str, + args: list | None, + params: dict[str, Any] | None, + cluster_type: str = CLUSTER_TYPE_IN, + manufacturer: int | None = None, + ) -> None: + """Issue a command against specified zigbee cluster on this device.""" + try: + cluster: Cluster = self.async_get_cluster( + endpoint_id, cluster_id, cluster_type ) - else: - response = await cluster.client_command(command, *args) - - self.debug( - "Issued cluster command: %s %s %s %s %s %s %s", - f"{ATTR_CLUSTER_ID}: {cluster_id}", - f"{ATTR_COMMAND}: {command}", - f"{ATTR_COMMAND_TYPE}: {command_type}", - f"{ATTR_ARGS}: {args}", - f"{ATTR_CLUSTER_ID}: {cluster_type}", - f"{ATTR_MANUFACTURER}: {manufacturer}", - f"{ATTR_ENDPOINT_ID}: {endpoint_id}", + except KeyError as exc: + raise ValueError( + f"Cluster {cluster_id} not found on endpoint {endpoint_id} while issuing command {command} with args {args}" + ) from exc + commands: dict[int, ZCLCommandDef] = ( + cluster.server_commands + if command_type == CLUSTER_COMMAND_SERVER + else cluster.client_commands ) - return response + if args is not None: + self.warning( + "args [%s] are deprecated and should be passed with the params key. The parameter names are: %s", + args, + [field.name for field in commands[command].schema.fields], + ) + response = await getattr(cluster, commands[command].name)(*args) + else: + assert params is not None + response = await ( + getattr(cluster, commands[command].name)( + **convert_to_zcl_values(params, commands[command].schema) + ) + ) + self.debug( + "Issued cluster command: %s %s %s %s %s %s %s %s", + f"{ATTR_CLUSTER_ID}: [{cluster_id}]", + f"{ATTR_CLUSTER_TYPE}: [{cluster_type}]", + f"{ATTR_ENDPOINT_ID}: [{endpoint_id}]", + f"{ATTR_COMMAND}: [{command}]", + f"{ATTR_COMMAND_TYPE}: [{command_type}]", + f"{ATTR_ARGS}: [{args}]", + f"{ATTR_PARAMS}: [{params}]", + f"{ATTR_MANUFACTURER}: [{manufacturer}]", + ) + if response is None: + return # client commands don't return a response + if isinstance(response, Exception): + raise HomeAssistantError("Failed to issue cluster command") from response + if response[1] is not ZclStatus.SUCCESS: + raise HomeAssistantError( + f"Failed to issue cluster command with status: {response[1]}" + ) async def async_add_to_group(self, group_id: int) -> None: """Add this device to the provided zigbee group.""" diff --git a/homeassistant/components/zha/core/helpers.py b/homeassistant/components/zha/core/helpers.py index 7fd789ac3f5..409d45789b5 100644 --- a/homeassistant/components/zha/core/helpers.py +++ b/homeassistant/components/zha/core/helpers.py @@ -10,9 +10,11 @@ import asyncio import binascii from collections.abc import Callable, Iterator from dataclasses import dataclass +import enum import functools import itertools import logging +import operator from random import uniform import re from typing import TYPE_CHECKING, Any, TypeVar @@ -22,12 +24,13 @@ import zigpy.exceptions import zigpy.types import zigpy.util import zigpy.zcl +from zigpy.zcl.foundation import CommandSchema import zigpy.zdo.types as zdo_types from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant, State, callback from homeassistant.exceptions import IntegrationError -from homeassistant.helpers import device_registry as dr +from homeassistant.helpers import config_validation as cv, device_registry as dr from .const import ( CLUSTER_TYPE_IN, @@ -120,6 +123,83 @@ async def get_matched_clusters( return clusters_to_bind +def cluster_command_schema_to_vol_schema(schema: CommandSchema) -> vol.Schema: + """Convert a cluster command schema to a voluptuous schema.""" + return vol.Schema( + { + vol.Optional(field.name) + if field.optional + else vol.Required(field.name): schema_type_to_vol(field.type) + for field in schema.fields + } + ) + + +def schema_type_to_vol(field_type: Any) -> Any: + """Convert a schema type to a voluptuous type.""" + if issubclass(field_type, enum.Flag) and len(field_type.__members__.keys()): + return cv.multi_select( + [key.replace("_", " ") for key in field_type.__members__.keys()] + ) + if issubclass(field_type, enum.Enum) and len(field_type.__members__.keys()): + return vol.In([key.replace("_", " ") for key in field_type.__members__.keys()]) + if ( + issubclass(field_type, zigpy.types.FixedIntType) + or issubclass(field_type, enum.Flag) + or issubclass(field_type, enum.Enum) + ): + return vol.All( + vol.Coerce(int), vol.Range(field_type.min_value, field_type.max_value) + ) + return str + + +def convert_to_zcl_values( + fields: dict[str, Any], schema: CommandSchema +) -> dict[str, Any]: + """Convert user input to ZCL values.""" + converted_fields: dict[str, Any] = {} + for field in schema.fields: + if field.name not in fields: + continue + value = fields[field.name] + if issubclass(field.type, enum.Flag): + if isinstance(value, list): + value = field.type( + functools.reduce( + operator.ior, + [ + field.type[flag.replace(" ", "_")] + if isinstance(flag, str) + else field.type(flag) + for flag in value + ], + ) + ) + else: + value = ( + field.type[value.replace(" ", "_")] + if isinstance(value, str) + else field.type(value) + ) + elif issubclass(field.type, enum.Enum): + value = ( + field.type[value.replace(" ", "_")] + if isinstance(value, str) + else field.type(value) + ) + else: + value = field.type(value) + _LOGGER.debug( + "Converted ZCL schema field(%s) value from: %s to: %s", + field.name, + fields[field.name], + value, + ) + converted_fields[field.name] = value + return converted_fields + + @callback def async_is_bindable_target(source_zha_device, target_zha_device): """Determine if target is bindable to source.""" diff --git a/homeassistant/components/zha/services.yaml b/homeassistant/components/zha/services.yaml index 0e645da365e..132dae6e745 100644 --- a/homeassistant/components/zha/services.yaml +++ b/homeassistant/components/zha/services.yaml @@ -187,6 +187,11 @@ issue_zigbee_cluster_command: example: "[arg1, arg2, argN]" selector: object: + params: + name: Params + description: parameters to pass to the command + selector: + object: manufacturer: name: Manufacturer description: manufacturer code diff --git a/tests/components/zha/test_helpers.py b/tests/components/zha/test_helpers.py new file mode 100644 index 00000000000..f5fb5c4f5c0 --- /dev/null +++ b/tests/components/zha/test_helpers.py @@ -0,0 +1,197 @@ +"""Tests for ZHA helpers.""" +import logging +from unittest.mock import patch + +import pytest +import voluptuous_serialize +import zigpy.profiles.zha as zha +from zigpy.types.basic import uint16_t +import zigpy.zcl.clusters.general as general +import zigpy.zcl.clusters.lighting as lighting + +from homeassistant.components.zha.core.helpers import ( + cluster_command_schema_to_vol_schema, + convert_to_zcl_values, +) +from homeassistant.const import Platform +import homeassistant.helpers.config_validation as cv + +from .common import async_enable_traffic +from .conftest import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_PROFILE, SIG_EP_TYPE + +_LOGGER = logging.getLogger(__name__) + + +@pytest.fixture(autouse=True) +def light_platform_only(): + """Only setup the light and required base platforms to speed up tests.""" + with patch( + "homeassistant.components.zha.PLATFORMS", + ( + Platform.BUTTON, + Platform.LIGHT, + Platform.NUMBER, + Platform.SELECT, + ), + ): + yield + + +@pytest.fixture +async def device_light(hass, zigpy_device_mock, zha_device_joined): + """Test light.""" + + zigpy_device = zigpy_device_mock( + { + 1: { + SIG_EP_INPUT: [ + general.OnOff.cluster_id, + general.LevelControl.cluster_id, + lighting.Color.cluster_id, + general.Groups.cluster_id, + general.Identify.cluster_id, + ], + SIG_EP_OUTPUT: [], + SIG_EP_TYPE: zha.DeviceType.COLOR_DIMMABLE_LIGHT, + SIG_EP_PROFILE: zha.PROFILE_ID, + } + } + ) + color_cluster = zigpy_device.endpoints[1].light_color + color_cluster.PLUGGED_ATTR_READS = { + "color_capabilities": lighting.Color.ColorCapabilities.Color_temperature + | lighting.Color.ColorCapabilities.XY_attributes + } + zha_device = await zha_device_joined(zigpy_device) + zha_device.available = True + return color_cluster, zha_device + + +async def test_zcl_schema_conversions(hass, device_light): + """Test ZHA ZCL schema conversion helpers.""" + color_cluster, zha_device = device_light + await async_enable_traffic(hass, [zha_device]) + command_schema = color_cluster.commands_by_name["color_loop_set"].schema + expected_schema = [ + { + "type": "multi_select", + "options": ["Action", "Direction", "Time", "Start Hue"], + "name": "update_flags", + "required": True, + }, + { + "type": "select", + "options": [ + ("Deactivate", "Deactivate"), + ("Activate from color loop hue", "Activate from color loop hue"), + ("Activate from current hue", "Activate from current hue"), + ], + "name": "action", + "required": True, + }, + { + "type": "select", + "options": [("Decrement", "Decrement"), ("Increment", "Increment")], + "name": "direction", + "required": True, + }, + { + "type": "integer", + "valueMin": 0, + "valueMax": 65535, + "name": "time", + "required": True, + }, + { + "type": "integer", + "valueMin": 0, + "valueMax": 65535, + "name": "start_hue", + "required": True, + }, + { + "type": "integer", + "valueMin": 0, + "valueMax": 255, + "name": "options_mask", + "optional": True, + }, + { + "type": "integer", + "valueMin": 0, + "valueMax": 255, + "name": "options_override", + "optional": True, + }, + ] + vol_schema = voluptuous_serialize.convert( + cluster_command_schema_to_vol_schema(command_schema), + custom_serializer=cv.custom_serializer, + ) + assert vol_schema == expected_schema + + raw_data = { + "update_flags": ["Action", "Start Hue"], + "action": "Activate from current hue", + "direction": "Increment", + "time": 20, + "start_hue": 196, + } + + converted_data = convert_to_zcl_values(raw_data, command_schema) + + assert isinstance( + converted_data["update_flags"], lighting.Color.ColorLoopUpdateFlags + ) + assert lighting.Color.ColorLoopUpdateFlags.Action in converted_data["update_flags"] + assert ( + lighting.Color.ColorLoopUpdateFlags.Start_Hue in converted_data["update_flags"] + ) + + assert isinstance(converted_data["action"], lighting.Color.ColorLoopAction) + assert ( + converted_data["action"] + == lighting.Color.ColorLoopAction.Activate_from_current_hue + ) + + assert isinstance(converted_data["direction"], lighting.Color.ColorLoopDirection) + assert converted_data["direction"] == lighting.Color.ColorLoopDirection.Increment + + assert isinstance(converted_data["time"], uint16_t) + assert converted_data["time"] == 20 + + assert isinstance(converted_data["start_hue"], uint16_t) + assert converted_data["start_hue"] == 196 + + raw_data = { + "update_flags": [0b0000_0001, 0b0000_1000], + "action": 0x02, + "direction": 0x01, + "time": 20, + "start_hue": 196, + } + + converted_data = convert_to_zcl_values(raw_data, command_schema) + + assert isinstance( + converted_data["update_flags"], lighting.Color.ColorLoopUpdateFlags + ) + assert lighting.Color.ColorLoopUpdateFlags.Action in converted_data["update_flags"] + assert ( + lighting.Color.ColorLoopUpdateFlags.Start_Hue in converted_data["update_flags"] + ) + + assert isinstance(converted_data["action"], lighting.Color.ColorLoopAction) + assert ( + converted_data["action"] + == lighting.Color.ColorLoopAction.Activate_from_current_hue + ) + + assert isinstance(converted_data["direction"], lighting.Color.ColorLoopDirection) + assert converted_data["direction"] == lighting.Color.ColorLoopDirection.Increment + + assert isinstance(converted_data["time"], uint16_t) + assert converted_data["time"] == 20 + + assert isinstance(converted_data["start_hue"], uint16_t) + assert converted_data["start_hue"] == 196