From f1693e243383fb9f1ea90fdf6d1f946e16b57fc3 Mon Sep 17 00:00:00 2001 From: Alexei Chetroi Date: Wed, 18 Nov 2020 19:12:38 -0500 Subject: [PATCH] Refactor ZHA tests to allow attribute reads during device initialization (#43357) * Allow plugging zigpy attribute reads in tests * Migrate ZHA tests to use new patched attribute reads * Remove logging in tests --- tests/components/zha/common.py | 26 ++++++++++++++++++++++++-- tests/components/zha/test_climate.py | 14 +------------- tests/components/zha/test_cover.py | 20 +++++++------------- 3 files changed, 32 insertions(+), 28 deletions(-) diff --git a/tests/components/zha/common.py b/tests/components/zha/common.py index 11f390b202e..249e1bf58b2 100644 --- a/tests/components/zha/common.py +++ b/tests/components/zha/common.py @@ -70,12 +70,34 @@ FakeEndpoint.remove_from_group = zigpy_ep.remove_from_group def patch_cluster(cluster): """Patch a cluster for testing.""" + cluster.PLUGGED_ATTR_READS = {} + + async def _read_attribute_raw(attributes, *args, **kwargs): + result = [] + for attr_id in attributes: + value = cluster.PLUGGED_ATTR_READS.get(attr_id) + if value is None: + # try converting attr_id to attr_name and lookup the plugs again + attr_name = cluster.attributes.get(attr_id) + value = attr_name and cluster.PLUGGED_ATTR_READS.get(attr_name[0]) + if value is not None: + result.append( + zcl_f.ReadAttributeRecord( + attr_id, + zcl_f.Status.SUCCESS, + zcl_f.TypeValue(python_type=None, value=value), + ) + ) + else: + result.append(zcl_f.ReadAttributeRecord(attr_id, zcl_f.Status.FAILURE)) + return (result,) + cluster.bind = AsyncMock(return_value=[0]) cluster.configure_reporting = AsyncMock(return_value=[0]) cluster.deserialize = Mock() cluster.handle_cluster_request = Mock() - cluster.read_attributes = AsyncMock(return_value=[{}, {}]) - cluster.read_attributes_raw = Mock() + cluster.read_attributes = AsyncMock(wraps=cluster.read_attributes) + cluster.read_attributes_raw = AsyncMock(side_effect=_read_attribute_raw) cluster.unbind = AsyncMock(return_value=[0]) cluster.write_attributes = AsyncMock( return_value=[zcl_f.WriteAttributesResponse.deserialize(b"\x00")[0]] diff --git a/tests/components/zha/test_climate.py b/tests/components/zha/test_climate.py index fe69e6536d3..cc152c1a36d 100644 --- a/tests/components/zha/test_climate.py +++ b/tests/components/zha/test_climate.py @@ -140,19 +140,8 @@ def device_climate_mock(hass, zigpy_device_mock, zha_device_joined): else: plugged_attrs = {**ZCL_ATTR_PLUG, **plug} - async def _read_attr(attrs, *args, **kwargs): - res = {} - failed = {} - - for attr in attrs: - if attr in plugged_attrs: - res[attr] = plugged_attrs[attr] - else: - failed[attr] = zcl_f.Status.UNSUPPORTED_ATTRIBUTE - return res, failed - zigpy_device = zigpy_device_mock(clusters, manufacturer=manuf) - zigpy_device.endpoints[1].thermostat.read_attributes.side_effect = _read_attr + zigpy_device.endpoints[1].thermostat.PLUGGED_ATTR_READS = plugged_attrs zha_device = await zha_device_joined(zigpy_device) await async_enable_traffic(hass, [zha_device]) await hass.async_block_till_done() @@ -1039,7 +1028,6 @@ async def test_occupancy_reset(hass, device_climate_sinope): state = hass.states.get(entity_id) assert state.attributes[ATTR_PRESET_MODE] == PRESET_AWAY - thrm_cluster.read_attributes.return_value = [True], {} await send_attributes_report( hass, thrm_cluster, {"occupied_heating_setpoint": 1950} ) diff --git a/tests/components/zha/test_cover.py b/tests/components/zha/test_cover.py index 783637d26d7..97fa5c7579d 100644 --- a/tests/components/zha/test_cover.py +++ b/tests/components/zha/test_cover.py @@ -32,7 +32,7 @@ from .common import ( send_attributes_report, ) -from tests.async_mock import AsyncMock, MagicMock, patch +from tests.async_mock import AsyncMock, patch from tests.common import async_capture_events, mock_coro, mock_restore_cache @@ -104,19 +104,13 @@ def zigpy_keen_vent(zigpy_device_mock): async def test_cover(m1, hass, zha_device_joined_restored, zigpy_cover_device): """Test zha cover platform.""" - async def get_chan_attr(*args, **kwargs): - return 100 - - with patch( - "homeassistant.components.zha.core.channels.base.ZigbeeChannel.get_attribute_value", - new=MagicMock(side_effect=get_chan_attr), - ) as get_attr_mock: - # load up cover domain - zha_device = await zha_device_joined_restored(zigpy_cover_device) - assert get_attr_mock.call_count == 2 - assert get_attr_mock.call_args[0][0] == "current_position_lift_percentage" - + # load up cover domain cluster = zigpy_cover_device.endpoints.get(1).window_covering + cluster.PLUGGED_ATTR_READS = {"current_position_lift_percentage": 100} + zha_device = await zha_device_joined_restored(zigpy_cover_device) + assert cluster.read_attributes.call_count == 2 + assert "current_position_lift_percentage" in cluster.read_attributes.call_args[0][0] + entity_id = await find_entity_id(DOMAIN, zha_device, hass) assert entity_id is not None