166 lines
5.6 KiB
Python
166 lines
5.6 KiB
Python
"""
|
|
Helpers for Zigbee Home Automation.
|
|
|
|
For more details about this component, please refer to the documentation at
|
|
https://home-assistant.io/integrations/zha/
|
|
"""
|
|
import asyncio
|
|
import collections
|
|
import logging
|
|
|
|
import bellows.ezsp
|
|
import bellows.zigbee.application
|
|
import zigpy.types
|
|
import zigpy_deconz.api
|
|
import zigpy_deconz.zigbee.application
|
|
import zigpy_xbee.api
|
|
import zigpy_xbee.zigbee.application
|
|
import zigpy_zigate.api
|
|
import zigpy_zigate.zigbee.application
|
|
|
|
from homeassistant.core import callback
|
|
|
|
from .const import (
|
|
CLUSTER_TYPE_IN,
|
|
CLUSTER_TYPE_OUT,
|
|
DATA_ZHA,
|
|
DATA_ZHA_GATEWAY,
|
|
DEFAULT_BAUDRATE,
|
|
RadioType,
|
|
)
|
|
from .registries import BINDABLE_CLUSTERS
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
ClusterPair = collections.namedtuple("ClusterPair", "source_cluster target_cluster")
|
|
|
|
|
|
async def safe_read(
|
|
cluster, attributes, allow_cache=True, only_cache=False, manufacturer=None
|
|
):
|
|
"""Swallow all exceptions from network read.
|
|
|
|
If we throw during initialization, setup fails. Rather have an entity that
|
|
exists, but is in a maybe wrong state, than no entity. This method should
|
|
probably only be used during initialization.
|
|
"""
|
|
try:
|
|
result, _ = await cluster.read_attributes(
|
|
attributes,
|
|
allow_cache=allow_cache,
|
|
only_cache=only_cache,
|
|
manufacturer=manufacturer,
|
|
)
|
|
return result
|
|
except Exception: # pylint: disable=broad-except
|
|
return {}
|
|
|
|
|
|
async def check_zigpy_connection(usb_path, radio_type, database_path):
|
|
"""Test zigpy radio connection."""
|
|
if radio_type == RadioType.ezsp.name:
|
|
radio = bellows.ezsp.EZSP()
|
|
ControllerApplication = bellows.zigbee.application.ControllerApplication
|
|
elif radio_type == RadioType.xbee.name:
|
|
radio = zigpy_xbee.api.XBee()
|
|
ControllerApplication = zigpy_xbee.zigbee.application.ControllerApplication
|
|
elif radio_type == RadioType.deconz.name:
|
|
radio = zigpy_deconz.api.Deconz()
|
|
ControllerApplication = zigpy_deconz.zigbee.application.ControllerApplication
|
|
elif radio_type == RadioType.zigate.name:
|
|
radio = zigpy_zigate.api.ZiGate()
|
|
ControllerApplication = zigpy_zigate.zigbee.application.ControllerApplication
|
|
try:
|
|
await radio.connect(usb_path, DEFAULT_BAUDRATE)
|
|
controller = ControllerApplication(radio, database_path)
|
|
await asyncio.wait_for(controller.startup(auto_form=True), timeout=30)
|
|
await controller.shutdown()
|
|
except Exception: # pylint: disable=broad-except
|
|
return False
|
|
return True
|
|
|
|
|
|
def get_attr_id_by_name(cluster, attr_name):
|
|
"""Get the attribute id for a cluster attribute by its name."""
|
|
return next(
|
|
(
|
|
attrid
|
|
for attrid, (attrname, datatype) in cluster.attributes.items()
|
|
if attr_name == attrname
|
|
),
|
|
None,
|
|
)
|
|
|
|
|
|
async def get_matched_clusters(source_zha_device, target_zha_device):
|
|
"""Get matched input/output cluster pairs for 2 devices."""
|
|
source_clusters = source_zha_device.async_get_std_clusters()
|
|
target_clusters = target_zha_device.async_get_std_clusters()
|
|
clusters_to_bind = []
|
|
|
|
for endpoint_id in source_clusters:
|
|
for cluster_id in source_clusters[endpoint_id][CLUSTER_TYPE_OUT]:
|
|
if cluster_id not in BINDABLE_CLUSTERS:
|
|
continue
|
|
for t_endpoint_id in target_clusters:
|
|
if cluster_id in target_clusters[t_endpoint_id][CLUSTER_TYPE_IN]:
|
|
cluster_pair = ClusterPair(
|
|
source_cluster=source_clusters[endpoint_id][CLUSTER_TYPE_OUT][
|
|
cluster_id
|
|
],
|
|
target_cluster=target_clusters[t_endpoint_id][CLUSTER_TYPE_IN][
|
|
cluster_id
|
|
],
|
|
)
|
|
clusters_to_bind.append(cluster_pair)
|
|
return clusters_to_bind
|
|
|
|
|
|
@callback
|
|
def async_is_bindable_target(source_zha_device, target_zha_device):
|
|
"""Determine if target is bindable to source."""
|
|
source_clusters = source_zha_device.async_get_std_clusters()
|
|
target_clusters = target_zha_device.async_get_std_clusters()
|
|
|
|
for endpoint_id in source_clusters:
|
|
for t_endpoint_id in target_clusters:
|
|
matches = set(
|
|
source_clusters[endpoint_id][CLUSTER_TYPE_OUT].keys()
|
|
).intersection(target_clusters[t_endpoint_id][CLUSTER_TYPE_IN].keys())
|
|
if any(bindable in BINDABLE_CLUSTERS for bindable in matches):
|
|
return True
|
|
return False
|
|
|
|
|
|
async def async_get_zha_device(hass, device_id):
|
|
"""Get a ZHA device for the given device registry id."""
|
|
device_registry = await hass.helpers.device_registry.async_get_registry()
|
|
registry_device = device_registry.async_get(device_id)
|
|
zha_gateway = hass.data[DATA_ZHA][DATA_ZHA_GATEWAY]
|
|
ieee_address = list(list(registry_device.identifiers)[0])[1]
|
|
ieee = zigpy.types.EUI64.convert(ieee_address)
|
|
return zha_gateway.devices[ieee]
|
|
|
|
|
|
class LogMixin:
|
|
"""Log helper."""
|
|
|
|
def log(self, level, msg, *args):
|
|
"""Log with level."""
|
|
raise NotImplementedError
|
|
|
|
def debug(self, msg, *args):
|
|
"""Debug level log."""
|
|
return self.log(logging.DEBUG, msg, *args)
|
|
|
|
def info(self, msg, *args):
|
|
"""Info level log."""
|
|
return self.log(logging.INFO, msg, *args)
|
|
|
|
def warning(self, msg, *args):
|
|
"""Warning method log."""
|
|
return self.log(logging.WARNING, msg, *args)
|
|
|
|
def error(self, msg, *args):
|
|
"""Error level log."""
|
|
return self.log(logging.ERROR, msg, *args)
|