Add controller support to zwave_js/subscribe_firmware_update_status (#87348)

This commit is contained in:
Raman Gupta 2023-02-22 12:08:57 -05:00 committed by GitHub
parent 1cf5b7c9f7
commit 05a7df5629
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 195 additions and 13 deletions

View file

@ -3872,6 +3872,121 @@ async def test_subscribe_firmware_update_status_initial_value(
}
async def test_subscribe_controller_firmware_update_status(
hass, integration, client, hass_ws_client
):
"""Test the subscribe_firmware_update_status websocket command for a node."""
ws_client = await hass_ws_client(hass)
device = get_device(hass, client.driver.controller.nodes[1])
client.async_send_command_no_wait.return_value = {}
await ws_client.send_json(
{
ID: 1,
TYPE: "zwave_js/subscribe_firmware_update_status",
DEVICE_ID: device.id,
}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert msg["result"] is None
event = Event(
type="firmware update progress",
data={
"source": "controller",
"event": "firmware update progress",
"progress": {
"sentFragments": 1,
"totalFragments": 10,
"progress": 10.0,
},
},
)
client.driver.controller.receive_event(event)
msg = await ws_client.receive_json()
assert msg["event"] == {
"event": "firmware update progress",
"current_file": 1,
"total_files": 1,
"sent_fragments": 1,
"total_fragments": 10,
"progress": 10.0,
}
event = Event(
type="firmware update finished",
data={
"source": "controller",
"event": "firmware update finished",
"result": {
"status": 255,
"success": True,
},
},
)
client.driver.controller.receive_event(event)
msg = await ws_client.receive_json()
assert msg["event"] == {
"event": "firmware update finished",
"status": 255,
"success": True,
}
async def test_subscribe_controller_firmware_update_status_initial_value(
hass, client, integration, hass_ws_client
):
"""Test subscribe_firmware_update_status cmd with in progress update for node."""
ws_client = await hass_ws_client(hass)
device = get_device(hass, client.driver.controller.nodes[1])
assert client.driver.controller.firmware_update_progress is None
# Send a firmware update progress event before the WS command
event = Event(
type="firmware update progress",
data={
"source": "controller",
"event": "firmware update progress",
"progress": {
"sentFragments": 1,
"totalFragments": 10,
"progress": 10.0,
},
},
)
client.driver.controller.receive_event(event)
client.async_send_command_no_wait.return_value = {}
await ws_client.send_json(
{
ID: 1,
TYPE: "zwave_js/subscribe_firmware_update_status",
DEVICE_ID: device.id,
}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert msg["result"] is None
msg = await ws_client.receive_json()
assert msg["event"] == {
"event": "firmware update progress",
"current_file": 1,
"total_files": 1,
"sent_fragments": 1,
"total_fragments": 10,
"progress": 10.0,
}
async def test_subscribe_firmware_update_status_failures(
hass: HomeAssistant,
multisensor_6,