Add alarm control panel support to ZHA (#49080)

* start implementation of IAS ACE

* starting alarm control panel

* enums

* use new enums from zigpy

* fix import

* write state

* fix registries after rebase

* remove extra line

* cleanup

* fix deprecation warning

* updates to catch up with codebase evolution

* minor updates

* cleanup

* implement more ias ace functionality

* cleanup

* make config helper work for supplied section

* connect to configuration

* use ha async_create_task

* add some tests

* remove unused restore method

* update tests

* add tests from panel POV

* dynamically include alarm control panel config

* fix import

Co-authored-by: Alexei Chetroi <lexoid@gmail.com>
This commit is contained in:
David F. Mulcahey 2021-04-27 10:58:59 -04:00 committed by GitHub
parent d4ed65e0f5
commit a644c2e8ba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 719 additions and 10 deletions

View file

@ -31,7 +31,6 @@ from .const import (
CUSTOM_CONFIGURATION,
DATA_ZHA,
DATA_ZHA_GATEWAY,
ZHA_OPTIONS,
)
from .registries import BINDABLE_CLUSTERS
from .typing import ZhaDeviceType, ZigpyClusterType
@ -131,15 +130,27 @@ def async_is_bindable_target(source_zha_device, target_zha_device):
@callback
def async_get_zha_config_value(config_entry, config_key, default):
def async_get_zha_config_value(config_entry, section, config_key, default):
"""Get the value for the specified configuration from the zha config entry."""
return (
config_entry.options.get(CUSTOM_CONFIGURATION, {})
.get(ZHA_OPTIONS, {})
.get(section, {})
.get(config_key, default)
)
def async_input_cluster_exists(hass, cluster_id):
"""Determine if a device containing the specified in cluster is paired."""
zha_gateway = hass.data[DATA_ZHA][DATA_ZHA_GATEWAY]
zha_devices = zha_gateway.devices.values()
for zha_device in zha_devices:
clusters_by_endpoint = zha_device.async_get_clusters()
for clusters in clusters_by_endpoint.values():
if cluster_id in clusters[CLUSTER_TYPE_IN]:
return True
return False
async def async_get_zha_device(hass, device_id):
"""Get a ZHA device for the given device registry id."""
device_registry = await hass.helpers.device_registry.async_get_registry()