Add proper S2 support for adding zwave_js nodes (#56516)

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
Raman Gupta 2021-09-28 18:37:45 -04:00 committed by GitHub
parent db30c27455
commit e76ddb4b27
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 294 additions and 95 deletions

View file

@ -3,7 +3,12 @@ import json
from unittest.mock import patch
import pytest
from zwave_js_server.const import CommandClass, InclusionStrategy, LogLevel
from zwave_js_server.const import (
CommandClass,
InclusionStrategy,
LogLevel,
SecurityClass,
)
from zwave_js_server.event import Event
from zwave_js_server.exceptions import (
FailedCommand,
@ -16,6 +21,7 @@ from zwave_js_server.model.value import _get_value_id_from_dict, get_value_id
from homeassistant.components.websocket_api.const import ERR_NOT_FOUND
from homeassistant.components.zwave_js.api import (
CLIENT_SIDE_AUTH,
COMMAND_CLASS_ID,
CONFIG,
ENABLED,
@ -24,13 +30,15 @@ from homeassistant.components.zwave_js.api import (
FILENAME,
FORCE_CONSOLE,
ID,
INCLUSION_STRATEGY,
LEVEL,
LOG_TO_FILE,
NODE_ID,
OPTED_IN,
PIN,
PROPERTY,
PROPERTY_KEY,
SECURE,
SECURITY_CLASSES,
TYPE,
VALUE,
)
@ -354,31 +362,6 @@ async def test_ping_node(
assert msg["error"]["code"] == ERR_NOT_LOADED
async def test_add_node_secure(
hass, nortek_thermostat_added_event, integration, client, hass_ws_client
):
"""Test the add_node websocket command with secure flag."""
entry = integration
ws_client = await hass_ws_client(hass)
client.async_send_command.return_value = {"success": True}
await ws_client.send_json(
{ID: 1, TYPE: "zwave_js/add_node", ENTRY_ID: entry.entry_id, SECURE: True}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert len(client.async_send_command.call_args_list) == 1
assert client.async_send_command.call_args[0][0] == {
"command": "controller.begin_inclusion",
"options": {"strategy": InclusionStrategy.SECURITY_S0},
}
client.async_send_command.reset_mock()
async def test_add_node(
hass, nortek_thermostat_added_event, integration, client, hass_ws_client
):
@ -389,7 +372,12 @@ async def test_add_node(
client.async_send_command.return_value = {"success": True}
await ws_client.send_json(
{ID: 3, TYPE: "zwave_js/add_node", ENTRY_ID: entry.entry_id}
{
ID: 3,
TYPE: "zwave_js/add_node",
ENTRY_ID: entry.entry_id,
INCLUSION_STRATEGY: InclusionStrategy.DEFAULT.value,
}
)
msg = await ws_client.receive_json()
@ -398,7 +386,7 @@ async def test_add_node(
assert len(client.async_send_command.call_args_list) == 1
assert client.async_send_command.call_args[0][0] == {
"command": "controller.begin_inclusion",
"options": {"strategy": InclusionStrategy.INSECURE},
"options": {"strategy": InclusionStrategy.DEFAULT},
}
event = Event(
@ -414,6 +402,37 @@ async def test_add_node(
msg = await ws_client.receive_json()
assert msg["event"]["event"] == "inclusion started"
event = Event(
type="grant security classes",
data={
"source": "controller",
"event": "grant security classes",
"requested": {"securityClasses": [0, 1, 2, 7], "clientSideAuth": False},
},
)
client.driver.receive_event(event)
msg = await ws_client.receive_json()
assert msg["event"]["event"] == "grant security classes"
assert msg["event"]["requested_grant"] == {
"securityClasses": [0, 1, 2, 7],
"clientSideAuth": False,
}
event = Event(
type="validate dsk and enter pin",
data={
"source": "controller",
"event": "validate dsk and enter pin",
"dsk": "test",
},
)
client.driver.receive_event(event)
msg = await ws_client.receive_json()
assert msg["event"]["event"] == "validate dsk and enter pin"
assert msg["event"]["dsk"] == "test"
client.driver.receive_event(nortek_thermostat_added_event)
msg = await ws_client.receive_json()
assert msg["event"]["event"] == "node added"
@ -421,6 +440,7 @@ async def test_add_node(
"node_id": 67,
"status": 0,
"ready": False,
"low_security": False,
}
assert msg["event"]["node"] == node_details
@ -503,6 +523,94 @@ async def test_add_node(
assert msg["error"]["code"] == ERR_NOT_LOADED
async def test_grant_security_classes(hass, integration, client, hass_ws_client):
"""Test the grant_security_classes websocket command."""
entry = integration
ws_client = await hass_ws_client(hass)
client.async_send_command.return_value = {}
await ws_client.send_json(
{
ID: 1,
TYPE: "zwave_js/grant_security_classes",
ENTRY_ID: entry.entry_id,
SECURITY_CLASSES: [SecurityClass.S2_UNAUTHENTICATED],
CLIENT_SIDE_AUTH: False,
}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert len(client.async_send_command.call_args_list) == 1
assert client.async_send_command.call_args[0][0] == {
"command": "controller.grant_security_classes",
"inclusionGrant": {"securityClasses": [0], "clientSideAuth": False},
}
# Test sending command with not loaded entry fails
await hass.config_entries.async_unload(entry.entry_id)
await hass.async_block_till_done()
await ws_client.send_json(
{
ID: 4,
TYPE: "zwave_js/grant_security_classes",
ENTRY_ID: entry.entry_id,
SECURITY_CLASSES: [SecurityClass.S2_UNAUTHENTICATED],
CLIENT_SIDE_AUTH: False,
}
)
msg = await ws_client.receive_json()
assert not msg["success"]
assert msg["error"]["code"] == ERR_NOT_LOADED
async def test_validate_dsk_and_enter_pin(hass, integration, client, hass_ws_client):
"""Test the validate_dsk_and_enter_pin websocket command."""
entry = integration
ws_client = await hass_ws_client(hass)
client.async_send_command.return_value = {}
await ws_client.send_json(
{
ID: 1,
TYPE: "zwave_js/validate_dsk_and_enter_pin",
ENTRY_ID: entry.entry_id,
PIN: "test",
}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert len(client.async_send_command.call_args_list) == 1
assert client.async_send_command.call_args[0][0] == {
"command": "controller.validate_dsk_and_enter_pin",
"pin": "test",
}
# Test sending command with not loaded entry fails
await hass.config_entries.async_unload(entry.entry_id)
await hass.async_block_till_done()
await ws_client.send_json(
{
ID: 4,
TYPE: "zwave_js/validate_dsk_and_enter_pin",
ENTRY_ID: entry.entry_id,
PIN: "test",
}
)
msg = await ws_client.receive_json()
assert not msg["success"]
assert msg["error"]["code"] == ERR_NOT_LOADED
async def test_cancel_inclusion_exclusion(hass, integration, client, hass_ws_client):
"""Test cancelling the inclusion and exclusion process."""
entry = integration
@ -607,7 +715,6 @@ async def test_remove_node(
data={
"source": "controller",
"event": "exclusion started",
"secure": False,
},
)
client.driver.receive_event(event)
@ -666,52 +773,6 @@ async def test_remove_node(
assert msg["error"]["code"] == ERR_NOT_LOADED
async def test_replace_failed_node_secure(
hass,
nortek_thermostat,
integration,
client,
hass_ws_client,
):
"""Test the replace_failed_node websocket command with secure flag."""
entry = integration
ws_client = await hass_ws_client(hass)
dev_reg = dr.async_get(hass)
# Create device registry entry for mock node
dev_reg.async_get_or_create(
config_entry_id=entry.entry_id,
identifiers={(DOMAIN, "3245146787-67")},
name="Node 67",
)
client.async_send_command.return_value = {"success": True}
await ws_client.send_json(
{
ID: 1,
TYPE: "zwave_js/replace_failed_node",
ENTRY_ID: entry.entry_id,
NODE_ID: 67,
SECURE: True,
}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert msg["result"]
assert len(client.async_send_command.call_args_list) == 1
assert client.async_send_command.call_args[0][0] == {
"command": "controller.replace_failed_node",
"nodeId": nortek_thermostat.node_id,
"options": {"strategy": InclusionStrategy.SECURITY_S0},
}
client.async_send_command.reset_mock()
async def test_replace_failed_node(
hass,
nortek_thermostat,
@ -744,6 +805,7 @@ async def test_replace_failed_node(
TYPE: "zwave_js/replace_failed_node",
ENTRY_ID: entry.entry_id,
NODE_ID: 67,
INCLUSION_STRATEGY: InclusionStrategy.DEFAULT.value,
}
)
@ -755,7 +817,7 @@ async def test_replace_failed_node(
assert client.async_send_command.call_args[0][0] == {
"command": "controller.replace_failed_node",
"nodeId": nortek_thermostat.node_id,
"options": {"strategy": InclusionStrategy.INSECURE},
"options": {"strategy": InclusionStrategy.DEFAULT},
}
client.async_send_command.reset_mock()
@ -773,12 +835,42 @@ async def test_replace_failed_node(
msg = await ws_client.receive_json()
assert msg["event"]["event"] == "inclusion started"
event = Event(
type="grant security classes",
data={
"source": "controller",
"event": "grant security classes",
"requested": {"securityClasses": [0, 1, 2, 7], "clientSideAuth": False},
},
)
client.driver.receive_event(event)
msg = await ws_client.receive_json()
assert msg["event"]["event"] == "grant security classes"
assert msg["event"]["requested_grant"] == {
"securityClasses": [0, 1, 2, 7],
"clientSideAuth": False,
}
event = Event(
type="validate dsk and enter pin",
data={
"source": "controller",
"event": "validate dsk and enter pin",
"dsk": "test",
},
)
client.driver.receive_event(event)
msg = await ws_client.receive_json()
assert msg["event"]["event"] == "validate dsk and enter pin"
assert msg["event"]["dsk"] == "test"
event = Event(
type="inclusion stopped",
data={
"source": "controller",
"event": "inclusion stopped",
"secure": False,
},
)
client.driver.receive_event(event)