Add binary_sensor support to RFlink (#17146)

* Add binary_sensor support to RFlink

* Add support for aliases

* Fix review comments

* Refactor, add tests

* Review comments

* Review comments

* Review comments

* Review comments
This commit is contained in:
emontnemery 2018-10-18 22:28:40 +02:00 committed by Martin Hjelmare
parent cf3a97ff3c
commit 10c1378195
4 changed files with 284 additions and 2 deletions

View file

@ -0,0 +1,105 @@
"""
Support for Rflink binary sensors.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/binary_sensor.rflink/
"""
import logging
import voluptuous as vol
from homeassistant.components.binary_sensor import (
DEVICE_CLASSES_SCHEMA, PLATFORM_SCHEMA, BinarySensorDevice)
from homeassistant.components.rflink import (
CONF_ALIASES, CONF_DEVICES, RflinkDevice)
from homeassistant.const import (
CONF_FORCE_UPDATE, CONF_NAME, CONF_DEVICE_CLASS)
import homeassistant.helpers.config_validation as cv
import homeassistant.helpers.event as evt
CONF_OFF_DELAY = 'off_delay'
DEFAULT_FORCE_UPDATE = False
DEPENDENCIES = ['rflink']
_LOGGER = logging.getLogger(__name__)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_DEVICES, default={}): {
cv.string: vol.Schema({
vol.Optional(CONF_NAME): cv.string,
vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,
vol.Optional(CONF_FORCE_UPDATE, default=DEFAULT_FORCE_UPDATE):
cv.boolean,
vol.Optional(CONF_OFF_DELAY): cv.positive_int,
vol.Optional(CONF_ALIASES, default=[]):
vol.All(cv.ensure_list, [cv.string]),
})
},
}, extra=vol.ALLOW_EXTRA)
def devices_from_config(domain_config):
"""Parse configuration and add Rflink sensor devices."""
devices = []
for device_id, config in domain_config[CONF_DEVICES].items():
device = RflinkBinarySensor(device_id, **config)
devices.append(device)
return devices
async def async_setup_platform(hass, config, async_add_entities,
discovery_info=None):
"""Set up the Rflink platform."""
async_add_entities(devices_from_config(config))
class RflinkBinarySensor(RflinkDevice, BinarySensorDevice):
"""Representation of an Rflink binary sensor."""
def __init__(self, device_id, device_class=None,
force_update=None, off_delay=None,
**kwargs):
"""Handle sensor specific args and super init."""
self._state = None
self._device_class = device_class
self._force_update = force_update
self._off_delay = off_delay
self._delay_listener = None
super().__init__(device_id, **kwargs)
def _handle_event(self, event):
"""Domain specific event handler."""
command = event['command']
if command == 'on':
self._state = True
elif command == 'off':
self._state = False
if (self._state and self._off_delay is not None):
def off_delay_listener(now):
"""Switch device off after a delay."""
self._delay_listener = None
self._state = False
self.async_schedule_update_ha_state()
if self._delay_listener is not None:
self._delay_listener()
self._delay_listener = evt.async_call_later(
self.hass, self._off_delay, off_delay_listener)
@property
def is_on(self):
"""Return true if the binary sensor is on."""
return self._state
@property
def device_class(self):
"""Return the class of this sensor."""
return self._device_class
@property
def force_update(self):
"""Force update."""
return self._force_update

View file

@ -120,7 +120,6 @@ async def async_setup(hass, config):
} }
hass.data[DATA_ENTITY_GROUP_LOOKUP] = { hass.data[DATA_ENTITY_GROUP_LOOKUP] = {
EVENT_KEY_COMMAND: defaultdict(list), EVENT_KEY_COMMAND: defaultdict(list),
EVENT_KEY_SENSOR: defaultdict(list),
} }
# Allow platform to specify function to register new unknown devices # Allow platform to specify function to register new unknown devices

View file

@ -2,7 +2,7 @@
Support for Rflink sensors. Support for Rflink sensors.
For more details about this platform, please refer to the documentation at For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/light.rflink/ https://home-assistant.io/components/sensor.rflink/
""" """
import logging import logging

View file

@ -0,0 +1,178 @@
"""
Test for RFlink sensor components.
Test setup of rflink sensor component/platform. Verify manual and
automatic sensor creation.
"""
from datetime import timedelta
from unittest.mock import patch
from ..test_rflink import mock_rflink
from homeassistant.components.rflink import (
CONF_RECONNECT_INTERVAL)
import homeassistant.core as ha
from homeassistant.const import (
EVENT_STATE_CHANGED, STATE_ON, STATE_OFF, STATE_UNAVAILABLE)
import homeassistant.util.dt as dt_util
from tests.common import async_fire_time_changed
DOMAIN = 'binary_sensor'
CONFIG = {
'rflink': {
'port': '/dev/ttyABC0',
'ignore_devices': ['ignore_wildcard_*', 'ignore_sensor'],
},
DOMAIN: {
'platform': 'rflink',
'devices': {
'test': {
'name': 'test',
'device_class': 'door',
},
'test2': {
'name': 'test2',
'device_class': 'motion',
'off_delay': 30,
'force_update': True,
},
},
},
}
async def test_default_setup(hass, monkeypatch):
"""Test all basic functionality of the rflink sensor component."""
# setup mocking rflink module
event_callback, create, _, disconnect_callback = await mock_rflink(
hass, CONFIG, DOMAIN, monkeypatch)
# make sure arguments are passed
assert create.call_args_list[0][1]['ignore']
# test default state of sensor loaded from config
config_sensor = hass.states.get('binary_sensor.test')
assert config_sensor
assert config_sensor.state == STATE_OFF
assert config_sensor.attributes['device_class'] == 'door'
# test event for config sensor
event_callback({
'id': 'test',
'command': 'on',
})
await hass.async_block_till_done()
assert hass.states.get('binary_sensor.test').state == STATE_ON
# test event for config sensor
event_callback({
'id': 'test',
'command': 'off',
})
await hass.async_block_till_done()
assert hass.states.get('binary_sensor.test').state == STATE_OFF
async def test_entity_availability(hass, monkeypatch):
"""If Rflink device is disconnected, entities should become unavailable."""
# Make sure Rflink mock does not 'recover' to quickly from the
# disconnect or else the unavailability cannot be measured
config = CONFIG
failures = [True, True]
config[CONF_RECONNECT_INTERVAL] = 60
# Create platform and entities
event_callback, create, _, disconnect_callback = await mock_rflink(
hass, config, DOMAIN, monkeypatch, failures=failures)
# Entities are available by default
assert hass.states.get('binary_sensor.test').state == STATE_OFF
# Mock a disconnect of the Rflink device
disconnect_callback()
# Wait for dispatch events to propagate
await hass.async_block_till_done()
# Entity should be unavailable
assert hass.states.get('binary_sensor.test').state == STATE_UNAVAILABLE
# Reconnect the Rflink device
disconnect_callback()
# Wait for dispatch events to propagate
await hass.async_block_till_done()
# Entities should be available again
assert hass.states.get('binary_sensor.test').state == STATE_OFF
async def test_off_delay(hass, monkeypatch):
"""Test off_delay option."""
# setup mocking rflink module
event_callback, create, _, disconnect_callback = await mock_rflink(
hass, CONFIG, DOMAIN, monkeypatch)
# make sure arguments are passed
assert create.call_args_list[0][1]['ignore']
events = []
on_event = {
'id': 'test2',
'command': 'on',
}
@ha.callback
def callback(event):
"""Verify event got called."""
events.append(event)
hass.bus.async_listen(EVENT_STATE_CHANGED, callback)
now = dt_util.utcnow()
# fake time and turn on sensor
future = now + timedelta(seconds=0)
with patch(('homeassistant.helpers.event.'
'dt_util.utcnow'), return_value=future):
async_fire_time_changed(hass, future)
event_callback(on_event)
await hass.async_block_till_done()
state = hass.states.get('binary_sensor.test2')
assert state.state == STATE_ON
assert len(events) == 1
# fake time and turn on sensor again
future = now + timedelta(seconds=15)
with patch(('homeassistant.helpers.event.'
'dt_util.utcnow'), return_value=future):
async_fire_time_changed(hass, future)
event_callback(on_event)
await hass.async_block_till_done()
state = hass.states.get('binary_sensor.test2')
assert state.state == STATE_ON
assert len(events) == 2
# fake time and verify sensor still on (de-bounce)
future = now + timedelta(seconds=35)
with patch(('homeassistant.helpers.event.'
'dt_util.utcnow'), return_value=future):
async_fire_time_changed(hass, future)
await hass.async_block_till_done()
state = hass.states.get('binary_sensor.test2')
assert state.state == STATE_ON
assert len(events) == 2
# fake time and verify sensor is off
future = now + timedelta(seconds=45)
with patch(('homeassistant.helpers.event.'
'dt_util.utcnow'), return_value=future):
async_fire_time_changed(hass, future)
await hass.async_block_till_done()
state = hass.states.get('binary_sensor.test2')
assert state.state == STATE_OFF
assert len(events) == 3