diff --git a/homeassistant/components/zwave_js/api.py b/homeassistant/components/zwave_js/api.py index 38d5b99147d..de2662bfa27 100644 --- a/homeassistant/components/zwave_js/api.py +++ b/homeassistant/components/zwave_js/api.py @@ -15,7 +15,10 @@ from zwave_js_server.const import ( CommandClass, InclusionStrategy, LogLevel, + Protocols, + QRCodeVersion, SecurityClass, + ZwaveFeature, ) from zwave_js_server.exceptions import ( BaseZwaveJSServerError, @@ -25,7 +28,12 @@ from zwave_js_server.exceptions import ( SetValueFailed, ) from zwave_js_server.firmware import begin_firmware_update -from zwave_js_server.model.controller import ControllerStatistics, InclusionGrant +from zwave_js_server.model.controller import ( + ControllerStatistics, + InclusionGrant, + ProvisioningEntry, + QRProvisioningInformation, +) from zwave_js_server.model.firmware import ( FirmwareUpdateFinished, FirmwareUpdateProgress, @@ -33,12 +41,14 @@ from zwave_js_server.model.firmware import ( from zwave_js_server.model.log_config import LogConfig from zwave_js_server.model.log_message import LogMessage from zwave_js_server.model.node import Node, NodeStatistics +from zwave_js_server.model.utils import async_parse_qr_code_string from zwave_js_server.util.node import async_set_config_parameter from homeassistant.components import websocket_api from homeassistant.components.http.view import HomeAssistantView from homeassistant.components.websocket_api.connection import ActiveConnection from homeassistant.components.websocket_api.const import ( + ERR_INVALID_FORMAT, ERR_NOT_FOUND, ERR_NOT_SUPPORTED, ERR_UNKNOWN_ERROR, @@ -80,8 +90,6 @@ TYPE = "type" PROPERTY = "property" PROPERTY_KEY = "property_key" VALUE = "value" -INCLUSION_STRATEGY = "inclusion_strategy" -PIN = "pin" # constants for log config commands CONFIG = "config" @@ -106,6 +114,113 @@ CLIENT_SIDE_AUTH = "client_side_auth" # constants for migration DRY_RUN = "dry_run" +# constants for inclusion +INCLUSION_STRATEGY = "inclusion_strategy" +PIN = "pin" +FORCE_SECURITY = "force_security" +PLANNED_PROVISIONING_ENTRY = "planned_provisioning_entry" +QR_PROVISIONING_INFORMATION = "qr_provisioning_information" +QR_CODE_STRING = "qr_code_string" + +DSK = "dsk" + +VERSION = "version" +GENERIC_DEVICE_CLASS = "generic_device_class" +SPECIFIC_DEVICE_CLASS = "specific_device_class" +INSTALLER_ICON_TYPE = "installer_icon_type" +MANUFACTURER_ID = "manufacturer_id" +PRODUCT_TYPE = "product_type" +PRODUCT_ID = "product_id" +APPLICATION_VERSION = "application_version" +MAX_INCLUSION_REQUEST_INTERVAL = "max_inclusion_request_interval" +UUID = "uuid" +SUPPORTED_PROTOCOLS = "supported_protocols" + +FEATURE = "feature" +UNPROVISION = "unprovision" + +# https://github.com/zwave-js/node-zwave-js/blob/master/packages/core/src/security/QR.ts#L41 +MINIMUM_QR_STRING_LENGTH = 52 + + +def convert_planned_provisioning_entry(info: dict) -> ProvisioningEntry: + """Handle provisioning entry dict to ProvisioningEntry.""" + info = ProvisioningEntry( + dsk=info[DSK], + security_classes=[SecurityClass(sec_cls) for sec_cls in info[SECURITY_CLASSES]], + additional_properties={ + k: v for k, v in info.items() if k not in (DSK, SECURITY_CLASSES) + }, + ) + return info + + +def convert_qr_provisioning_information(info: dict) -> QRProvisioningInformation: + """Convert QR provisioning information dict to QRProvisioningInformation.""" + protocols = [Protocols(proto) for proto in info.get(SUPPORTED_PROTOCOLS, [])] + info = QRProvisioningInformation( + version=QRCodeVersion(info[VERSION]), + security_classes=[SecurityClass(sec_cls) for sec_cls in info[SECURITY_CLASSES]], + dsk=info[DSK], + generic_device_class=info[GENERIC_DEVICE_CLASS], + specific_device_class=info[SPECIFIC_DEVICE_CLASS], + installer_icon_type=info[INSTALLER_ICON_TYPE], + manufacturer_id=info[MANUFACTURER_ID], + product_type=info[PRODUCT_TYPE], + product_id=info[PRODUCT_ID], + application_version=info[APPLICATION_VERSION], + max_inclusion_request_interval=info.get(MAX_INCLUSION_REQUEST_INTERVAL), + uuid=info.get(UUID), + supported_protocols=protocols if protocols else None, + ) + return info + + +# Helper schemas +PLANNED_PROVISIONING_ENTRY_SCHEMA = vol.All( + vol.Schema( + { + vol.Required(DSK): str, + vol.Required(SECURITY_CLASSES): vol.All( + cv.ensure_list, + [vol.Coerce(SecurityClass)], + ), + }, + # Provisioning entries can have extra keys for SmartStart + extra=vol.ALLOW_EXTRA, + ), + convert_planned_provisioning_entry, +) + +QR_PROVISIONING_INFORMATION_SCHEMA = vol.All( + vol.Schema( + { + vol.Required(VERSION): vol.Coerce(QRCodeVersion), + vol.Required(SECURITY_CLASSES): vol.All( + cv.ensure_list, + [vol.Coerce(SecurityClass)], + ), + vol.Required(DSK): str, + vol.Required(GENERIC_DEVICE_CLASS): int, + vol.Required(SPECIFIC_DEVICE_CLASS): int, + vol.Required(INSTALLER_ICON_TYPE): int, + vol.Required(MANUFACTURER_ID): int, + vol.Required(PRODUCT_TYPE): int, + vol.Required(PRODUCT_ID): int, + vol.Required(APPLICATION_VERSION): str, + vol.Optional(MAX_INCLUSION_REQUEST_INTERVAL): vol.Any(int, None), + vol.Optional(UUID): vol.Any(str, None), + vol.Optional(SUPPORTED_PROTOCOLS): vol.All( + cv.ensure_list, + [vol.Coerce(Protocols)], + ), + } + ), + convert_qr_provisioning_information, +) + +QR_CODE_STRING_SCHEMA = vol.All(str, vol.Length(min=MINIMUM_QR_STRING_LENGTH)) + def async_get_entry(orig_func: Callable) -> Callable: """Decorate async function to get entry.""" @@ -194,6 +309,11 @@ def async_register_api(hass: HomeAssistant) -> None: websocket_api.async_register_command(hass, websocket_add_node) websocket_api.async_register_command(hass, websocket_grant_security_classes) websocket_api.async_register_command(hass, websocket_validate_dsk_and_enter_pin) + websocket_api.async_register_command(hass, websocket_provision_smart_start_node) + websocket_api.async_register_command(hass, websocket_unprovision_smart_start_node) + websocket_api.async_register_command(hass, websocket_get_provisioning_entries) + websocket_api.async_register_command(hass, websocket_parse_qr_code_string) + websocket_api.async_register_command(hass, websocket_supports_feature) websocket_api.async_register_command(hass, websocket_stop_inclusion) websocket_api.async_register_command(hass, websocket_stop_exclusion) websocket_api.async_register_command(hass, websocket_remove_node) @@ -434,9 +554,24 @@ async def websocket_ping_node( { vol.Required(TYPE): "zwave_js/add_node", vol.Required(ENTRY_ID): str, - vol.Optional(INCLUSION_STRATEGY, default=InclusionStrategy.DEFAULT): vol.In( - [strategy.value for strategy in InclusionStrategy] + vol.Optional(INCLUSION_STRATEGY, default=InclusionStrategy.DEFAULT): vol.All( + vol.Coerce(int), + vol.In( + [ + strategy.value + for strategy in InclusionStrategy + if strategy != InclusionStrategy.SMART_START + ] + ), ), + vol.Optional(FORCE_SECURITY): bool, + vol.Exclusive( + PLANNED_PROVISIONING_ENTRY, "options" + ): PLANNED_PROVISIONING_ENTRY_SCHEMA, + vol.Exclusive( + QR_PROVISIONING_INFORMATION, "options" + ): QR_PROVISIONING_INFORMATION_SCHEMA, + vol.Exclusive(QR_CODE_STRING, "options"): QR_CODE_STRING_SCHEMA, } ) @websocket_api.async_response @@ -452,6 +587,12 @@ async def websocket_add_node( """Add a node to the Z-Wave network.""" controller = client.driver.controller inclusion_strategy = InclusionStrategy(msg[INCLUSION_STRATEGY]) + force_security = msg.get(FORCE_SECURITY) + provisioning = ( + msg.get(PLANNED_PROVISIONING_ENTRY) + or msg.get(QR_PROVISIONING_INFORMATION) + or msg.get(QR_CODE_STRING) + ) @callback def async_cleanup() -> None: @@ -542,7 +683,18 @@ async def websocket_add_node( ), ] - result = await controller.async_begin_inclusion(inclusion_strategy) + try: + result = await controller.async_begin_inclusion( + inclusion_strategy, force_security=force_security, provisioning=provisioning + ) + except ValueError as err: + connection.send_error( + msg[ID], + ERR_INVALID_FORMAT, + err.args[0], + ) + return + connection.send_result( msg[ID], result, @@ -554,9 +706,10 @@ async def websocket_add_node( { vol.Required(TYPE): "zwave_js/grant_security_classes", vol.Required(ENTRY_ID): str, - vol.Required(SECURITY_CLASSES): [ - vol.In([sec_cls.value for sec_cls in SecurityClass]) - ], + vol.Required(SECURITY_CLASSES): vol.All( + cv.ensure_list, + [vol.Coerce(SecurityClass)], + ), vol.Optional(CLIENT_SIDE_AUTH, default=False): bool, } ) @@ -570,7 +723,7 @@ async def websocket_grant_security_classes( entry: ConfigEntry, client: Client, ) -> None: - """Add a node to the Z-Wave network.""" + """Choose SecurityClass grants as part of S2 inclusion process.""" inclusion_grant = InclusionGrant( [SecurityClass(sec_cls) for sec_cls in msg[SECURITY_CLASSES]], msg[CLIENT_SIDE_AUTH], @@ -597,11 +750,179 @@ async def websocket_validate_dsk_and_enter_pin( entry: ConfigEntry, client: Client, ) -> None: - """Add a node to the Z-Wave network.""" + """Validate DSK and enter PIN as part of S2 inclusion process.""" await client.driver.controller.async_validate_dsk_and_enter_pin(msg[PIN]) connection.send_result(msg[ID]) +@websocket_api.require_admin +@websocket_api.websocket_command( + { + vol.Required(TYPE): "zwave_js/provision_smart_start_node", + vol.Required(ENTRY_ID): str, + vol.Exclusive( + PLANNED_PROVISIONING_ENTRY, "options" + ): PLANNED_PROVISIONING_ENTRY_SCHEMA, + vol.Exclusive( + QR_PROVISIONING_INFORMATION, "options" + ): QR_PROVISIONING_INFORMATION_SCHEMA, + vol.Exclusive(QR_CODE_STRING, "options"): QR_CODE_STRING_SCHEMA, + } +) +@websocket_api.async_response +@async_handle_failed_command +@async_get_entry +async def websocket_provision_smart_start_node( + hass: HomeAssistant, + connection: ActiveConnection, + msg: dict, + entry: ConfigEntry, + client: Client, +) -> None: + """Pre-provision a smart start node.""" + try: + cv.has_at_least_one_key( + PLANNED_PROVISIONING_ENTRY, QR_PROVISIONING_INFORMATION, QR_CODE_STRING + )(msg) + except vol.Invalid as err: + connection.send_error( + msg[ID], + ERR_INVALID_FORMAT, + err.args[0], + ) + return + + provisioning_info = ( + msg.get(PLANNED_PROVISIONING_ENTRY) + or msg.get(QR_PROVISIONING_INFORMATION) + or msg[QR_CODE_STRING] + ) + + if ( + QR_PROVISIONING_INFORMATION in msg + and provisioning_info.version == QRCodeVersion.S2 + ): + connection.send_error( + msg[ID], + ERR_INVALID_FORMAT, + "QR code version S2 is not supported for this command", + ) + return + await client.driver.controller.async_provision_smart_start_node(provisioning_info) + connection.send_result(msg[ID]) + + +@websocket_api.require_admin +@websocket_api.websocket_command( + { + vol.Required(TYPE): "zwave_js/unprovision_smart_start_node", + vol.Required(ENTRY_ID): str, + vol.Exclusive(DSK, "input"): str, + vol.Exclusive(NODE_ID, "input"): int, + } +) +@websocket_api.async_response +@async_handle_failed_command +@async_get_entry +async def websocket_unprovision_smart_start_node( + hass: HomeAssistant, + connection: ActiveConnection, + msg: dict, + entry: ConfigEntry, + client: Client, +) -> None: + """Unprovision a smart start node.""" + try: + cv.has_at_least_one_key(DSK, NODE_ID)(msg) + except vol.Invalid as err: + connection.send_error( + msg[ID], + ERR_INVALID_FORMAT, + err.args[0], + ) + return + dsk_or_node_id = msg.get(DSK) or msg[NODE_ID] + await client.driver.controller.async_unprovision_smart_start_node(dsk_or_node_id) + connection.send_result(msg[ID]) + + +@websocket_api.require_admin +@websocket_api.websocket_command( + { + vol.Required(TYPE): "zwave_js/get_provisioning_entries", + vol.Required(ENTRY_ID): str, + } +) +@websocket_api.async_response +@async_handle_failed_command +@async_get_entry +async def websocket_get_provisioning_entries( + hass: HomeAssistant, + connection: ActiveConnection, + msg: dict, + entry: ConfigEntry, + client: Client, +) -> None: + """Get provisioning entries (entries that have been pre-provisioned).""" + provisioning_entries = ( + await client.driver.controller.async_get_provisioning_entries() + ) + connection.send_result( + msg[ID], [dataclasses.asdict(entry) for entry in provisioning_entries] + ) + + +@websocket_api.require_admin +@websocket_api.websocket_command( + { + vol.Required(TYPE): "zwave_js/parse_qr_code_string", + vol.Required(ENTRY_ID): str, + vol.Required(QR_CODE_STRING): QR_CODE_STRING_SCHEMA, + } +) +@websocket_api.async_response +@async_handle_failed_command +@async_get_entry +async def websocket_parse_qr_code_string( + hass: HomeAssistant, + connection: ActiveConnection, + msg: dict, + entry: ConfigEntry, + client: Client, +) -> None: + """Parse a QR Code String and return QRProvisioningInformation dict.""" + qr_provisioning_information = await async_parse_qr_code_string( + client, msg[QR_CODE_STRING] + ) + connection.send_result(msg[ID], dataclasses.asdict(qr_provisioning_information)) + + +@websocket_api.require_admin +@websocket_api.websocket_command( + { + vol.Required(TYPE): "zwave_js/supports_feature", + vol.Required(ENTRY_ID): str, + vol.Required(FEATURE): vol.Coerce(ZwaveFeature), + } +) +@websocket_api.async_response +@async_handle_failed_command +@async_get_entry +async def websocket_supports_feature( + hass: HomeAssistant, + connection: ActiveConnection, + msg: dict, + entry: ConfigEntry, + client: Client, +) -> None: + """Check if controller supports a particular feature.""" + supported = await client.driver.controller.async_supports_feature(msg[FEATURE]) + connection.send_result( + msg[ID], + {"supported": supported}, + ) + + @websocket_api.require_admin @websocket_api.websocket_command( { @@ -659,6 +980,7 @@ async def websocket_stop_exclusion( { vol.Required(TYPE): "zwave_js/remove_node", vol.Required(ENTRY_ID): str, + vol.Optional(UNPROVISION): bool, } ) @websocket_api.async_response @@ -707,7 +1029,7 @@ async def websocket_remove_node( controller.on("node removed", node_removed), ] - result = await controller.async_begin_exclusion() + result = await controller.async_begin_exclusion(msg.get(UNPROVISION)) connection.send_result( msg[ID], result, @@ -720,9 +1042,24 @@ async def websocket_remove_node( vol.Required(TYPE): "zwave_js/replace_failed_node", vol.Required(ENTRY_ID): str, vol.Required(NODE_ID): int, - vol.Optional(INCLUSION_STRATEGY, default=InclusionStrategy.DEFAULT): vol.In( - [strategy.value for strategy in InclusionStrategy] + vol.Optional(INCLUSION_STRATEGY, default=InclusionStrategy.DEFAULT): vol.All( + vol.Coerce(int), + vol.In( + [ + strategy.value + for strategy in InclusionStrategy + if strategy != InclusionStrategy.SMART_START + ] + ), ), + vol.Optional(FORCE_SECURITY): bool, + vol.Exclusive( + PLANNED_PROVISIONING_ENTRY, "options" + ): PLANNED_PROVISIONING_ENTRY_SCHEMA, + vol.Exclusive( + QR_PROVISIONING_INFORMATION, "options" + ): QR_PROVISIONING_INFORMATION_SCHEMA, + vol.Exclusive(QR_CODE_STRING, "options"): QR_CODE_STRING_SCHEMA, } ) @websocket_api.async_response @@ -739,6 +1076,12 @@ async def websocket_replace_failed_node( controller = client.driver.controller node_id = msg[NODE_ID] inclusion_strategy = InclusionStrategy(msg[INCLUSION_STRATEGY]) + force_security = msg.get(FORCE_SECURITY) + provisioning = ( + msg.get(PLANNED_PROVISIONING_ENTRY) + or msg.get(QR_PROVISIONING_INFORMATION) + or msg.get(QR_CODE_STRING) + ) @callback def async_cleanup() -> None: @@ -842,7 +1185,21 @@ async def websocket_replace_failed_node( ), ] - result = await controller.async_replace_failed_node(node_id, inclusion_strategy) + try: + result = await controller.async_replace_failed_node( + node_id, + inclusion_strategy, + force_security=force_security, + provisioning=provisioning, + ) + except ValueError as err: + connection.send_error( + msg[ID], + ERR_INVALID_FORMAT, + err.args[0], + ) + return + connection.send_result( msg[ID], result, @@ -1309,13 +1666,12 @@ async def websocket_subscribe_log_updates( { vol.Optional(ENABLED): cv.boolean, vol.Optional(LEVEL): vol.All( - cv.string, + str, vol.Lower, - vol.In([log_level.value for log_level in LogLevel]), - lambda val: LogLevel(val), # pylint: disable=unnecessary-lambda + vol.Coerce(LogLevel), ), vol.Optional(LOG_TO_FILE): cv.boolean, - vol.Optional(FILENAME): cv.string, + vol.Optional(FILENAME): str, vol.Optional(FORCE_CONSOLE): cv.boolean, } ), diff --git a/tests/components/zwave_js/test_api.py b/tests/components/zwave_js/test_api.py index e6bfbb45393..4ca733786dc 100644 --- a/tests/components/zwave_js/test_api.py +++ b/tests/components/zwave_js/test_api.py @@ -9,7 +9,10 @@ from zwave_js_server.const import ( CommandClass, InclusionStrategy, LogLevel, + Protocols, + QRCodeVersion, SecurityClass, + ZwaveFeature, ) from zwave_js_server.event import Event from zwave_js_server.exceptions import ( @@ -19,31 +22,49 @@ from zwave_js_server.exceptions import ( NotFoundError, SetValueFailed, ) +from zwave_js_server.model.controller import ( + ProvisioningEntry, + QRProvisioningInformation, +) from zwave_js_server.model.node import Node from zwave_js_server.model.value import _get_value_id_from_dict, get_value_id from homeassistant.components.websocket_api.const import ERR_NOT_FOUND from homeassistant.components.zwave_js.api import ( + APPLICATION_VERSION, CLIENT_SIDE_AUTH, COMMAND_CLASS_ID, CONFIG, + DSK, ENABLED, ENTRY_ID, ERR_NOT_LOADED, + FEATURE, FILENAME, FORCE_CONSOLE, + GENERIC_DEVICE_CLASS, ID, INCLUSION_STRATEGY, + INSTALLER_ICON_TYPE, LEVEL, LOG_TO_FILE, + MANUFACTURER_ID, NODE_ID, OPTED_IN, PIN, + PLANNED_PROVISIONING_ENTRY, + PRODUCT_ID, + PRODUCT_TYPE, PROPERTY, PROPERTY_KEY, + QR_CODE_STRING, + QR_PROVISIONING_INFORMATION, SECURITY_CLASSES, + SPECIFIC_DEVICE_CLASS, TYPE, + UNPROVISION, VALUE, + VERSION, ) from homeassistant.components.zwave_js.const import ( CONF_DATA_COLLECTION_OPTED_IN, @@ -421,9 +442,10 @@ async def test_add_node( client.async_send_command.return_value = {"success": True} + # Test inclusion with no provisioning input await ws_client.send_json( { - ID: 3, + ID: 1, TYPE: "zwave_js/add_node", ENTRY_ID: entry.entry_id, INCLUSION_STRATEGY: InclusionStrategy.DEFAULT.value, @@ -542,6 +564,193 @@ async def test_add_node( msg = await ws_client.receive_json() assert msg["event"]["event"] == "interview failed" + client.async_send_command.reset_mock() + client.async_send_command.return_value = {"success": True} + + # Test S2 planned provisioning entry + await ws_client.send_json( + { + ID: 2, + TYPE: "zwave_js/add_node", + ENTRY_ID: entry.entry_id, + INCLUSION_STRATEGY: InclusionStrategy.SECURITY_S2.value, + PLANNED_PROVISIONING_ENTRY: { + DSK: "test", + SECURITY_CLASSES: [0], + }, + } + ) + + msg = await ws_client.receive_json() + assert msg["success"] + + assert len(client.async_send_command.call_args_list) == 1 + assert client.async_send_command.call_args[0][0] == { + "command": "controller.begin_inclusion", + "options": { + "strategy": InclusionStrategy.SECURITY_S2, + "provisioning": ProvisioningEntry( + "test", [SecurityClass.S2_UNAUTHENTICATED] + ).to_dict(), + }, + } + + client.async_send_command.reset_mock() + client.async_send_command.return_value = {"success": True} + + # Test S2 QR provisioning information + await ws_client.send_json( + { + ID: 3, + TYPE: "zwave_js/add_node", + ENTRY_ID: entry.entry_id, + INCLUSION_STRATEGY: InclusionStrategy.SECURITY_S2.value, + QR_PROVISIONING_INFORMATION: { + VERSION: 0, + SECURITY_CLASSES: [0], + DSK: "test", + GENERIC_DEVICE_CLASS: 1, + SPECIFIC_DEVICE_CLASS: 1, + INSTALLER_ICON_TYPE: 1, + MANUFACTURER_ID: 1, + PRODUCT_TYPE: 1, + PRODUCT_ID: 1, + APPLICATION_VERSION: "test", + }, + } + ) + + msg = await ws_client.receive_json() + assert msg["success"] + + assert len(client.async_send_command.call_args_list) == 1 + assert client.async_send_command.call_args[0][0] == { + "command": "controller.begin_inclusion", + "options": { + "strategy": InclusionStrategy.SECURITY_S2, + "provisioning": QRProvisioningInformation( + QRCodeVersion.S2, + [SecurityClass.S2_UNAUTHENTICATED], + "test", + 1, + 1, + 1, + 1, + 1, + 1, + "test", + None, + None, + None, + ).to_dict(), + }, + } + + client.async_send_command.reset_mock() + client.async_send_command.return_value = {"success": True} + + # Test S2 QR code string + await ws_client.send_json( + { + ID: 4, + TYPE: "zwave_js/add_node", + ENTRY_ID: entry.entry_id, + INCLUSION_STRATEGY: InclusionStrategy.SECURITY_S2.value, + QR_CODE_STRING: "90testtesttesttesttesttesttesttesttesttesttesttesttest", + } + ) + + msg = await ws_client.receive_json() + assert msg["success"] + + assert len(client.async_send_command.call_args_list) == 1 + assert client.async_send_command.call_args[0][0] == { + "command": "controller.begin_inclusion", + "options": { + "strategy": InclusionStrategy.SECURITY_S2, + "provisioning": "90testtesttesttesttesttesttesttesttesttesttesttesttest", + }, + } + + client.async_send_command.reset_mock() + client.async_send_command.return_value = {"success": True} + + # Test Smart Start QR provisioning information with S2 inclusion strategy fails + await ws_client.send_json( + { + ID: 5, + TYPE: "zwave_js/add_node", + ENTRY_ID: entry.entry_id, + INCLUSION_STRATEGY: InclusionStrategy.SECURITY_S2.value, + QR_PROVISIONING_INFORMATION: { + VERSION: 1, + SECURITY_CLASSES: [0], + DSK: "test", + GENERIC_DEVICE_CLASS: 1, + SPECIFIC_DEVICE_CLASS: 1, + INSTALLER_ICON_TYPE: 1, + MANUFACTURER_ID: 1, + PRODUCT_TYPE: 1, + PRODUCT_ID: 1, + APPLICATION_VERSION: "test", + }, + } + ) + + msg = await ws_client.receive_json() + assert not msg["success"] + + assert len(client.async_send_command.call_args_list) == 0 + + client.async_send_command.reset_mock() + client.async_send_command.return_value = {"success": True} + + # Test QR provisioning information with S0 inclusion strategy fails + await ws_client.send_json( + { + ID: 5, + TYPE: "zwave_js/add_node", + ENTRY_ID: entry.entry_id, + INCLUSION_STRATEGY: InclusionStrategy.SECURITY_S0, + QR_PROVISIONING_INFORMATION: { + VERSION: 1, + SECURITY_CLASSES: [0], + DSK: "test", + GENERIC_DEVICE_CLASS: 1, + SPECIFIC_DEVICE_CLASS: 1, + INSTALLER_ICON_TYPE: 1, + MANUFACTURER_ID: 1, + PRODUCT_TYPE: 1, + PRODUCT_ID: 1, + APPLICATION_VERSION: "test", + }, + } + ) + + msg = await ws_client.receive_json() + assert not msg["success"] + + assert len(client.async_send_command.call_args_list) == 0 + + client.async_send_command.reset_mock() + client.async_send_command.return_value = {" success": True} + + # Test ValueError is caught as failure + await ws_client.send_json( + { + ID: 6, + TYPE: "zwave_js/add_node", + ENTRY_ID: entry.entry_id, + INCLUSION_STRATEGY: InclusionStrategy.DEFAULT.value, + QR_CODE_STRING: "90testtesttesttesttesttesttesttesttesttesttesttesttest", + } + ) + + msg = await ws_client.receive_json() + assert not msg["success"] + + assert len(client.async_send_command.call_args_list) == 0 + # Test FailedZWaveCommand is caught with patch( "zwave_js_server.model.controller.Controller.async_begin_inclusion", @@ -549,7 +758,7 @@ async def test_add_node( ): await ws_client.send_json( { - ID: 4, + ID: 7, TYPE: "zwave_js/add_node", ENTRY_ID: entry.entry_id, } @@ -565,7 +774,7 @@ async def test_add_node( await hass.async_block_till_done() await ws_client.send_json( - {ID: 5, TYPE: "zwave_js/add_node", ENTRY_ID: entry.entry_id} + {ID: 8, TYPE: "zwave_js/add_node", ENTRY_ID: entry.entry_id} ) msg = await ws_client.receive_json() @@ -661,6 +870,465 @@ async def test_validate_dsk_and_enter_pin(hass, integration, client, hass_ws_cli assert msg["error"]["code"] == ERR_NOT_LOADED +async def test_provision_smart_start_node(hass, integration, client, hass_ws_client): + """Test provision_smart_start_node websocket command.""" + entry = integration + ws_client = await hass_ws_client(hass) + + client.async_send_command.return_value = {"success": True} + + # Test provisioning entry + await ws_client.send_json( + { + ID: 2, + TYPE: "zwave_js/provision_smart_start_node", + ENTRY_ID: entry.entry_id, + PLANNED_PROVISIONING_ENTRY: { + DSK: "test", + SECURITY_CLASSES: [0], + }, + } + ) + + msg = await ws_client.receive_json() + assert msg["success"] + + assert len(client.async_send_command.call_args_list) == 1 + assert client.async_send_command.call_args[0][0] == { + "command": "controller.provision_smart_start_node", + "entry": ProvisioningEntry( + "test", [SecurityClass.S2_UNAUTHENTICATED] + ).to_dict(), + } + + client.async_send_command.reset_mock() + client.async_send_command.return_value = {"success": True} + + # Test QR provisioning information + await ws_client.send_json( + { + ID: 3, + TYPE: "zwave_js/provision_smart_start_node", + ENTRY_ID: entry.entry_id, + QR_PROVISIONING_INFORMATION: { + VERSION: 1, + SECURITY_CLASSES: [0], + DSK: "test", + GENERIC_DEVICE_CLASS: 1, + SPECIFIC_DEVICE_CLASS: 1, + INSTALLER_ICON_TYPE: 1, + MANUFACTURER_ID: 1, + PRODUCT_TYPE: 1, + PRODUCT_ID: 1, + APPLICATION_VERSION: "test", + }, + } + ) + + msg = await ws_client.receive_json() + assert msg["success"] + + assert len(client.async_send_command.call_args_list) == 1 + assert client.async_send_command.call_args[0][0] == { + "command": "controller.provision_smart_start_node", + "entry": QRProvisioningInformation( + QRCodeVersion.SMART_START, + [SecurityClass.S2_UNAUTHENTICATED], + "test", + 1, + 1, + 1, + 1, + 1, + 1, + "test", + None, + None, + None, + ).to_dict(), + } + + client.async_send_command.reset_mock() + client.async_send_command.return_value = {"success": True} + + # Test QR code string + await ws_client.send_json( + { + ID: 4, + TYPE: "zwave_js/provision_smart_start_node", + ENTRY_ID: entry.entry_id, + QR_CODE_STRING: "90testtesttesttesttesttesttesttesttesttesttesttesttest", + } + ) + + msg = await ws_client.receive_json() + assert msg["success"] + + assert len(client.async_send_command.call_args_list) == 1 + assert client.async_send_command.call_args[0][0] == { + "command": "controller.provision_smart_start_node", + "entry": "90testtesttesttesttesttesttesttesttesttesttesttesttest", + } + + client.async_send_command.reset_mock() + client.async_send_command.return_value = {"success": True} + + # Test QR provisioning information with S2 version throws error + await ws_client.send_json( + { + ID: 5, + TYPE: "zwave_js/provision_smart_start_node", + ENTRY_ID: entry.entry_id, + QR_PROVISIONING_INFORMATION: { + VERSION: 0, + SECURITY_CLASSES: [0], + DSK: "test", + GENERIC_DEVICE_CLASS: 1, + SPECIFIC_DEVICE_CLASS: 1, + INSTALLER_ICON_TYPE: 1, + MANUFACTURER_ID: 1, + PRODUCT_TYPE: 1, + PRODUCT_ID: 1, + APPLICATION_VERSION: "test", + }, + } + ) + + msg = await ws_client.receive_json() + assert not msg["success"] + + client.async_send_command.reset_mock() + client.async_send_command.return_value = {"success": True} + assert len(client.async_send_command.call_args_list) == 0 + + # Test no provisioning parameter provided causes failure + await ws_client.send_json( + { + ID: 6, + TYPE: "zwave_js/provision_smart_start_node", + ENTRY_ID: entry.entry_id, + } + ) + msg = await ws_client.receive_json() + + assert not msg["success"] + + # Test FailedZWaveCommand is caught + with patch( + "zwave_js_server.model.controller.Controller.async_provision_smart_start_node", + side_effect=FailedZWaveCommand("failed_command", 1, "error message"), + ): + await ws_client.send_json( + { + ID: 7, + TYPE: "zwave_js/provision_smart_start_node", + ENTRY_ID: entry.entry_id, + QR_CODE_STRING: "90testtesttesttesttesttesttesttesttesttesttesttesttest", + } + ) + msg = await ws_client.receive_json() + + assert not msg["success"] + assert msg["error"]["code"] == "zwave_error" + assert msg["error"]["message"] == "Z-Wave error 1: error message" + + # Test sending command with not loaded entry fails + await hass.config_entries.async_unload(entry.entry_id) + await hass.async_block_till_done() + + await ws_client.send_json( + { + ID: 8, + TYPE: "zwave_js/provision_smart_start_node", + ENTRY_ID: entry.entry_id, + QR_CODE_STRING: "90testtesttesttesttesttesttesttesttesttesttesttesttest", + } + ) + msg = await ws_client.receive_json() + + assert not msg["success"] + assert msg["error"]["code"] == ERR_NOT_LOADED + + +async def test_unprovision_smart_start_node(hass, integration, client, hass_ws_client): + """Test unprovision_smart_start_node websocket command.""" + entry = integration + ws_client = await hass_ws_client(hass) + + client.async_send_command.return_value = {} + + # Test node ID as input + await ws_client.send_json( + { + ID: 1, + TYPE: "zwave_js/unprovision_smart_start_node", + ENTRY_ID: entry.entry_id, + NODE_ID: 1, + } + ) + + msg = await ws_client.receive_json() + assert msg["success"] + + assert len(client.async_send_command.call_args_list) == 1 + assert client.async_send_command.call_args[0][0] == { + "command": "controller.unprovision_smart_start_node", + "dskOrNodeId": 1, + } + + client.async_send_command.reset_mock() + client.async_send_command.return_value = {} + + # Test DSK as input + await ws_client.send_json( + { + ID: 2, + TYPE: "zwave_js/unprovision_smart_start_node", + ENTRY_ID: entry.entry_id, + DSK: "test", + } + ) + + msg = await ws_client.receive_json() + assert msg["success"] + + assert len(client.async_send_command.call_args_list) == 1 + assert client.async_send_command.call_args[0][0] == { + "command": "controller.unprovision_smart_start_node", + "dskOrNodeId": "test", + } + + client.async_send_command.reset_mock() + client.async_send_command.return_value = {} + + # Test not including DSK or node ID as input fails + await ws_client.send_json( + { + ID: 3, + TYPE: "zwave_js/unprovision_smart_start_node", + ENTRY_ID: entry.entry_id, + } + ) + + msg = await ws_client.receive_json() + assert not msg["success"] + + assert len(client.async_send_command.call_args_list) == 0 + + # Test FailedZWaveCommand is caught + with patch( + "zwave_js_server.model.controller.Controller.async_unprovision_smart_start_node", + side_effect=FailedZWaveCommand("failed_command", 1, "error message"), + ): + await ws_client.send_json( + { + ID: 6, + TYPE: "zwave_js/unprovision_smart_start_node", + ENTRY_ID: entry.entry_id, + DSK: "test", + } + ) + msg = await ws_client.receive_json() + + assert not msg["success"] + assert msg["error"]["code"] == "zwave_error" + assert msg["error"]["message"] == "Z-Wave error 1: error message" + + # Test sending command with not loaded entry fails + await hass.config_entries.async_unload(entry.entry_id) + await hass.async_block_till_done() + + await ws_client.send_json( + { + ID: 7, + TYPE: "zwave_js/unprovision_smart_start_node", + ENTRY_ID: entry.entry_id, + DSK: "test", + } + ) + msg = await ws_client.receive_json() + + assert not msg["success"] + assert msg["error"]["code"] == ERR_NOT_LOADED + + +async def test_get_provisioning_entries(hass, integration, client, hass_ws_client): + """Test get_provisioning_entries websocket command.""" + entry = integration + ws_client = await hass_ws_client(hass) + + client.async_send_command.return_value = { + "entries": [{"dsk": "test", "securityClasses": [0], "fake": "test"}] + } + + await ws_client.send_json( + { + ID: 1, + TYPE: "zwave_js/get_provisioning_entries", + ENTRY_ID: entry.entry_id, + } + ) + + msg = await ws_client.receive_json() + assert msg["success"] + assert msg["result"] == [ + { + "dsk": "test", + "security_classes": [SecurityClass.S2_UNAUTHENTICATED], + "additional_properties": {"fake": "test"}, + } + ] + + assert len(client.async_send_command.call_args_list) == 1 + assert client.async_send_command.call_args[0][0] == { + "command": "controller.get_provisioning_entries", + } + + # Test FailedZWaveCommand is caught + with patch( + "zwave_js_server.model.controller.Controller.async_get_provisioning_entries", + side_effect=FailedZWaveCommand("failed_command", 1, "error message"), + ): + await ws_client.send_json( + { + ID: 6, + TYPE: "zwave_js/get_provisioning_entries", + ENTRY_ID: entry.entry_id, + } + ) + msg = await ws_client.receive_json() + + assert not msg["success"] + assert msg["error"]["code"] == "zwave_error" + assert msg["error"]["message"] == "Z-Wave error 1: error message" + + # Test sending command with not loaded entry fails + await hass.config_entries.async_unload(entry.entry_id) + await hass.async_block_till_done() + + await ws_client.send_json( + {ID: 7, TYPE: "zwave_js/get_provisioning_entries", ENTRY_ID: entry.entry_id} + ) + msg = await ws_client.receive_json() + + assert not msg["success"] + assert msg["error"]["code"] == ERR_NOT_LOADED + + +async def test_parse_qr_code_string(hass, integration, client, hass_ws_client): + """Test parse_qr_code_string websocket command.""" + entry = integration + ws_client = await hass_ws_client(hass) + + client.async_send_command.return_value = { + "qrProvisioningInformation": { + "version": 0, + "securityClasses": [0], + "dsk": "test", + "genericDeviceClass": 1, + "specificDeviceClass": 1, + "installerIconType": 1, + "manufacturerId": 1, + "productType": 1, + "productId": 1, + "applicationVersion": "test", + "maxInclusionRequestInterval": 1, + "uuid": "test", + "supportedProtocols": [0], + } + } + + await ws_client.send_json( + { + ID: 1, + TYPE: "zwave_js/parse_qr_code_string", + ENTRY_ID: entry.entry_id, + QR_CODE_STRING: "90testtesttesttesttesttesttesttesttesttesttesttesttest", + } + ) + + msg = await ws_client.receive_json() + assert msg["success"] + assert msg["result"] == { + "version": 0, + "security_classes": [SecurityClass.S2_UNAUTHENTICATED], + "dsk": "test", + "generic_device_class": 1, + "specific_device_class": 1, + "installer_icon_type": 1, + "manufacturer_id": 1, + "product_type": 1, + "product_id": 1, + "application_version": "test", + "max_inclusion_request_interval": 1, + "uuid": "test", + "supported_protocols": [Protocols.ZWAVE], + } + + assert len(client.async_send_command.call_args_list) == 1 + assert client.async_send_command.call_args[0][0] == { + "command": "utils.parse_qr_code_string", + "qr": "90testtesttesttesttesttesttesttesttesttesttesttesttest", + } + + # Test FailedZWaveCommand is caught + with patch( + "homeassistant.components.zwave_js.api.async_parse_qr_code_string", + side_effect=FailedZWaveCommand("failed_command", 1, "error message"), + ): + await ws_client.send_json( + { + ID: 6, + TYPE: "zwave_js/parse_qr_code_string", + ENTRY_ID: entry.entry_id, + QR_CODE_STRING: "90testtesttesttesttesttesttesttesttesttesttesttesttest", + } + ) + msg = await ws_client.receive_json() + + assert not msg["success"] + assert msg["error"]["code"] == "zwave_error" + assert msg["error"]["message"] == "Z-Wave error 1: error message" + + # Test sending command with not loaded entry fails + await hass.config_entries.async_unload(entry.entry_id) + await hass.async_block_till_done() + + await ws_client.send_json( + { + ID: 7, + TYPE: "zwave_js/parse_qr_code_string", + ENTRY_ID: entry.entry_id, + QR_CODE_STRING: "90testtesttesttesttesttesttesttesttesttesttesttesttest", + } + ) + msg = await ws_client.receive_json() + + assert not msg["success"] + assert msg["error"]["code"] == ERR_NOT_LOADED + + +async def test_supports_feature(hass, integration, client, hass_ws_client): + """Test supports_feature websocket command.""" + entry = integration + ws_client = await hass_ws_client(hass) + + client.async_send_command.return_value = {"supported": True} + + await ws_client.send_json( + { + ID: 1, + TYPE: "zwave_js/supports_feature", + ENTRY_ID: entry.entry_id, + FEATURE: ZwaveFeature.SMART_START, + } + ) + + msg = await ws_client.receive_json() + assert msg["success"] + assert msg["result"] == {"supported": True} + + async def test_cancel_inclusion_exclusion(hass, integration, client, hass_ws_client): """Test cancelling the inclusion and exclusion process.""" entry = integration @@ -754,12 +1422,17 @@ async def test_remove_node( client.async_send_command.return_value = {"success": True} await ws_client.send_json( - {ID: 3, TYPE: "zwave_js/remove_node", ENTRY_ID: entry.entry_id} + {ID: 1, TYPE: "zwave_js/remove_node", ENTRY_ID: entry.entry_id} ) msg = await ws_client.receive_json() assert msg["success"] + assert len(client.async_send_command.call_args_list) == 1 + assert client.async_send_command.call_args[0][0] == { + "command": "controller.begin_exclusion", + } + event = Event( type="exclusion started", data={ @@ -792,6 +1465,28 @@ async def test_remove_node( ) assert device is None + # Test unprovision parameter + client.async_send_command.reset_mock() + client.async_send_command.return_value = {"success": True} + + await ws_client.send_json( + { + ID: 2, + TYPE: "zwave_js/remove_node", + ENTRY_ID: entry.entry_id, + UNPROVISION: True, + } + ) + + msg = await ws_client.receive_json() + assert msg["success"] + + assert len(client.async_send_command.call_args_list) == 1 + assert client.async_send_command.call_args[0][0] == { + "command": "controller.begin_exclusion", + "unprovision": True, + } + # Test FailedZWaveCommand is caught with patch( "zwave_js_server.model.controller.Controller.async_begin_exclusion", @@ -847,11 +1542,12 @@ async def test_replace_failed_node( client.async_send_command.return_value = {"success": True} + # Test replace failed node with no provisioning information # Order of events we receive for a successful replacement is `inclusion started`, # `inclusion stopped`, `node removed`, `node added`, then interview stages. await ws_client.send_json( { - ID: 3, + ID: 1, TYPE: "zwave_js/replace_failed_node", ENTRY_ID: entry.entry_id, NODE_ID: 67, @@ -997,6 +1693,140 @@ async def test_replace_failed_node( msg = await ws_client.receive_json() assert msg["event"]["event"] == "interview failed" + client.async_send_command.reset_mock() + client.async_send_command.return_value = {"success": True} + + # Test S2 planned provisioning entry + await ws_client.send_json( + { + ID: 2, + TYPE: "zwave_js/replace_failed_node", + ENTRY_ID: entry.entry_id, + NODE_ID: 67, + INCLUSION_STRATEGY: InclusionStrategy.SECURITY_S2.value, + PLANNED_PROVISIONING_ENTRY: { + DSK: "test", + SECURITY_CLASSES: [0], + }, + } + ) + + msg = await ws_client.receive_json() + assert msg["success"] + + assert len(client.async_send_command.call_args_list) == 1 + assert client.async_send_command.call_args[0][0] == { + "command": "controller.replace_failed_node", + "nodeId": 67, + "options": { + "strategy": InclusionStrategy.SECURITY_S2, + "provisioning": ProvisioningEntry( + "test", [SecurityClass.S2_UNAUTHENTICATED] + ).to_dict(), + }, + } + + client.async_send_command.reset_mock() + client.async_send_command.return_value = {"success": True} + + # Test S2 QR provisioning information + await ws_client.send_json( + { + ID: 3, + TYPE: "zwave_js/replace_failed_node", + ENTRY_ID: entry.entry_id, + NODE_ID: 67, + INCLUSION_STRATEGY: InclusionStrategy.SECURITY_S2.value, + QR_PROVISIONING_INFORMATION: { + VERSION: 0, + SECURITY_CLASSES: [0], + DSK: "test", + GENERIC_DEVICE_CLASS: 1, + SPECIFIC_DEVICE_CLASS: 1, + INSTALLER_ICON_TYPE: 1, + MANUFACTURER_ID: 1, + PRODUCT_TYPE: 1, + PRODUCT_ID: 1, + APPLICATION_VERSION: "test", + }, + } + ) + + msg = await ws_client.receive_json() + assert msg["success"] + + assert len(client.async_send_command.call_args_list) == 1 + assert client.async_send_command.call_args[0][0] == { + "command": "controller.replace_failed_node", + "nodeId": 67, + "options": { + "strategy": InclusionStrategy.SECURITY_S2, + "provisioning": QRProvisioningInformation( + QRCodeVersion.S2, + [SecurityClass.S2_UNAUTHENTICATED], + "test", + 1, + 1, + 1, + 1, + 1, + 1, + "test", + None, + None, + None, + ).to_dict(), + }, + } + + client.async_send_command.reset_mock() + client.async_send_command.return_value = {"success": True} + + # Test S2 QR code string + await ws_client.send_json( + { + ID: 4, + TYPE: "zwave_js/replace_failed_node", + ENTRY_ID: entry.entry_id, + NODE_ID: 67, + INCLUSION_STRATEGY: InclusionStrategy.SECURITY_S2.value, + QR_CODE_STRING: "90testtesttesttesttesttesttesttesttesttesttesttesttest", + } + ) + + msg = await ws_client.receive_json() + assert msg["success"] + + assert len(client.async_send_command.call_args_list) == 1 + assert client.async_send_command.call_args[0][0] == { + "command": "controller.replace_failed_node", + "nodeId": 67, + "options": { + "strategy": InclusionStrategy.SECURITY_S2, + "provisioning": "90testtesttesttesttesttesttesttesttesttesttesttesttest", + }, + } + + client.async_send_command.reset_mock() + client.async_send_command.return_value = {"success": True} + + # Test ValueError is caught as failure + await ws_client.send_json( + { + ID: 6, + TYPE: "zwave_js/replace_failed_node", + ENTRY_ID: entry.entry_id, + NODE_ID: 67, + INCLUSION_STRATEGY: InclusionStrategy.DEFAULT.value, + QR_CODE_STRING: "90testtesttesttesttesttesttesttesttesttesttesttesttest", + } + ) + + msg = await ws_client.receive_json() + assert not msg["success"] + + assert len(client.async_send_command.call_args_list) == 0 + # Test FailedZWaveCommand is caught with patch( "zwave_js_server.model.controller.Controller.async_replace_failed_node", @@ -1004,7 +1834,7 @@ async def test_replace_failed_node( ): await ws_client.send_json( { - ID: 4, + ID: 7, TYPE: "zwave_js/replace_failed_node", ENTRY_ID: entry.entry_id, NODE_ID: 67, @@ -1022,7 +1852,7 @@ async def test_replace_failed_node( await ws_client.send_json( { - ID: 5, + ID: 8, TYPE: "zwave_js/replace_failed_node", ENTRY_ID: entry.entry_id, NODE_ID: 67, @@ -2324,7 +3154,7 @@ async def test_update_log_config(hass, client, integration, hass_ws_client): ) msg = await ws_client.receive_json() assert not msg["success"] - assert "error" in msg and "value must be one of" in msg["error"]["message"] + assert "error" in msg and msg["error"]["code"] == "invalid_format" # Test error without service data await ws_client.send_json(