Test edge cases in wemo platform code (#44136)
This commit is contained in:
parent
dd4147fbaa
commit
8fe5e61cbf
8 changed files with 322 additions and 8 deletions
|
@ -208,7 +208,7 @@ class WemoLight(LightEntity):
|
|||
except (AttributeError, ActionException) as err:
|
||||
_LOGGER.warning("Could not update status for %s (%s)", self.name, err)
|
||||
self._available = False
|
||||
self.wemo.reconnect_with_device()
|
||||
self.wemo.bridge.reconnect_with_device()
|
||||
else:
|
||||
self._is_on = self._state.get("onoff") != WEMO_OFF
|
||||
self._brightness = self._state.get("level", 255)
|
||||
|
|
|
@ -23,7 +23,7 @@ def pywemo_model_fixture():
|
|||
@pytest.fixture(name="pywemo_registry")
|
||||
def pywemo_registry_fixture():
|
||||
"""Fixture for SubscriptionRegistry instances."""
|
||||
registry = create_autospec(pywemo.SubscriptionRegistry)
|
||||
registry = create_autospec(pywemo.SubscriptionRegistry, instance=True)
|
||||
|
||||
registry.callbacks = {}
|
||||
|
||||
|
@ -39,12 +39,13 @@ def pywemo_registry_fixture():
|
|||
@pytest.fixture(name="pywemo_device")
|
||||
def pywemo_device_fixture(pywemo_registry, pywemo_model):
|
||||
"""Fixture for WeMoDevice instances."""
|
||||
device = create_autospec(getattr(pywemo, pywemo_model))
|
||||
device = create_autospec(getattr(pywemo, pywemo_model), instance=True)
|
||||
device.host = MOCK_HOST
|
||||
device.port = MOCK_PORT
|
||||
device.name = MOCK_NAME
|
||||
device.serialnumber = MOCK_SERIAL_NUMBER
|
||||
device.model_name = pywemo_model
|
||||
device.get_state.return_value = 0 # Default to Off
|
||||
|
||||
url = f"http://{MOCK_HOST}:{MOCK_PORT}/setup.xml"
|
||||
with patch("pywemo.setup_url_for_address", return_value=url), patch(
|
||||
|
|
167
tests/components/wemo/entity_test_helpers.py
Normal file
167
tests/components/wemo/entity_test_helpers.py
Normal file
|
@ -0,0 +1,167 @@
|
|||
"""Test cases that are in common among wemo platform modules.
|
||||
|
||||
This is not a test module. These test methods are used by the platform test modules.
|
||||
"""
|
||||
import asyncio
|
||||
import threading
|
||||
|
||||
from homeassistant.components.homeassistant import (
|
||||
DOMAIN as HA_DOMAIN,
|
||||
SERVICE_UPDATE_ENTITY,
|
||||
)
|
||||
from homeassistant.const import ATTR_ENTITY_ID, STATE_OFF, STATE_UNAVAILABLE
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from tests.async_mock import patch
|
||||
|
||||
|
||||
def _perform_registry_callback(hass, pywemo_registry, pywemo_device):
|
||||
"""Return a callable method to trigger a state callback from the device."""
|
||||
|
||||
@callback
|
||||
def async_callback():
|
||||
# Cause a state update callback to be triggered by the device.
|
||||
pywemo_registry.callbacks[pywemo_device.name](pywemo_device, "", "")
|
||||
return hass.async_block_till_done()
|
||||
|
||||
return async_callback
|
||||
|
||||
|
||||
def _perform_async_update(hass, wemo_entity):
|
||||
"""Return a callable method to cause hass to update the state of the entity."""
|
||||
|
||||
@callback
|
||||
def async_callback():
|
||||
return hass.services.async_call(
|
||||
HA_DOMAIN,
|
||||
SERVICE_UPDATE_ENTITY,
|
||||
{ATTR_ENTITY_ID: [wemo_entity.entity_id]},
|
||||
blocking=True,
|
||||
)
|
||||
|
||||
return async_callback
|
||||
|
||||
|
||||
async def _async_multiple_call_helper(
|
||||
hass,
|
||||
pywemo_registry,
|
||||
wemo_entity,
|
||||
pywemo_device,
|
||||
call1,
|
||||
call2,
|
||||
update_polling_method=None,
|
||||
):
|
||||
"""Create two calls (call1 & call2) in parallel; verify only one polls the device.
|
||||
|
||||
The platform entity should only perform one update poll on the device at a time.
|
||||
Any parallel updates that happen at the same time should be ignored. This is
|
||||
verified by blocking in the update polling method. The polling method should
|
||||
only be called once as a result of calling call1 & call2 simultaneously.
|
||||
"""
|
||||
# get_state is called outside the event loop. Use non-async Python Event.
|
||||
event = threading.Event()
|
||||
|
||||
def get_update(force_update=True):
|
||||
event.wait()
|
||||
|
||||
update_polling_method = update_polling_method or pywemo_device.get_state
|
||||
update_polling_method.side_effect = get_update
|
||||
|
||||
# One of these two calls will block on `event`. The other will return right
|
||||
# away because the `_update_lock` is held.
|
||||
_, pending = await asyncio.wait(
|
||||
[call1(), call2()], return_when=asyncio.FIRST_COMPLETED
|
||||
)
|
||||
|
||||
# Allow the blocked call to return.
|
||||
event.set()
|
||||
if pending:
|
||||
await asyncio.wait(pending)
|
||||
|
||||
# Make sure the state update only happened once.
|
||||
update_polling_method.assert_called_once()
|
||||
|
||||
|
||||
async def test_async_update_locked_callback_and_update(
|
||||
hass, pywemo_registry, wemo_entity, pywemo_device, **kwargs
|
||||
):
|
||||
"""Test that a callback and a state update request can't both happen at the same time.
|
||||
|
||||
When a state update is received via a callback from the device at the same time
|
||||
as hass is calling `async_update`, verify that only one of the updates proceeds.
|
||||
"""
|
||||
await async_setup_component(hass, HA_DOMAIN, {})
|
||||
callback = _perform_registry_callback(hass, pywemo_registry, pywemo_device)
|
||||
update = _perform_async_update(hass, wemo_entity)
|
||||
await _async_multiple_call_helper(
|
||||
hass, pywemo_registry, wemo_entity, pywemo_device, callback, update, **kwargs
|
||||
)
|
||||
|
||||
|
||||
async def test_async_update_locked_multiple_updates(
|
||||
hass, pywemo_registry, wemo_entity, pywemo_device, **kwargs
|
||||
):
|
||||
"""Test that two hass async_update state updates do not proceed at the same time."""
|
||||
await async_setup_component(hass, HA_DOMAIN, {})
|
||||
update = _perform_async_update(hass, wemo_entity)
|
||||
await _async_multiple_call_helper(
|
||||
hass, pywemo_registry, wemo_entity, pywemo_device, update, update, **kwargs
|
||||
)
|
||||
|
||||
|
||||
async def test_async_update_locked_multiple_callbacks(
|
||||
hass, pywemo_registry, wemo_entity, pywemo_device, **kwargs
|
||||
):
|
||||
"""Test that two device callback state updates do not proceed at the same time."""
|
||||
await async_setup_component(hass, HA_DOMAIN, {})
|
||||
callback = _perform_registry_callback(hass, pywemo_registry, pywemo_device)
|
||||
await _async_multiple_call_helper(
|
||||
hass, pywemo_registry, wemo_entity, pywemo_device, callback, callback, **kwargs
|
||||
)
|
||||
|
||||
|
||||
async def test_async_locked_update_with_exception(
|
||||
hass, wemo_entity, pywemo_device, update_polling_method=None
|
||||
):
|
||||
"""Test that the entity becomes unavailable when communication is lost."""
|
||||
assert hass.states.get(wemo_entity.entity_id).state == STATE_OFF
|
||||
await async_setup_component(hass, HA_DOMAIN, {})
|
||||
update_polling_method = update_polling_method or pywemo_device.get_state
|
||||
update_polling_method.side_effect = AttributeError
|
||||
|
||||
await hass.services.async_call(
|
||||
HA_DOMAIN,
|
||||
SERVICE_UPDATE_ENTITY,
|
||||
{ATTR_ENTITY_ID: [wemo_entity.entity_id]},
|
||||
blocking=True,
|
||||
)
|
||||
|
||||
assert hass.states.get(wemo_entity.entity_id).state == STATE_UNAVAILABLE
|
||||
pywemo_device.reconnect_with_device.assert_called_with()
|
||||
|
||||
|
||||
async def test_async_update_with_timeout_and_recovery(hass, wemo_entity, pywemo_device):
|
||||
"""Test that the entity becomes unavailable after a timeout, and that it recovers."""
|
||||
assert hass.states.get(wemo_entity.entity_id).state == STATE_OFF
|
||||
await async_setup_component(hass, HA_DOMAIN, {})
|
||||
|
||||
with patch("async_timeout.timeout", side_effect=asyncio.TimeoutError):
|
||||
await hass.services.async_call(
|
||||
HA_DOMAIN,
|
||||
SERVICE_UPDATE_ENTITY,
|
||||
{ATTR_ENTITY_ID: [wemo_entity.entity_id]},
|
||||
blocking=True,
|
||||
)
|
||||
|
||||
assert hass.states.get(wemo_entity.entity_id).state == STATE_UNAVAILABLE
|
||||
|
||||
# Check that the entity recovers and is available after the update succeeds.
|
||||
await hass.services.async_call(
|
||||
HA_DOMAIN,
|
||||
SERVICE_UPDATE_ENTITY,
|
||||
{ATTR_ENTITY_ID: [wemo_entity.entity_id]},
|
||||
blocking=True,
|
||||
)
|
||||
|
||||
assert hass.states.get(wemo_entity.entity_id).state == STATE_OFF
|
|
@ -9,6 +9,8 @@ from homeassistant.components.homeassistant import (
|
|||
from homeassistant.const import ATTR_ENTITY_ID, STATE_OFF, STATE_ON
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from . import entity_test_helpers
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pywemo_model():
|
||||
|
@ -16,6 +18,26 @@ def pywemo_model():
|
|||
return "Motion"
|
||||
|
||||
|
||||
# Tests that are in common among wemo platforms. These test methods will be run
|
||||
# in the scope of this test module. They will run using the pywemo_model from
|
||||
# this test module (Motion).
|
||||
test_async_update_locked_multiple_updates = (
|
||||
entity_test_helpers.test_async_update_locked_multiple_updates
|
||||
)
|
||||
test_async_update_locked_multiple_callbacks = (
|
||||
entity_test_helpers.test_async_update_locked_multiple_callbacks
|
||||
)
|
||||
test_async_update_locked_callback_and_update = (
|
||||
entity_test_helpers.test_async_update_locked_callback_and_update
|
||||
)
|
||||
test_async_locked_update_with_exception = (
|
||||
entity_test_helpers.test_async_locked_update_with_exception
|
||||
)
|
||||
test_async_update_with_timeout_and_recovery = (
|
||||
entity_test_helpers.test_async_update_with_timeout_and_recovery
|
||||
)
|
||||
|
||||
|
||||
async def test_binary_sensor_registry_state_callback(
|
||||
hass, pywemo_registry, pywemo_device, wemo_entity
|
||||
):
|
||||
|
|
|
@ -11,6 +11,8 @@ from homeassistant.components.wemo.const import DOMAIN
|
|||
from homeassistant.const import ATTR_ENTITY_ID, STATE_OFF, STATE_ON
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from . import entity_test_helpers
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pywemo_model():
|
||||
|
@ -18,6 +20,26 @@ def pywemo_model():
|
|||
return "Humidifier"
|
||||
|
||||
|
||||
# Tests that are in common among wemo platforms. These test methods will be run
|
||||
# in the scope of this test module. They will run using the pywemo_model from
|
||||
# this test module (Humidifier).
|
||||
test_async_update_locked_multiple_updates = (
|
||||
entity_test_helpers.test_async_update_locked_multiple_updates
|
||||
)
|
||||
test_async_update_locked_multiple_callbacks = (
|
||||
entity_test_helpers.test_async_update_locked_multiple_callbacks
|
||||
)
|
||||
test_async_update_locked_callback_and_update = (
|
||||
entity_test_helpers.test_async_update_locked_callback_and_update
|
||||
)
|
||||
test_async_locked_update_with_exception = (
|
||||
entity_test_helpers.test_async_locked_update_with_exception
|
||||
)
|
||||
test_async_update_with_timeout_and_recovery = (
|
||||
entity_test_helpers.test_async_update_with_timeout_and_recovery
|
||||
)
|
||||
|
||||
|
||||
async def test_fan_registry_state_callback(
|
||||
hass, pywemo_registry, pywemo_device, wemo_entity
|
||||
):
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
"""Tests for the Wemo light entity via the bridge."""
|
||||
|
||||
import pytest
|
||||
import pywemo
|
||||
|
||||
|
@ -7,10 +6,14 @@ from homeassistant.components.homeassistant import (
|
|||
DOMAIN as HA_DOMAIN,
|
||||
SERVICE_UPDATE_ENTITY,
|
||||
)
|
||||
from homeassistant.components.wemo.light import MIN_TIME_BETWEEN_SCANS
|
||||
from homeassistant.const import ATTR_ENTITY_ID, STATE_OFF, STATE_ON
|
||||
from homeassistant.setup import async_setup_component
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
from tests.async_mock import PropertyMock, create_autospec
|
||||
from . import entity_test_helpers
|
||||
|
||||
from tests.async_mock import create_autospec, patch
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
@ -19,16 +22,71 @@ def pywemo_model():
|
|||
return "Bridge"
|
||||
|
||||
|
||||
# Note: The ordering of where the pywemo_bridge_light comes in test arguments matters.
|
||||
# In test methods, the pywemo_bridge_light fixture argument must come before the
|
||||
# wemo_entity fixture argument.
|
||||
@pytest.fixture(name="pywemo_bridge_light")
|
||||
def pywemo_bridge_light_fixture(pywemo_device):
|
||||
"""Fixture for Bridge.Light WeMoDevice instances."""
|
||||
light = create_autospec(pywemo.ouimeaux_device.bridge.Light)
|
||||
light = create_autospec(pywemo.ouimeaux_device.bridge.Light, instance=True)
|
||||
light.uniqueID = pywemo_device.serialnumber
|
||||
light.name = pywemo_device.name
|
||||
light.bridge = pywemo_device
|
||||
light.state = {"onoff": 0}
|
||||
pywemo_device.Lights = {pywemo_device.serialnumber: light}
|
||||
return light
|
||||
|
||||
|
||||
def _bypass_throttling():
|
||||
"""Bypass the util.Throttle on the update_lights method."""
|
||||
utcnow = dt_util.utcnow()
|
||||
|
||||
def increment_and_return_time():
|
||||
nonlocal utcnow
|
||||
utcnow += MIN_TIME_BETWEEN_SCANS
|
||||
return utcnow
|
||||
|
||||
return patch("homeassistant.util.utcnow", side_effect=increment_and_return_time)
|
||||
|
||||
|
||||
async def test_async_update_locked_multiple_updates(
|
||||
hass, pywemo_registry, pywemo_bridge_light, wemo_entity, pywemo_device
|
||||
):
|
||||
"""Test that two state updates do not proceed at the same time."""
|
||||
pywemo_device.bridge_update.reset_mock()
|
||||
|
||||
with _bypass_throttling():
|
||||
await entity_test_helpers.test_async_update_locked_multiple_updates(
|
||||
hass,
|
||||
pywemo_registry,
|
||||
wemo_entity,
|
||||
pywemo_device,
|
||||
update_polling_method=pywemo_device.bridge_update,
|
||||
)
|
||||
|
||||
|
||||
async def test_async_update_with_timeout_and_recovery(
|
||||
hass, pywemo_bridge_light, wemo_entity, pywemo_device
|
||||
):
|
||||
"""Test that the entity becomes unavailable after a timeout, and that it recovers."""
|
||||
await entity_test_helpers.test_async_update_with_timeout_and_recovery(
|
||||
hass, wemo_entity, pywemo_device
|
||||
)
|
||||
|
||||
|
||||
async def test_async_locked_update_with_exception(
|
||||
hass, pywemo_bridge_light, wemo_entity, pywemo_device
|
||||
):
|
||||
"""Test that the entity becomes unavailable when communication is lost."""
|
||||
with _bypass_throttling():
|
||||
await entity_test_helpers.test_async_locked_update_with_exception(
|
||||
hass,
|
||||
wemo_entity,
|
||||
pywemo_device,
|
||||
update_polling_method=pywemo_device.bridge_update,
|
||||
)
|
||||
|
||||
|
||||
async def test_light_update_entity(
|
||||
hass, pywemo_registry, pywemo_bridge_light, wemo_entity
|
||||
):
|
||||
|
@ -36,7 +94,7 @@ async def test_light_update_entity(
|
|||
await async_setup_component(hass, HA_DOMAIN, {})
|
||||
|
||||
# On state.
|
||||
type(pywemo_bridge_light).state = PropertyMock(return_value={"onoff": 1})
|
||||
pywemo_bridge_light.state = {"onoff": 1}
|
||||
await hass.services.async_call(
|
||||
HA_DOMAIN,
|
||||
SERVICE_UPDATE_ENTITY,
|
||||
|
@ -46,7 +104,7 @@ async def test_light_update_entity(
|
|||
assert hass.states.get(wemo_entity.entity_id).state == STATE_ON
|
||||
|
||||
# Off state.
|
||||
type(pywemo_bridge_light).state = PropertyMock(return_value={"onoff": 0})
|
||||
pywemo_bridge_light.state = {"onoff": 0}
|
||||
await hass.services.async_call(
|
||||
HA_DOMAIN,
|
||||
SERVICE_UPDATE_ENTITY,
|
||||
|
|
|
@ -9,6 +9,8 @@ from homeassistant.components.homeassistant import (
|
|||
from homeassistant.const import ATTR_ENTITY_ID, STATE_OFF, STATE_ON
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from . import entity_test_helpers
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pywemo_model():
|
||||
|
@ -16,6 +18,26 @@ def pywemo_model():
|
|||
return "Dimmer"
|
||||
|
||||
|
||||
# Tests that are in common among wemo platforms. These test methods will be run
|
||||
# in the scope of this test module. They will run using the pywemo_model from
|
||||
# this test module (Dimmer).
|
||||
test_async_update_locked_multiple_updates = (
|
||||
entity_test_helpers.test_async_update_locked_multiple_updates
|
||||
)
|
||||
test_async_update_locked_multiple_callbacks = (
|
||||
entity_test_helpers.test_async_update_locked_multiple_callbacks
|
||||
)
|
||||
test_async_update_locked_callback_and_update = (
|
||||
entity_test_helpers.test_async_update_locked_callback_and_update
|
||||
)
|
||||
test_async_locked_update_with_exception = (
|
||||
entity_test_helpers.test_async_locked_update_with_exception
|
||||
)
|
||||
test_async_update_with_timeout_and_recovery = (
|
||||
entity_test_helpers.test_async_update_with_timeout_and_recovery
|
||||
)
|
||||
|
||||
|
||||
async def test_light_registry_state_callback(
|
||||
hass, pywemo_registry, pywemo_device, wemo_entity
|
||||
):
|
||||
|
|
|
@ -9,6 +9,8 @@ from homeassistant.components.homeassistant import (
|
|||
from homeassistant.const import ATTR_ENTITY_ID, STATE_OFF, STATE_ON
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from . import entity_test_helpers
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pywemo_model():
|
||||
|
@ -16,6 +18,26 @@ def pywemo_model():
|
|||
return "LightSwitch"
|
||||
|
||||
|
||||
# Tests that are in common among wemo platforms. These test methods will be run
|
||||
# in the scope of this test module. They will run using the pywemo_model from
|
||||
# this test module (LightSwitch).
|
||||
test_async_update_locked_multiple_updates = (
|
||||
entity_test_helpers.test_async_update_locked_multiple_updates
|
||||
)
|
||||
test_async_update_locked_multiple_callbacks = (
|
||||
entity_test_helpers.test_async_update_locked_multiple_callbacks
|
||||
)
|
||||
test_async_update_locked_callback_and_update = (
|
||||
entity_test_helpers.test_async_update_locked_callback_and_update
|
||||
)
|
||||
test_async_locked_update_with_exception = (
|
||||
entity_test_helpers.test_async_locked_update_with_exception
|
||||
)
|
||||
test_async_update_with_timeout_and_recovery = (
|
||||
entity_test_helpers.test_async_update_with_timeout_and_recovery
|
||||
)
|
||||
|
||||
|
||||
async def test_switch_registry_state_callback(
|
||||
hass, pywemo_registry, pywemo_device, wemo_entity
|
||||
):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue