Clean zwave_js api typing (#72484)
* Clean zwave_js api typing * Add temporary type ignore
This commit is contained in:
parent
101b1489c8
commit
f9f87c607e
1 changed files with 28 additions and 10 deletions
|
@ -4,7 +4,7 @@ from __future__ import annotations
|
||||||
from collections.abc import Callable
|
from collections.abc import Callable
|
||||||
import dataclasses
|
import dataclasses
|
||||||
from functools import partial, wraps
|
from functools import partial, wraps
|
||||||
from typing import Any
|
from typing import Any, Literal
|
||||||
|
|
||||||
from aiohttp import web, web_exceptions, web_request
|
from aiohttp import web, web_exceptions, web_request
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
|
@ -113,6 +113,22 @@ DRY_RUN = "dry_run"
|
||||||
|
|
||||||
# constants for inclusion
|
# constants for inclusion
|
||||||
INCLUSION_STRATEGY = "inclusion_strategy"
|
INCLUSION_STRATEGY = "inclusion_strategy"
|
||||||
|
|
||||||
|
# Remove type ignore when bumping library to 0.37.0
|
||||||
|
INCLUSION_STRATEGY_NOT_SMART_START: dict[ # type: ignore[misc]
|
||||||
|
int,
|
||||||
|
Literal[
|
||||||
|
InclusionStrategy.DEFAULT,
|
||||||
|
InclusionStrategy.SECURITY_S0,
|
||||||
|
InclusionStrategy.SECURITY_S2,
|
||||||
|
InclusionStrategy.INSECURE,
|
||||||
|
],
|
||||||
|
] = {
|
||||||
|
InclusionStrategy.DEFAULT.value: InclusionStrategy.DEFAULT,
|
||||||
|
InclusionStrategy.SECURITY_S0.value: InclusionStrategy.SECURITY_S0,
|
||||||
|
InclusionStrategy.SECURITY_S2.value: InclusionStrategy.SECURITY_S2,
|
||||||
|
InclusionStrategy.INSECURE.value: InclusionStrategy.INSECURE,
|
||||||
|
}
|
||||||
PIN = "pin"
|
PIN = "pin"
|
||||||
FORCE_SECURITY = "force_security"
|
FORCE_SECURITY = "force_security"
|
||||||
PLANNED_PROVISIONING_ENTRY = "planned_provisioning_entry"
|
PLANNED_PROVISIONING_ENTRY = "planned_provisioning_entry"
|
||||||
|
@ -143,20 +159,19 @@ MINIMUM_QR_STRING_LENGTH = 52
|
||||||
|
|
||||||
def convert_planned_provisioning_entry(info: dict) -> ProvisioningEntry:
|
def convert_planned_provisioning_entry(info: dict) -> ProvisioningEntry:
|
||||||
"""Handle provisioning entry dict to ProvisioningEntry."""
|
"""Handle provisioning entry dict to ProvisioningEntry."""
|
||||||
info = ProvisioningEntry(
|
return ProvisioningEntry(
|
||||||
dsk=info[DSK],
|
dsk=info[DSK],
|
||||||
security_classes=[SecurityClass(sec_cls) for sec_cls in info[SECURITY_CLASSES]],
|
security_classes=[SecurityClass(sec_cls) for sec_cls in info[SECURITY_CLASSES]],
|
||||||
additional_properties={
|
additional_properties={
|
||||||
k: v for k, v in info.items() if k not in (DSK, SECURITY_CLASSES)
|
k: v for k, v in info.items() if k not in (DSK, SECURITY_CLASSES)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
return info
|
|
||||||
|
|
||||||
|
|
||||||
def convert_qr_provisioning_information(info: dict) -> QRProvisioningInformation:
|
def convert_qr_provisioning_information(info: dict) -> QRProvisioningInformation:
|
||||||
"""Convert QR provisioning information dict to QRProvisioningInformation."""
|
"""Convert QR provisioning information dict to QRProvisioningInformation."""
|
||||||
protocols = [Protocols(proto) for proto in info.get(SUPPORTED_PROTOCOLS, [])]
|
protocols = [Protocols(proto) for proto in info.get(SUPPORTED_PROTOCOLS, [])]
|
||||||
info = QRProvisioningInformation(
|
return QRProvisioningInformation(
|
||||||
version=QRCodeVersion(info[VERSION]),
|
version=QRCodeVersion(info[VERSION]),
|
||||||
security_classes=[SecurityClass(sec_cls) for sec_cls in info[SECURITY_CLASSES]],
|
security_classes=[SecurityClass(sec_cls) for sec_cls in info[SECURITY_CLASSES]],
|
||||||
dsk=info[DSK],
|
dsk=info[DSK],
|
||||||
|
@ -172,7 +187,6 @@ def convert_qr_provisioning_information(info: dict) -> QRProvisioningInformation
|
||||||
supported_protocols=protocols if protocols else None,
|
supported_protocols=protocols if protocols else None,
|
||||||
additional_properties=info.get(ADDITIONAL_PROPERTIES, {}),
|
additional_properties=info.get(ADDITIONAL_PROPERTIES, {}),
|
||||||
)
|
)
|
||||||
return info
|
|
||||||
|
|
||||||
|
|
||||||
# Helper schemas
|
# Helper schemas
|
||||||
|
@ -655,7 +669,7 @@ async def websocket_add_node(
|
||||||
)
|
)
|
||||||
|
|
||||||
connection.subscriptions[msg["id"]] = async_cleanup
|
connection.subscriptions[msg["id"]] = async_cleanup
|
||||||
msg[DATA_UNSUBSCRIBE] = unsubs = [
|
unsubs: list[Callable[[], None]] = [
|
||||||
controller.on("inclusion started", forward_event),
|
controller.on("inclusion started", forward_event),
|
||||||
controller.on("inclusion failed", forward_event),
|
controller.on("inclusion failed", forward_event),
|
||||||
controller.on("inclusion stopped", forward_event),
|
controller.on("inclusion stopped", forward_event),
|
||||||
|
@ -666,10 +680,13 @@ async def websocket_add_node(
|
||||||
hass, EVENT_DEVICE_ADDED_TO_REGISTRY, device_registered
|
hass, EVENT_DEVICE_ADDED_TO_REGISTRY, device_registered
|
||||||
),
|
),
|
||||||
]
|
]
|
||||||
|
msg[DATA_UNSUBSCRIBE] = unsubs
|
||||||
|
|
||||||
try:
|
try:
|
||||||
result = await controller.async_begin_inclusion(
|
result = await controller.async_begin_inclusion(
|
||||||
inclusion_strategy, force_security=force_security, provisioning=provisioning
|
INCLUSION_STRATEGY_NOT_SMART_START[inclusion_strategy.value],
|
||||||
|
force_security=force_security,
|
||||||
|
provisioning=provisioning,
|
||||||
)
|
)
|
||||||
except ValueError as err:
|
except ValueError as err:
|
||||||
connection.send_error(
|
connection.send_error(
|
||||||
|
@ -1165,7 +1182,7 @@ async def websocket_replace_failed_node(
|
||||||
)
|
)
|
||||||
|
|
||||||
connection.subscriptions[msg["id"]] = async_cleanup
|
connection.subscriptions[msg["id"]] = async_cleanup
|
||||||
msg[DATA_UNSUBSCRIBE] = unsubs = [
|
unsubs: list[Callable[[], None]] = [
|
||||||
controller.on("inclusion started", forward_event),
|
controller.on("inclusion started", forward_event),
|
||||||
controller.on("inclusion failed", forward_event),
|
controller.on("inclusion failed", forward_event),
|
||||||
controller.on("inclusion stopped", forward_event),
|
controller.on("inclusion stopped", forward_event),
|
||||||
|
@ -1177,11 +1194,12 @@ async def websocket_replace_failed_node(
|
||||||
hass, EVENT_DEVICE_ADDED_TO_REGISTRY, device_registered
|
hass, EVENT_DEVICE_ADDED_TO_REGISTRY, device_registered
|
||||||
),
|
),
|
||||||
]
|
]
|
||||||
|
msg[DATA_UNSUBSCRIBE] = unsubs
|
||||||
|
|
||||||
try:
|
try:
|
||||||
result = await controller.async_replace_failed_node(
|
result = await controller.async_replace_failed_node(
|
||||||
node_id,
|
node_id,
|
||||||
inclusion_strategy,
|
INCLUSION_STRATEGY_NOT_SMART_START[inclusion_strategy.value],
|
||||||
force_security=force_security,
|
force_security=force_security,
|
||||||
provisioning=provisioning,
|
provisioning=provisioning,
|
||||||
)
|
)
|
||||||
|
@ -1540,7 +1558,7 @@ async def websocket_get_config_parameters(
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Get a list of configuration parameters for a Z-Wave node."""
|
"""Get a list of configuration parameters for a Z-Wave node."""
|
||||||
values = node.get_configuration_values()
|
values = node.get_configuration_values()
|
||||||
result = {}
|
result: dict[str, Any] = {}
|
||||||
for value_id, zwave_value in values.items():
|
for value_id, zwave_value in values.items():
|
||||||
metadata = zwave_value.metadata
|
metadata = zwave_value.metadata
|
||||||
result[value_id] = {
|
result[value_id] = {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue