Add ability to convert ZCL schemas to vol schemas to ZHA (#79908)

* try serializing cluster command schemas

* use min and max value from zigpy type

* different type assignments

* initial command execution changes

* cleanup

* cleanup and typing

* typing

* typing

* add tests

* mypy

* handle raw values too

* check for None responses

* make backwards compatible

* update yaml for svc change
This commit is contained in:
David F. Mulcahey 2022-10-14 08:15:10 -04:00 committed by GitHub
parent 6a757662e4
commit d75e449c52
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 415 additions and 68 deletions

View file

@ -3,7 +3,7 @@ from __future__ import annotations
import asyncio
import logging
from typing import TYPE_CHECKING, Any, NamedTuple
from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar, cast
import voluptuous as vol
import zigpy.backups
@ -31,6 +31,7 @@ from .core.const import (
ATTR_LEVEL,
ATTR_MANUFACTURER,
ATTR_MEMBERS,
ATTR_PARAMS,
ATTR_TYPE,
ATTR_VALUE,
ATTR_WARNING_DEVICE_DURATION,
@ -69,6 +70,7 @@ from .core.group import GroupMember
from .core.helpers import (
async_cluster_exists,
async_is_bindable_target,
cluster_command_schema_to_vol_schema,
convert_install_code,
get_matched_clusters,
qr_to_install_code,
@ -110,6 +112,17 @@ IEEE_SERVICE = "ieee_based_service"
IEEE_SCHEMA = vol.All(cv.string, EUI64.convert)
# typing typevar
_T = TypeVar("_T")
def _ensure_list_if_present(value: _T | None) -> list[_T] | list[Any] | None:
"""Wrap value in list if it is provided and not one."""
if value is None:
return None
return cast("list[_T]", value) if isinstance(value, list) else [value]
SERVICE_PERMIT_PARAMS = {
vol.Optional(ATTR_IEEE): IEEE_SCHEMA,
vol.Optional(ATTR_DURATION, default=60): vol.All(
@ -181,17 +194,22 @@ SERVICE_SCHEMAS = {
): cv.positive_int,
}
),
SERVICE_ISSUE_ZIGBEE_CLUSTER_COMMAND: vol.Schema(
{
vol.Required(ATTR_IEEE): IEEE_SCHEMA,
vol.Required(ATTR_ENDPOINT_ID): cv.positive_int,
vol.Required(ATTR_CLUSTER_ID): cv.positive_int,
vol.Optional(ATTR_CLUSTER_TYPE, default=CLUSTER_TYPE_IN): cv.string,
vol.Required(ATTR_COMMAND): cv.positive_int,
vol.Required(ATTR_COMMAND_TYPE): cv.string,
vol.Optional(ATTR_ARGS, default=[]): cv.ensure_list,
vol.Optional(ATTR_MANUFACTURER): cv.positive_int,
}
SERVICE_ISSUE_ZIGBEE_CLUSTER_COMMAND: vol.All(
vol.Schema(
{
vol.Required(ATTR_IEEE): IEEE_SCHEMA,
vol.Required(ATTR_ENDPOINT_ID): cv.positive_int,
vol.Required(ATTR_CLUSTER_ID): cv.positive_int,
vol.Optional(ATTR_CLUSTER_TYPE, default=CLUSTER_TYPE_IN): cv.string,
vol.Required(ATTR_COMMAND): cv.positive_int,
vol.Required(ATTR_COMMAND_TYPE): cv.string,
vol.Exclusive(ATTR_ARGS, "attrs_params"): _ensure_list_if_present,
vol.Exclusive(ATTR_PARAMS, "attrs_params"): dict,
vol.Optional(ATTR_MANUFACTURER): cv.positive_int,
}
),
cv.deprecated(ATTR_ARGS),
cv.has_at_least_one_key(ATTR_ARGS, ATTR_PARAMS),
),
SERVICE_ISSUE_ZIGBEE_GROUP_COMMAND: vol.Schema(
{
@ -711,6 +729,8 @@ async def websocket_device_cluster_commands(
hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
) -> None:
"""Return a list of cluster commands."""
import voluptuous_serialize # pylint: disable=import-outside-toplevel
zha_gateway: ZHAGateway = hass.data[DATA_ZHA][DATA_ZHA_GATEWAY]
ieee: EUI64 = msg[ATTR_IEEE]
endpoint_id: int = msg[ATTR_ENDPOINT_ID]
@ -731,6 +751,10 @@ async def websocket_device_cluster_commands(
TYPE: CLIENT,
ID: cmd_id,
ATTR_NAME: cmd.name,
"schema": voluptuous_serialize.convert(
cluster_command_schema_to_vol_schema(cmd.schema),
custom_serializer=cv.custom_serializer,
),
}
)
for cmd_id, cmd in commands[CLUSTER_COMMANDS_SERVER].items():
@ -739,6 +763,10 @@ async def websocket_device_cluster_commands(
TYPE: CLUSTER_COMMAND_SERVER,
ID: cmd_id,
ATTR_NAME: cmd.name,
"schema": voluptuous_serialize.convert(
cluster_command_schema_to_vol_schema(cmd.schema),
custom_serializer=cv.custom_serializer,
),
}
)
_LOGGER.debug(
@ -1285,41 +1313,45 @@ def async_load_api(hass: HomeAssistant) -> None:
cluster_type: str = service.data[ATTR_CLUSTER_TYPE]
command: int = service.data[ATTR_COMMAND]
command_type: str = service.data[ATTR_COMMAND_TYPE]
args: list = service.data[ATTR_ARGS]
args: list | None = service.data.get(ATTR_ARGS)
params: dict | None = service.data.get(ATTR_PARAMS)
manufacturer: int | None = service.data.get(ATTR_MANUFACTURER)
zha_device = zha_gateway.get_device(ieee)
response = None
if zha_device is not None:
if cluster_id >= MFG_CLUSTER_ID_START and manufacturer is None:
manufacturer = zha_device.manufacturer_code
response = await zha_device.issue_cluster_command(
await zha_device.issue_cluster_command(
endpoint_id,
cluster_id,
command,
command_type,
*args,
args,
params,
cluster_type=cluster_type,
manufacturer=manufacturer,
)
_LOGGER.debug(
"Issued command for: %s: [%s] %s: [%s] %s: [%s] %s: [%s] %s: [%s] %s: %s %s: [%s] %s: %s",
ATTR_CLUSTER_ID,
cluster_id,
ATTR_CLUSTER_TYPE,
cluster_type,
ATTR_ENDPOINT_ID,
endpoint_id,
ATTR_COMMAND,
command,
ATTR_COMMAND_TYPE,
command_type,
ATTR_ARGS,
args,
ATTR_MANUFACTURER,
manufacturer,
RESPONSE,
response,
)
_LOGGER.debug(
"Issued command for: %s: [%s] %s: [%s] %s: [%s] %s: [%s] %s: [%s] %s: [%s] %s: [%s] %s: [%s]",
ATTR_CLUSTER_ID,
cluster_id,
ATTR_CLUSTER_TYPE,
cluster_type,
ATTR_ENDPOINT_ID,
endpoint_id,
ATTR_COMMAND,
command,
ATTR_COMMAND_TYPE,
command_type,
ATTR_ARGS,
args,
ATTR_PARAMS,
params,
ATTR_MANUFACTURER,
manufacturer,
)
else:
raise ValueError(f"Device with IEEE {str(ieee)} not found")
async_register_admin_service(
hass,