hass-core/tests/components/zha/test_cover.py
Alexei Chetroi 3385893b77
ZHA device channel refactoring (#31971)
* Add ZHA core typing helper.
* Add aux_channels to ZHA rule matching.
* Add match rule claim_channels() method.
* Expose underlying zigpy device.
* Not sure we need this one.
* Move "base" channels.
* Framework for channel discovery.
* Make DEVICE_CLASS and REMOTE_DEVICE_TYPE default dicts.
* Remove attribute reporting configuration registry.
* Refactor channels.
- Refactor zha events
- Use compound IDs and unique_ids
- Refactor signal dispatching on attribute updates

* Use unique id compatible with entities unique ids.
* Refactor ZHA Entity registry.
Let match rule to check for the match.

* Refactor discovery to use new channels.
* Cleanup ZDO channel.
Remove unused zha store call.

* Handle channel configuration and initialization.
* Refactor ZHA Device to use new channels.
* Refactor ZHA Gateway to use new discovery framework.
Use hass.data for entity info intermediate store.

* Don't keep entities in hass.data.
* ZHA gateway new discovery framework.
* Refactor ZHA platform loading.
* Don't update ZHA entities, when restoring from zigpy.
* ZHA entity discover tests.
* Add AnalogInput sensor.
* Remove 0xFC02 based entity from Keen smart vents.
* Clean up IAS channels.
* Refactor entity restoration.
* Fix lumi.router entities name.
* Rename EndpointsChannel to ChannelPool.
* Make Channels.pools a list.
* Fix cover test.
* Fix FakeDevice class.
* Fix device actions.
* Fix channels typing.
* Revert update_before_add=False
* Refactor channel class matching.
* Use a helper function for adding entities.
* Make Pylint happy.
* Rebase cleanup.
* Update coverage for ZHA device type overrides.
* Use cluster_id for single output cluster registry.
* Remove ZHA typing from coverage.
* Fix tests.
* Address comments.
* Address comments.
2020-02-21 18:06:57 -05:00

135 lines
4.4 KiB
Python

"""Test zha cover."""
from unittest.mock import MagicMock, call, patch
import asynctest
import pytest
import zigpy.types
import zigpy.zcl.clusters.closures as closures
import zigpy.zcl.foundation as zcl_f
from homeassistant.components.cover import DOMAIN
from homeassistant.const import STATE_CLOSED, STATE_OPEN, STATE_UNAVAILABLE
from .common import (
async_enable_traffic,
async_test_rejoin,
find_entity_id,
make_attribute,
make_zcl_header,
)
from tests.common import mock_coro
@pytest.fixture
def zigpy_cover_device(zigpy_device_mock):
"""Zigpy cover device."""
endpoints = {
1: {
"device_type": 1026,
"in_clusters": [closures.WindowCovering.cluster_id],
"out_clusters": [],
}
}
return zigpy_device_mock(endpoints)
@asynctest.patch(
"homeassistant.components.zha.core.channels.closures.WindowCovering.async_initialize"
)
async def test_cover(m1, hass, zha_device_joined_restored, zigpy_cover_device):
"""Test zha cover platform."""
async def get_chan_attr(*args, **kwargs):
return 100
with patch(
"homeassistant.components.zha.core.channels.base.ZigbeeChannel.get_attribute_value",
new=MagicMock(side_effect=get_chan_attr),
) as get_attr_mock:
# load up cover domain
zha_device = await zha_device_joined_restored(zigpy_cover_device)
assert get_attr_mock.call_count == 2
assert get_attr_mock.call_args[0][0] == "current_position_lift_percentage"
cluster = zigpy_cover_device.endpoints.get(1).window_covering
entity_id = await find_entity_id(DOMAIN, zha_device, hass)
assert entity_id is not None
# test that the cover was created and that it is unavailable
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, [zha_device])
await hass.async_block_till_done()
attr = make_attribute(8, 100)
hdr = make_zcl_header(zcl_f.Command.Report_Attributes)
cluster.handle_message(hdr, [[attr]])
await hass.async_block_till_done()
# test that the state has changed from unavailable to off
assert hass.states.get(entity_id).state == STATE_CLOSED
# test to see if it opens
attr = make_attribute(8, 0)
hdr = make_zcl_header(zcl_f.Command.Report_Attributes)
cluster.handle_message(hdr, [[attr]])
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_OPEN
# close from UI
with patch(
"zigpy.zcl.Cluster.request", return_value=mock_coro([0x1, zcl_f.Status.SUCCESS])
):
await hass.services.async_call(
DOMAIN, "close_cover", {"entity_id": entity_id}, blocking=True
)
assert cluster.request.call_count == 1
assert cluster.request.call_args == call(
False, 0x1, (), expect_reply=True, manufacturer=None
)
# open from UI
with patch(
"zigpy.zcl.Cluster.request", return_value=mock_coro([0x0, zcl_f.Status.SUCCESS])
):
await hass.services.async_call(
DOMAIN, "open_cover", {"entity_id": entity_id}, blocking=True
)
assert cluster.request.call_count == 1
assert cluster.request.call_args == call(
False, 0x0, (), expect_reply=True, manufacturer=None
)
# set position UI
with patch(
"zigpy.zcl.Cluster.request", return_value=mock_coro([0x5, zcl_f.Status.SUCCESS])
):
await hass.services.async_call(
DOMAIN,
"set_cover_position",
{"entity_id": entity_id, "position": 47},
blocking=True,
)
assert cluster.request.call_count == 1
assert cluster.request.call_args == call(
False, 0x5, (zigpy.types.uint8_t,), 53, expect_reply=True, manufacturer=None
)
# stop from UI
with patch(
"zigpy.zcl.Cluster.request", return_value=mock_coro([0x2, zcl_f.Status.SUCCESS])
):
await hass.services.async_call(
DOMAIN, "stop_cover", {"entity_id": entity_id}, blocking=True
)
assert cluster.request.call_count == 1
assert cluster.request.call_args == call(
False, 0x2, (), expect_reply=True, manufacturer=None
)
# test rejoin
await async_test_rejoin(hass, zigpy_cover_device, [cluster], (1,))
assert hass.states.get(entity_id).state == STATE_OPEN