Bump ZHA dependencies (#93989)
* Make `find_entity_id` synchronous
* Remove `tries`
* Use new `attribute_updated` event signature
* Validate attributes before creating entities
* Avoid swallowing exceptions when opening covers
* Bump ZHA dependencies
* Add a matcher for Sinope water leak sensors using a non-standard ZCL attribute
* Ensure handler matching is strict, not multi
* Add type annotations for newly-updated functions
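Most of the test churn below follows from two of the bullets above: dropping the `mock_coro`/`AsyncMock` wrappers and letting failed cover commands raise instead of being swallowed. As a rough illustration of the first point (a standalone sketch, not code from this PR; `FakeCluster` is a made-up stand-in for a zigpy cluster), `unittest.mock.patch` already substitutes an `AsyncMock` when the patched target is a coroutine function, so a plain `return_value` is enough:

```python
# Standalone sketch, not part of this PR: shows why the tests can pass the ZCL
# reply directly as return_value instead of wrapping it in mock_coro/AsyncMock.
import asyncio
from unittest.mock import patch


class FakeCluster:
    """Made-up stand-in for zigpy.zcl.Cluster; only the async request() matters."""

    async def request(self, *args, **kwargs):
        raise NotImplementedError  # the real method would hit the radio


async def main() -> None:
    cluster = FakeCluster()
    # patch.object sees that request() is a coroutine function, installs an
    # AsyncMock, and the bare list is returned when the call is awaited.
    with patch.object(FakeCluster, "request", return_value=[0x1, 0x00]):
        assert await cluster.request(False, 0x1) == [0x1, 0x00]


asyncio.run(main())
```

The same mechanism is what lets the diff below collapse the three-line `with patch(...)` blocks into one-liners.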
Parent 584967a35a · Commit 22dfa8797f
32 changed files with 221 additions and 308 deletions
@@ -1,6 +1,6 @@
 """Test ZHA cover."""
 import asyncio
-from unittest.mock import AsyncMock, patch
+from unittest.mock import patch

 import pytest
 import zigpy.profiles.zha
@@ -36,7 +36,7 @@ from .common import (
 )
 from .conftest import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_PROFILE, SIG_EP_TYPE

-from tests.common import async_capture_events, mock_coro, mock_restore_cache
+from tests.common import async_capture_events, mock_restore_cache


 @pytest.fixture(autouse=True)
@@ -132,7 +132,7 @@ async def test_cover(
     assert cluster.read_attributes.call_count == 1
     assert "current_position_lift_percentage" in cluster.read_attributes.call_args[0][0]

-    entity_id = await find_entity_id(Platform.COVER, zha_device, hass)
+    entity_id = find_entity_id(Platform.COVER, zha_device, hass)
     assert entity_id is not None

     await async_enable_traffic(hass, [zha_device], enabled=False)
@@ -152,9 +152,7 @@ async def test_cover(
     assert hass.states.get(entity_id).state == STATE_OPEN

     # close from UI
-    with patch(
-        "zigpy.zcl.Cluster.request", return_value=mock_coro([0x1, zcl_f.Status.SUCCESS])
-    ):
+    with patch("zigpy.zcl.Cluster.request", return_value=[0x1, zcl_f.Status.SUCCESS]):
         await hass.services.async_call(
             COVER_DOMAIN, SERVICE_CLOSE_COVER, {"entity_id": entity_id}, blocking=True
         )
@@ -165,9 +163,7 @@ async def test_cover(
         assert cluster.request.call_args[1]["expect_reply"] is True

     # open from UI
-    with patch(
-        "zigpy.zcl.Cluster.request", return_value=mock_coro([0x0, zcl_f.Status.SUCCESS])
-    ):
+    with patch("zigpy.zcl.Cluster.request", return_value=[0x0, zcl_f.Status.SUCCESS]):
         await hass.services.async_call(
             COVER_DOMAIN, SERVICE_OPEN_COVER, {"entity_id": entity_id}, blocking=True
         )
@@ -178,9 +174,7 @@ async def test_cover(
         assert cluster.request.call_args[1]["expect_reply"] is True

     # set position UI
-    with patch(
-        "zigpy.zcl.Cluster.request", return_value=mock_coro([0x5, zcl_f.Status.SUCCESS])
-    ):
+    with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]):
         await hass.services.async_call(
             COVER_DOMAIN,
             SERVICE_SET_COVER_POSITION,
@@ -195,9 +189,7 @@ async def test_cover(
         assert cluster.request.call_args[1]["expect_reply"] is True

     # stop from UI
-    with patch(
-        "zigpy.zcl.Cluster.request", return_value=mock_coro([0x2, zcl_f.Status.SUCCESS])
-    ):
+    with patch("zigpy.zcl.Cluster.request", return_value=[0x2, zcl_f.Status.SUCCESS]):
         await hass.services.async_call(
             COVER_DOMAIN, SERVICE_STOP_COVER, {"entity_id": entity_id}, blocking=True
         )
@@ -223,7 +215,7 @@ async def test_shade(

     cluster_on_off = zigpy_shade_device.endpoints.get(1).on_off
     cluster_level = zigpy_shade_device.endpoints.get(1).level
-    entity_id = await find_entity_id(Platform.COVER, zha_device, hass)
+    entity_id = find_entity_id(Platform.COVER, zha_device, hass)
     assert entity_id is not None

     await async_enable_traffic(hass, [zha_device], enabled=False)
@@ -244,17 +236,19 @@ async def test_shade(

     # close from UI command fails
     with patch("zigpy.zcl.Cluster.request", side_effect=asyncio.TimeoutError):
-        await hass.services.async_call(
-            COVER_DOMAIN, SERVICE_CLOSE_COVER, {"entity_id": entity_id}, blocking=True
-        )
-        assert cluster_on_off.request.call_count == 1
+        with pytest.raises(asyncio.TimeoutError):
+            await hass.services.async_call(
+                COVER_DOMAIN,
+                SERVICE_CLOSE_COVER,
+                {"entity_id": entity_id},
+                blocking=True,
+            )
+        assert cluster_on_off.request.call_count == 3
         assert cluster_on_off.request.call_args[0][0] is False
         assert cluster_on_off.request.call_args[0][1] == 0x0000
         assert hass.states.get(entity_id).state == STATE_OPEN

-    with patch(
-        "zigpy.zcl.Cluster.request", AsyncMock(return_value=[0x1, zcl_f.Status.SUCCESS])
-    ):
+    with patch("zigpy.zcl.Cluster.request", return_value=[0x1, zcl_f.Status.SUCCESS]):
         await hass.services.async_call(
             COVER_DOMAIN, SERVICE_CLOSE_COVER, {"entity_id": entity_id}, blocking=True
         )
@@ -267,18 +261,20 @@ async def test_shade(
     assert ATTR_CURRENT_POSITION not in hass.states.get(entity_id).attributes
     await send_attributes_report(hass, cluster_level, {0: 0})
     with patch("zigpy.zcl.Cluster.request", side_effect=asyncio.TimeoutError):
-        await hass.services.async_call(
-            COVER_DOMAIN, SERVICE_OPEN_COVER, {"entity_id": entity_id}, blocking=True
-        )
-        assert cluster_on_off.request.call_count == 1
+        with pytest.raises(asyncio.TimeoutError):
+            await hass.services.async_call(
+                COVER_DOMAIN,
+                SERVICE_OPEN_COVER,
+                {"entity_id": entity_id},
+                blocking=True,
+            )
+        assert cluster_on_off.request.call_count == 3
         assert cluster_on_off.request.call_args[0][0] is False
         assert cluster_on_off.request.call_args[0][1] == 0x0001
         assert hass.states.get(entity_id).state == STATE_CLOSED

     # open from UI succeeds
-    with patch(
-        "zigpy.zcl.Cluster.request", AsyncMock(return_value=[0x0, zcl_f.Status.SUCCESS])
-    ):
+    with patch("zigpy.zcl.Cluster.request", return_value=[0x0, zcl_f.Status.SUCCESS]):
         await hass.services.async_call(
             COVER_DOMAIN, SERVICE_OPEN_COVER, {"entity_id": entity_id}, blocking=True
         )
@@ -289,22 +285,21 @@ async def test_shade(

     # set position UI command fails
     with patch("zigpy.zcl.Cluster.request", side_effect=asyncio.TimeoutError):
-        await hass.services.async_call(
-            COVER_DOMAIN,
-            SERVICE_SET_COVER_POSITION,
-            {"entity_id": entity_id, "position": 47},
-            blocking=True,
-        )
-        assert cluster_level.request.call_count == 1
+        with pytest.raises(asyncio.TimeoutError):
+            await hass.services.async_call(
+                COVER_DOMAIN,
+                SERVICE_SET_COVER_POSITION,
+                {"entity_id": entity_id, "position": 47},
+                blocking=True,
+            )
+        assert cluster_level.request.call_count == 3
         assert cluster_level.request.call_args[0][0] is False
         assert cluster_level.request.call_args[0][1] == 0x0004
         assert int(cluster_level.request.call_args[0][3] * 100 / 255) == 47
         assert hass.states.get(entity_id).attributes[ATTR_CURRENT_POSITION] == 0

     # set position UI success
-    with patch(
-        "zigpy.zcl.Cluster.request", AsyncMock(return_value=[0x5, zcl_f.Status.SUCCESS])
-    ):
+    with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]):
         await hass.services.async_call(
             COVER_DOMAIN,
             SERVICE_SET_COVER_POSITION,
@@ -331,13 +326,14 @@ async def test_shade(

     # test cover stop
     with patch("zigpy.zcl.Cluster.request", side_effect=asyncio.TimeoutError):
-        await hass.services.async_call(
-            COVER_DOMAIN,
-            SERVICE_STOP_COVER,
-            {"entity_id": entity_id},
-            blocking=True,
-        )
-        assert cluster_level.request.call_count == 1
+        with pytest.raises(asyncio.TimeoutError):
+            await hass.services.async_call(
+                COVER_DOMAIN,
+                SERVICE_STOP_COVER,
+                {"entity_id": entity_id},
+                blocking=True,
+            )
+        assert cluster_level.request.call_count == 3
         assert cluster_level.request.call_args[0][0] is False
         assert cluster_level.request.call_args[0][1] in (0x0003, 0x0007)

@@ -361,7 +357,7 @@ async def test_restore_state(
     hass.state = CoreState.starting

     zha_device = await zha_device_restored(zigpy_shade_device)
-    entity_id = await find_entity_id(Platform.COVER, zha_device, hass)
+    entity_id = find_entity_id(Platform.COVER, zha_device, hass)
     assert entity_id is not None

     # test that the cover was created and that it is unavailable
@@ -379,7 +375,7 @@ async def test_keen_vent(

     cluster_on_off = zigpy_keen_vent.endpoints.get(1).on_off
     cluster_level = zigpy_keen_vent.endpoints.get(1).level
-    entity_id = await find_entity_id(Platform.COVER, zha_device, hass)
+    entity_id = find_entity_id(Platform.COVER, zha_device, hass)
     assert entity_id is not None

     await async_enable_traffic(hass, [zha_device], enabled=False)
@@ -396,21 +392,25 @@ async def test_keen_vent(

     # open from UI command fails
     p1 = patch.object(cluster_on_off, "request", side_effect=asyncio.TimeoutError)
-    p2 = patch.object(cluster_level, "request", AsyncMock(return_value=[4, 0]))
+    p2 = patch.object(cluster_level, "request", return_value=[4, 0])

     with p1, p2:
-        await hass.services.async_call(
-            COVER_DOMAIN, SERVICE_OPEN_COVER, {"entity_id": entity_id}, blocking=True
-        )
-        assert cluster_on_off.request.call_count == 1
+        with pytest.raises(asyncio.TimeoutError):
+            await hass.services.async_call(
+                COVER_DOMAIN,
+                SERVICE_OPEN_COVER,
+                {"entity_id": entity_id},
+                blocking=True,
+            )
+        assert cluster_on_off.request.call_count == 3
         assert cluster_on_off.request.call_args[0][0] is False
         assert cluster_on_off.request.call_args[0][1] == 0x0001
         assert cluster_level.request.call_count == 1
         assert hass.states.get(entity_id).state == STATE_CLOSED

     # open from UI command success
-    p1 = patch.object(cluster_on_off, "request", AsyncMock(return_value=[1, 0]))
-    p2 = patch.object(cluster_level, "request", AsyncMock(return_value=[4, 0]))
+    p1 = patch.object(cluster_on_off, "request", return_value=[1, 0])
+    p2 = patch.object(cluster_level, "request", return_value=[4, 0])

     with p1, p2:
         await hass.services.async_call(
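For readers skimming the failure-path changes above: the new assertions (`pytest.raises(asyncio.TimeoutError)` plus `call_count == 3`) imply that cover commands are now attempted a fixed number of times and the final error is re-raised rather than swallowed. Below is a self-contained sketch of that test shape; `retry_request` and `FakeCluster` are hypothetical names written only to mirror what the assertions imply, not ZHA's actual implementation.

```python
# Self-contained sketch of the failure-path pattern used in the tests above.
# retry_request is hypothetical; it only mirrors what the assertions
# (pytest.raises + call_count == 3) imply about the production behavior.
import asyncio
from unittest.mock import patch

import pytest


class FakeCluster:
    """Made-up stand-in for zigpy.zcl.Cluster."""

    async def request(self, *args, **kwargs):
        raise NotImplementedError


async def retry_request(cluster: FakeCluster, *args, tries: int = 3):
    """Retry a cluster command, re-raising the final error instead of hiding it."""
    for attempt in range(tries):
        try:
            return await cluster.request(*args)
        except asyncio.TimeoutError:
            if attempt == tries - 1:
                raise


def test_timeout_is_retried_then_raised() -> None:
    cluster = FakeCluster()
    with patch.object(FakeCluster, "request", side_effect=asyncio.TimeoutError):
        with pytest.raises(asyncio.TimeoutError):
            asyncio.run(retry_request(cluster, False, 0x1))
        # All three attempts hit the patched request before the error surfaced.
        assert cluster.request.call_count == 3
```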