Support receiving long-press events from WeMo devices (#45503)
Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
parent
c65d120633
commit
61079ab7fa
12 changed files with 372 additions and 41 deletions
|
@ -20,6 +20,7 @@ from homeassistant.helpers.event import async_call_later
|
|||
from homeassistant.util.async_ import gather_with_concurrency
|
||||
|
||||
from .const import DOMAIN
|
||||
from .wemo_device import async_register_device
|
||||
|
||||
# Max number of devices to initialize at once. This limit is in place to
|
||||
# avoid tying up too many executor threads with WeMo device setup.
|
||||
|
@ -105,6 +106,11 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||
# Keep track of WeMo device subscriptions for push updates
|
||||
registry = hass.data[DOMAIN]["registry"] = pywemo.SubscriptionRegistry()
|
||||
await hass.async_add_executor_job(registry.start)
|
||||
|
||||
# Respond to discovery requests from WeMo devices.
|
||||
discovery_responder = pywemo.ssdp.DiscoveryResponder(registry.port)
|
||||
await hass.async_add_executor_job(discovery_responder.start)
|
||||
|
||||
static_conf = config.get(CONF_STATIC, [])
|
||||
wemo_dispatcher = WemoDispatcher(entry)
|
||||
wemo_discovery = WemoDiscovery(hass, wemo_dispatcher, static_conf)
|
||||
|
@ -113,6 +119,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||
"""Shutdown Wemo subscriptions and subscription thread on exit."""
|
||||
_LOGGER.debug("Shutting down WeMo event subscriptions")
|
||||
await hass.async_add_executor_job(registry.stop)
|
||||
await hass.async_add_executor_job(discovery_responder.stop)
|
||||
wemo_discovery.async_stop_discovery()
|
||||
|
||||
entry.async_on_unload(
|
||||
|
@ -137,15 +144,15 @@ class WemoDispatcher:
|
|||
self._added_serial_numbers = set()
|
||||
self._loaded_components = set()
|
||||
|
||||
@callback
|
||||
def async_add_unique_device(
|
||||
self, hass: HomeAssistant, device: pywemo.WeMoDevice
|
||||
async def async_add_unique_device(
|
||||
self, hass: HomeAssistant, wemo: pywemo.WeMoDevice
|
||||
) -> None:
|
||||
"""Add a WeMo device to hass if it has not already been added."""
|
||||
if device.serialnumber in self._added_serial_numbers:
|
||||
if wemo.serialnumber in self._added_serial_numbers:
|
||||
return
|
||||
|
||||
component = WEMO_MODEL_DISPATCH.get(device.model_name, SWITCH_DOMAIN)
|
||||
component = WEMO_MODEL_DISPATCH.get(wemo.model_name, SWITCH_DOMAIN)
|
||||
device = await async_register_device(hass, self._config_entry, wemo)
|
||||
|
||||
# Three cases:
|
||||
# - First time we see component, we need to load it and initialize the backlog
|
||||
|
@ -171,7 +178,7 @@ class WemoDispatcher:
|
|||
device,
|
||||
)
|
||||
|
||||
self._added_serial_numbers.add(device.serialnumber)
|
||||
self._added_serial_numbers.add(wemo.serialnumber)
|
||||
|
||||
|
||||
class WemoDiscovery:
|
||||
|
@ -200,7 +207,7 @@ class WemoDiscovery:
|
|||
for device in await self._hass.async_add_executor_job(
|
||||
pywemo.discover_devices
|
||||
):
|
||||
self._wemo_dispatcher.async_add_unique_device(self._hass, device)
|
||||
await self._wemo_dispatcher.async_add_unique_device(self._hass, device)
|
||||
await self.discover_statics()
|
||||
|
||||
finally:
|
||||
|
@ -236,7 +243,9 @@ class WemoDiscovery:
|
|||
],
|
||||
):
|
||||
if device:
|
||||
self._wemo_dispatcher.async_add_unique_device(self._hass, device)
|
||||
await self._wemo_dispatcher.async_add_unique_device(
|
||||
self._hass, device
|
||||
)
|
||||
|
||||
|
||||
def validate_static_config(host, port):
|
||||
|
|
|
@ -3,3 +3,6 @@ DOMAIN = "wemo"
|
|||
|
||||
SERVICE_SET_HUMIDITY = "set_humidity"
|
||||
SERVICE_RESET_FILTER_LIFE = "reset_filter_life"
|
||||
SIGNAL_WEMO_STATE_PUSH = f"{DOMAIN}.state_push"
|
||||
|
||||
WEMO_SUBSCRIPTION_EVENT = f"{DOMAIN}_subscription_event"
|
||||
|
|
61
homeassistant/components/wemo/device_trigger.py
Normal file
61
homeassistant/components/wemo/device_trigger.py
Normal file
|
@ -0,0 +1,61 @@
|
|||
"""Triggers for WeMo devices."""
|
||||
from pywemo.subscribe import EVENT_TYPE_LONG_PRESS
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.device_automation import DEVICE_TRIGGER_BASE_SCHEMA
|
||||
from homeassistant.components.homeassistant.triggers import event as event_trigger
|
||||
from homeassistant.const import CONF_DEVICE_ID, CONF_DOMAIN, CONF_PLATFORM, CONF_TYPE
|
||||
|
||||
from .const import DOMAIN as WEMO_DOMAIN, WEMO_SUBSCRIPTION_EVENT
|
||||
from .wemo_device import async_get_device
|
||||
|
||||
TRIGGER_TYPES = {EVENT_TYPE_LONG_PRESS}
|
||||
|
||||
TRIGGER_SCHEMA = DEVICE_TRIGGER_BASE_SCHEMA.extend(
|
||||
{
|
||||
vol.Required(CONF_TYPE): vol.In(TRIGGER_TYPES),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
async def async_get_triggers(hass, device_id):
|
||||
"""Return a list of triggers."""
|
||||
|
||||
wemo_trigger = {
|
||||
# Required fields of TRIGGER_BASE_SCHEMA
|
||||
CONF_PLATFORM: "device",
|
||||
CONF_DOMAIN: WEMO_DOMAIN,
|
||||
CONF_DEVICE_ID: device_id,
|
||||
}
|
||||
|
||||
device = async_get_device(hass, device_id)
|
||||
triggers = []
|
||||
|
||||
# Check for long press support.
|
||||
if device.supports_long_press:
|
||||
triggers.append(
|
||||
{
|
||||
# Required fields of TRIGGER_SCHEMA
|
||||
CONF_TYPE: EVENT_TYPE_LONG_PRESS,
|
||||
**wemo_trigger,
|
||||
}
|
||||
)
|
||||
|
||||
return triggers
|
||||
|
||||
|
||||
async def async_attach_trigger(hass, config, action, automation_info):
|
||||
"""Attach a trigger."""
|
||||
event_config = event_trigger.TRIGGER_SCHEMA(
|
||||
{
|
||||
event_trigger.CONF_PLATFORM: "event",
|
||||
event_trigger.CONF_EVENT_TYPE: WEMO_SUBSCRIPTION_EVENT,
|
||||
event_trigger.CONF_EVENT_DATA: {
|
||||
CONF_DEVICE_ID: config[CONF_DEVICE_ID],
|
||||
CONF_TYPE: config[CONF_TYPE],
|
||||
},
|
||||
}
|
||||
)
|
||||
return await event_trigger.async_attach_trigger(
|
||||
hass, event_config, action, automation_info, platform_type="device"
|
||||
)
|
|
@ -10,9 +10,11 @@ import async_timeout
|
|||
from pywemo import WeMoDevice
|
||||
from pywemo.exceptions import ActionException
|
||||
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
||||
from homeassistant.helpers.entity import DeviceInfo, Entity
|
||||
|
||||
from .const import DOMAIN as WEMO_DOMAIN
|
||||
from .const import DOMAIN as WEMO_DOMAIN, SIGNAL_WEMO_STATE_PUSH
|
||||
from .wemo_device import DeviceWrapper
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
@ -35,9 +37,9 @@ class WemoEntity(Entity):
|
|||
Requires that subclasses implement the _update method.
|
||||
"""
|
||||
|
||||
def __init__(self, device: WeMoDevice) -> None:
|
||||
def __init__(self, wemo: WeMoDevice) -> None:
|
||||
"""Initialize the WeMo device."""
|
||||
self.wemo = device
|
||||
self.wemo = wemo
|
||||
self._state = None
|
||||
self._available = True
|
||||
self._update_lock = None
|
||||
|
@ -120,6 +122,12 @@ class WemoEntity(Entity):
|
|||
class WemoSubscriptionEntity(WemoEntity):
|
||||
"""Common methods for Wemo devices that register for update callbacks."""
|
||||
|
||||
def __init__(self, device: DeviceWrapper) -> None:
|
||||
"""Initialize WemoSubscriptionEntity."""
|
||||
super().__init__(device.wemo)
|
||||
self._device_id = device.device_id
|
||||
self._device_info = device.device_info
|
||||
|
||||
@property
|
||||
def unique_id(self) -> str:
|
||||
"""Return the id of this WeMo device."""
|
||||
|
@ -128,12 +136,7 @@ class WemoSubscriptionEntity(WemoEntity):
|
|||
@property
|
||||
def device_info(self) -> DeviceInfo:
|
||||
"""Return the device info."""
|
||||
return {
|
||||
"name": self.name,
|
||||
"identifiers": {(WEMO_DOMAIN, self.unique_id)},
|
||||
"model": self.wemo.model_name,
|
||||
"manufacturer": "Belkin",
|
||||
}
|
||||
return self._device_info
|
||||
|
||||
@property
|
||||
def is_on(self) -> bool:
|
||||
|
@ -169,27 +172,25 @@ class WemoSubscriptionEntity(WemoEntity):
|
|||
"""Wemo device added to Home Assistant."""
|
||||
await super().async_added_to_hass()
|
||||
|
||||
registry = self.hass.data[WEMO_DOMAIN]["registry"]
|
||||
await self.hass.async_add_executor_job(registry.register, self.wemo)
|
||||
registry.on(self.wemo, None, self._subscription_callback)
|
||||
self.async_on_remove(
|
||||
async_dispatcher_connect(
|
||||
self.hass, SIGNAL_WEMO_STATE_PUSH, self._async_subscription_callback
|
||||
)
|
||||
)
|
||||
|
||||
async def async_will_remove_from_hass(self) -> None:
|
||||
"""Wemo device removed from hass."""
|
||||
registry = self.hass.data[WEMO_DOMAIN]["registry"]
|
||||
await self.hass.async_add_executor_job(registry.unregister, self.wemo)
|
||||
|
||||
def _subscription_callback(
|
||||
self, _device: WeMoDevice, _type: str, _params: str
|
||||
async def _async_subscription_callback(
|
||||
self, device_id: str, event_type: str, params: str
|
||||
) -> None:
|
||||
"""Update the state by the Wemo device."""
|
||||
_LOGGER.info("Subscription update for %s", self.name)
|
||||
updated = self.wemo.subscription_update(_type, _params)
|
||||
self.hass.add_job(self._async_locked_subscription_callback(not updated))
|
||||
|
||||
async def _async_locked_subscription_callback(self, force_update: bool) -> None:
|
||||
"""Handle an update from a subscription."""
|
||||
# Only respond events for this device.
|
||||
if device_id != self._device_id:
|
||||
return
|
||||
# If an update is in progress, we don't do anything
|
||||
if self._update_lock.locked():
|
||||
return
|
||||
|
||||
await self._async_locked_update(force_update)
|
||||
_LOGGER.debug("Subscription event (%s) for %s", event_type, self.name)
|
||||
updated = await self.hass.async_add_executor_job(
|
||||
self.wemo.subscription_update, event_type, params
|
||||
)
|
||||
await self._async_locked_update(not updated)
|
||||
|
|
|
@ -40,11 +40,11 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
|
|||
|
||||
async def _discovered_wemo(device):
|
||||
"""Handle a discovered Wemo device."""
|
||||
if device.model_name == "Dimmer":
|
||||
if device.wemo.model_name == "Dimmer":
|
||||
async_add_entities([WemoDimmer(device)])
|
||||
else:
|
||||
await hass.async_add_executor_job(
|
||||
setup_bridge, hass, device, async_add_entities
|
||||
setup_bridge, hass, device.wemo, async_add_entities
|
||||
)
|
||||
|
||||
async_dispatcher_connect(hass, f"{WEMO_DOMAIN}.light", _discovered_wemo)
|
||||
|
|
|
@ -9,5 +9,10 @@
|
|||
"single_instance_allowed": "[%key:common::config_flow::abort::single_instance_allowed%]",
|
||||
"no_devices_found": "[%key:common::config_flow::abort::no_devices_found%]"
|
||||
}
|
||||
},
|
||||
"device_automation": {
|
||||
"trigger_type": {
|
||||
"long_press": "Wemo button was pressed for 2 seconds"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
96
homeassistant/components/wemo/wemo_device.py
Normal file
96
homeassistant/components/wemo/wemo_device.py
Normal file
|
@ -0,0 +1,96 @@
|
|||
"""Home Assistant wrapper for a pyWeMo device."""
|
||||
import logging
|
||||
|
||||
from pywemo import PyWeMoException, WeMoDevice
|
||||
from pywemo.subscribe import EVENT_TYPE_LONG_PRESS
|
||||
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import (
|
||||
CONF_DEVICE_ID,
|
||||
CONF_NAME,
|
||||
CONF_PARAMS,
|
||||
CONF_TYPE,
|
||||
CONF_UNIQUE_ID,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.helpers.device_registry import async_get as async_get_device_registry
|
||||
from homeassistant.helpers.dispatcher import dispatcher_send
|
||||
|
||||
from .const import DOMAIN, SIGNAL_WEMO_STATE_PUSH, WEMO_SUBSCRIPTION_EVENT
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DeviceWrapper:
|
||||
"""Home Assistant wrapper for a pyWeMo device."""
|
||||
|
||||
def __init__(self, hass: HomeAssistant, wemo: WeMoDevice, device_id: str) -> None:
|
||||
"""Initialize DeviceWrapper."""
|
||||
self.hass = hass
|
||||
self.wemo = wemo
|
||||
self.device_id = device_id
|
||||
self.device_info = _device_info(wemo)
|
||||
self.supports_long_press = wemo.supports_long_press()
|
||||
|
||||
def subscription_callback(
|
||||
self, _device: WeMoDevice, event_type: str, params: str
|
||||
) -> None:
|
||||
"""Receives push notifications from WeMo devices."""
|
||||
if event_type == EVENT_TYPE_LONG_PRESS:
|
||||
self.hass.bus.fire(
|
||||
WEMO_SUBSCRIPTION_EVENT,
|
||||
{
|
||||
CONF_DEVICE_ID: self.device_id,
|
||||
CONF_NAME: self.wemo.name,
|
||||
CONF_TYPE: event_type,
|
||||
CONF_PARAMS: params,
|
||||
CONF_UNIQUE_ID: self.wemo.serialnumber,
|
||||
},
|
||||
)
|
||||
else:
|
||||
dispatcher_send(
|
||||
self.hass, SIGNAL_WEMO_STATE_PUSH, self.device_id, event_type, params
|
||||
)
|
||||
|
||||
|
||||
def _device_info(wemo: WeMoDevice):
|
||||
return {
|
||||
"name": wemo.name,
|
||||
"identifiers": {(DOMAIN, wemo.serialnumber)},
|
||||
"model": wemo.model_name,
|
||||
"manufacturer": "Belkin",
|
||||
}
|
||||
|
||||
|
||||
async def async_register_device(
|
||||
hass: HomeAssistant, config_entry: ConfigEntry, wemo: WeMoDevice
|
||||
) -> DeviceWrapper:
|
||||
"""Register a device with home assistant and enable pywemo event callbacks."""
|
||||
device_registry = async_get_device_registry(hass)
|
||||
entry = device_registry.async_get_or_create(
|
||||
config_entry_id=config_entry.entry_id, **_device_info(wemo)
|
||||
)
|
||||
|
||||
registry = hass.data[DOMAIN]["registry"]
|
||||
await hass.async_add_executor_job(registry.register, wemo)
|
||||
|
||||
device = DeviceWrapper(hass, wemo, entry.id)
|
||||
hass.data[DOMAIN].setdefault("devices", {})[entry.id] = device
|
||||
registry.on(wemo, None, device.subscription_callback)
|
||||
|
||||
if device.supports_long_press:
|
||||
try:
|
||||
await hass.async_add_executor_job(wemo.ensure_long_press_virtual_device)
|
||||
except PyWeMoException:
|
||||
_LOGGER.warning(
|
||||
"Failed to enable long press support for device: %s", wemo.name
|
||||
)
|
||||
device.supports_long_press = False
|
||||
|
||||
return device
|
||||
|
||||
|
||||
@callback
|
||||
def async_get_device(hass: HomeAssistant, device_id: str) -> DeviceWrapper:
|
||||
"""Return DeviceWrapper for device_id."""
|
||||
return hass.data[DOMAIN]["devices"][device_id]
|
|
@ -43,13 +43,15 @@ def pywemo_registry_fixture():
|
|||
@pytest.fixture(name="pywemo_device")
|
||||
def pywemo_device_fixture(pywemo_registry, pywemo_model):
|
||||
"""Fixture for WeMoDevice instances."""
|
||||
device = create_autospec(getattr(pywemo, pywemo_model), instance=True)
|
||||
cls = getattr(pywemo, pywemo_model)
|
||||
device = create_autospec(cls, instance=True)
|
||||
device.host = MOCK_HOST
|
||||
device.port = MOCK_PORT
|
||||
device.name = MOCK_NAME
|
||||
device.serialnumber = MOCK_SERIAL_NUMBER
|
||||
device.model_name = pywemo_model
|
||||
device.get_state.return_value = 0 # Default to Off
|
||||
device.supports_long_press.return_value = cls.supports_long_press()
|
||||
|
||||
url = f"http://{MOCK_HOST}:{MOCK_PORT}/setup.xml"
|
||||
with patch("pywemo.setup_url_for_address", return_value=url), patch(
|
||||
|
|
|
@ -13,19 +13,31 @@ from homeassistant.components.homeassistant import (
|
|||
DOMAIN as HA_DOMAIN,
|
||||
SERVICE_UPDATE_ENTITY,
|
||||
)
|
||||
from homeassistant.components.wemo.const import SIGNAL_WEMO_STATE_PUSH
|
||||
from homeassistant.const import ATTR_ENTITY_ID, STATE_OFF, STATE_UNAVAILABLE
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
|
||||
def _perform_registry_callback(hass, pywemo_registry, pywemo_device):
|
||||
"""Return a callable method to trigger a state callback from the device."""
|
||||
|
||||
@callback
|
||||
def async_callback():
|
||||
async def async_callback():
|
||||
event = asyncio.Event()
|
||||
|
||||
async def event_callback(e, *args):
|
||||
event.set()
|
||||
|
||||
stop_dispatcher_listener = async_dispatcher_connect(
|
||||
hass, SIGNAL_WEMO_STATE_PUSH, event_callback
|
||||
)
|
||||
# Cause a state update callback to be triggered by the device.
|
||||
pywemo_registry.callbacks[pywemo_device.name](pywemo_device, "", "")
|
||||
return hass.async_block_till_done()
|
||||
await hass.async_add_executor_job(
|
||||
pywemo_registry.callbacks[pywemo_device.name], pywemo_device, "", ""
|
||||
)
|
||||
await event.wait()
|
||||
stop_dispatcher_listener()
|
||||
|
||||
return async_callback
|
||||
|
||||
|
@ -63,8 +75,10 @@ async def _async_multiple_call_helper(
|
|||
"""
|
||||
# get_state is called outside the event loop. Use non-async Python Event.
|
||||
event = threading.Event()
|
||||
waiting = asyncio.Event()
|
||||
|
||||
def get_update(force_update=True):
|
||||
hass.add_job(waiting.set)
|
||||
event.wait()
|
||||
|
||||
update_polling_method = update_polling_method or pywemo_device.get_state
|
||||
|
@ -77,6 +91,7 @@ async def _async_multiple_call_helper(
|
|||
)
|
||||
|
||||
# Allow the blocked call to return.
|
||||
await waiting.wait()
|
||||
event.set()
|
||||
if pending:
|
||||
await asyncio.wait(pending)
|
||||
|
|
98
tests/components/wemo/test_device_trigger.py
Normal file
98
tests/components/wemo/test_device_trigger.py
Normal file
|
@ -0,0 +1,98 @@
|
|||
"""Verify that WeMo device triggers work as expected."""
|
||||
import pytest
|
||||
from pywemo.subscribe import EVENT_TYPE_LONG_PRESS
|
||||
|
||||
from homeassistant.components.automation import DOMAIN as AUTOMATION_DOMAIN
|
||||
from homeassistant.components.light import DOMAIN as LIGHT_DOMAIN
|
||||
from homeassistant.components.wemo.const import DOMAIN, WEMO_SUBSCRIPTION_EVENT
|
||||
from homeassistant.const import (
|
||||
CONF_DEVICE_ID,
|
||||
CONF_DOMAIN,
|
||||
CONF_ENTITY_ID,
|
||||
CONF_PLATFORM,
|
||||
CONF_TYPE,
|
||||
)
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from tests.common import (
|
||||
assert_lists_same,
|
||||
async_get_device_automations,
|
||||
async_mock_service,
|
||||
)
|
||||
|
||||
MOCK_DEVICE_ID = "some-device-id"
|
||||
DATA_MESSAGE = {"message": "service-called"}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pywemo_model():
|
||||
"""Pywemo Dimmer models use the light platform (WemoDimmer class)."""
|
||||
return "Dimmer"
|
||||
|
||||
|
||||
async def setup_automation(hass, device_id, trigger_type):
|
||||
"""Set up an automation trigger for testing triggering."""
|
||||
return await async_setup_component(
|
||||
hass,
|
||||
AUTOMATION_DOMAIN,
|
||||
{
|
||||
AUTOMATION_DOMAIN: [
|
||||
{
|
||||
"trigger": {
|
||||
CONF_PLATFORM: "device",
|
||||
CONF_DOMAIN: DOMAIN,
|
||||
CONF_DEVICE_ID: device_id,
|
||||
CONF_TYPE: trigger_type,
|
||||
},
|
||||
"action": {
|
||||
"service": "test.automation",
|
||||
"data": DATA_MESSAGE,
|
||||
},
|
||||
},
|
||||
]
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
async def test_get_triggers(hass, wemo_entity):
|
||||
"""Test that the triggers appear for a supported device."""
|
||||
assert wemo_entity.device_id is not None
|
||||
|
||||
expected_triggers = [
|
||||
{
|
||||
CONF_DEVICE_ID: wemo_entity.device_id,
|
||||
CONF_DOMAIN: DOMAIN,
|
||||
CONF_PLATFORM: "device",
|
||||
CONF_TYPE: EVENT_TYPE_LONG_PRESS,
|
||||
},
|
||||
{
|
||||
CONF_DEVICE_ID: wemo_entity.device_id,
|
||||
CONF_DOMAIN: LIGHT_DOMAIN,
|
||||
CONF_ENTITY_ID: wemo_entity.entity_id,
|
||||
CONF_PLATFORM: "device",
|
||||
CONF_TYPE: "turned_off",
|
||||
},
|
||||
{
|
||||
CONF_DEVICE_ID: wemo_entity.device_id,
|
||||
CONF_DOMAIN: LIGHT_DOMAIN,
|
||||
CONF_ENTITY_ID: wemo_entity.entity_id,
|
||||
CONF_PLATFORM: "device",
|
||||
CONF_TYPE: "turned_on",
|
||||
},
|
||||
]
|
||||
triggers = await async_get_device_automations(
|
||||
hass, "trigger", wemo_entity.device_id
|
||||
)
|
||||
assert_lists_same(triggers, expected_triggers)
|
||||
|
||||
|
||||
async def test_fires_on_long_press(hass):
|
||||
"""Test wemo long press trigger firing."""
|
||||
assert await setup_automation(hass, MOCK_DEVICE_ID, EVENT_TYPE_LONG_PRESS)
|
||||
calls = async_mock_service(hass, "test", "automation")
|
||||
|
||||
message = {CONF_DEVICE_ID: MOCK_DEVICE_ID, CONF_TYPE: EVENT_TYPE_LONG_PRESS}
|
||||
hass.bus.async_fire(WEMO_SUBSCRIPTION_EVENT, message)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
assert calls[0].data == DATA_MESSAGE
|
|
@ -110,6 +110,7 @@ async def test_discovery(hass, pywemo_registry):
|
|||
device.serialnumber = f"{MOCK_SERIAL_NUMBER}_{counter}"
|
||||
device.model_name = "Motion"
|
||||
device.get_state.return_value = 0 # Default to Off
|
||||
device.supports_long_press.return_value = False
|
||||
return device
|
||||
|
||||
pywemo_devices = [create_device(0), create_device(1)]
|
||||
|
|
40
tests/components/wemo/test_wemo_device.py
Normal file
40
tests/components/wemo/test_wemo_device.py
Normal file
|
@ -0,0 +1,40 @@
|
|||
"""Tests for wemo_device.py."""
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from pywemo import PyWeMoException
|
||||
|
||||
from homeassistant.components.wemo import CONF_DISCOVERY, CONF_STATIC, wemo_device
|
||||
from homeassistant.components.wemo.const import DOMAIN
|
||||
from homeassistant.helpers import device_registry
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from .conftest import MOCK_HOST
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pywemo_model():
|
||||
"""Pywemo Dimmer models use the light platform (WemoDimmer class)."""
|
||||
return "Dimmer"
|
||||
|
||||
|
||||
async def test_async_register_device_longpress_fails(hass, pywemo_device):
|
||||
"""Device is still registered if ensure_long_press_virtual_device fails."""
|
||||
with patch.object(pywemo_device, "ensure_long_press_virtual_device") as elp:
|
||||
elp.side_effect = PyWeMoException
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
DOMAIN,
|
||||
{
|
||||
DOMAIN: {
|
||||
CONF_DISCOVERY: False,
|
||||
CONF_STATIC: [MOCK_HOST],
|
||||
},
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
dr = device_registry.async_get(hass)
|
||||
device_entries = list(dr.devices.values())
|
||||
assert len(device_entries) == 1
|
||||
device_wrapper = wemo_device.async_get_device(hass, device_entries[0].id)
|
||||
assert device_wrapper.supports_long_press is False
|
Loading…
Add table
Add a link
Reference in a new issue