""" Support for RFXtrx binary sensors. Lighting4 devices (sensors based on PT2262 encoder) are supported and tested. Other types may need some work. """ import logging import voluptuous as vol from homeassistant.components import rfxtrx from homeassistant.util import slugify from homeassistant.util import dt as dt_util from homeassistant.helpers import config_validation as cv from homeassistant.helpers import event as evt from homeassistant.components.binary_sensor import BinarySensorDevice from homeassistant.components.rfxtrx import ( ATTR_AUTOMATIC_ADD, ATTR_NAME, ATTR_OFF_DELAY, ATTR_FIREEVENT, ATTR_DATA_BITS, CONF_DEVICES ) from homeassistant.const import ( CONF_DEVICE_CLASS, CONF_COMMAND_ON, CONF_COMMAND_OFF ) DEPENDENCIES = ["rfxtrx"] _LOGGER = logging.getLogger(__name__) PLATFORM_SCHEMA = vol.Schema({ vol.Required("platform"): rfxtrx.DOMAIN, vol.Optional(CONF_DEVICES, default={}): vol.All( dict, rfxtrx.valid_binary_sensor), vol.Optional(ATTR_AUTOMATIC_ADD, default=False): cv.boolean, }, extra=vol.ALLOW_EXTRA) def setup_platform(hass, config, add_devices_callback, discovery_info=None): """Setup the Binary Sensor platform to rfxtrx.""" import RFXtrx as rfxtrxmod sensors = [] for packet_id, entity in config['devices'].items(): event = rfxtrx.get_rfx_object(packet_id) device_id = slugify(event.device.id_string.lower()) if device_id in rfxtrx.RFX_DEVICES: continue if entity[ATTR_DATA_BITS] is not None: _LOGGER.info("Masked device id: %s", rfxtrx.get_pt2262_deviceid(device_id, entity[ATTR_DATA_BITS])) _LOGGER.info("Add %s rfxtrx.binary_sensor (class %s)", entity[ATTR_NAME], entity[CONF_DEVICE_CLASS]) device = RfxtrxBinarySensor(event, entity[ATTR_NAME], entity[CONF_DEVICE_CLASS], entity[ATTR_FIREEVENT], entity[ATTR_OFF_DELAY], entity[ATTR_DATA_BITS], entity[CONF_COMMAND_ON], entity[CONF_COMMAND_OFF]) device.hass = hass device.is_lighting4 = (packet_id[2:4] == '13') sensors.append(device) rfxtrx.RFX_DEVICES[device_id] = device add_devices_callback(sensors) # pylint: disable=too-many-branches def binary_sensor_update(event): """Callback for control updates from the RFXtrx gateway.""" if not isinstance(event, rfxtrxmod.ControlEvent): return device_id = slugify(event.device.id_string.lower()) if device_id in rfxtrx.RFX_DEVICES: sensor = rfxtrx.RFX_DEVICES[device_id] else: sensor = rfxtrx.get_pt2262_device(device_id) if sensor is None: # Add the entity if not exists and automatic_add is True if not config[ATTR_AUTOMATIC_ADD]: return poss_dev = rfxtrx.find_possible_pt2262_device(device_id) if poss_dev is not None: poss_id = slugify(poss_dev.event.device.id_string.lower()) _LOGGER.info("Found possible matching deviceid %s.", poss_id) pkt_id = "".join("{0:02x}".format(x) for x in event.data) sensor = RfxtrxBinarySensor(event, pkt_id) sensor.hass = hass sensor.is_lighting4 = (pkt_id[2:4] == '13') rfxtrx.RFX_DEVICES[device_id] = sensor add_devices_callback([sensor]) _LOGGER.info("Added binary sensor %s " "(Device_id: %s Class: %s Sub: %s)", pkt_id, slugify(event.device.id_string.lower()), event.device.__class__.__name__, event.device.subtype) elif not isinstance(sensor, RfxtrxBinarySensor): return else: _LOGGER.info("Binary sensor update " "(Device_id: %s Class: %s Sub: %s)", slugify(event.device.id_string.lower()), event.device.__class__.__name__, event.device.subtype) if sensor.is_lighting4: if sensor.data_bits is not None: cmd = rfxtrx.get_pt2262_cmd(device_id, sensor.data_bits) sensor.apply_cmd(int(cmd, 16)) else: sensor.update_state(True) else: rfxtrx.apply_received_command(event) if (sensor.is_on and sensor.off_delay is not None and sensor.delay_listener is None): def off_delay_listener(now): """Switch device off after a delay.""" sensor.delay_listener = None sensor.update_state(False) sensor.delay_listener = evt.track_point_in_time( hass, off_delay_listener, dt_util.utcnow() + sensor.off_delay ) # Subscribe to main rfxtrx events if binary_sensor_update not in rfxtrx.RECEIVED_EVT_SUBSCRIBERS: rfxtrx.RECEIVED_EVT_SUBSCRIBERS.append(binary_sensor_update) # pylint: disable=too-many-instance-attributes,too-many-arguments class RfxtrxBinarySensor(BinarySensorDevice): """An Rfxtrx binary sensor.""" def __init__(self, event, name, device_class=None, should_fire=False, off_delay=None, data_bits=None, cmd_on=None, cmd_off=None): """Initialize the sensor.""" self.event = event self._name = name self._should_fire_event = should_fire self._device_class = device_class self._off_delay = off_delay self._state = False self.delay_listener = None self._data_bits = data_bits self._cmd_on = cmd_on self._cmd_off = cmd_off if data_bits is not None: self._masked_id = rfxtrx.get_pt2262_deviceid( event.device.id_string.lower(), data_bits) def __str__(self): """Return the name of the sensor.""" return self._name @property def name(self): """Return the device name.""" return self._name @property def masked_id(self): """Return the masked device id (isolated address bits).""" return self._masked_id @property def data_bits(self): """Return the number of data bits.""" return self._data_bits @property def cmd_on(self): """Return the value of the 'On' command.""" return self._cmd_on @property def cmd_off(self): """Return the value of the 'Off' command.""" return self._cmd_off @property def should_poll(self): """No polling needed.""" return False @property def should_fire_event(self): """Return is the device must fire event.""" return self._should_fire_event @property def device_class(self): """Return the sensor class.""" return self._device_class @property def off_delay(self): """Return the off_delay attribute value.""" return self._off_delay @property def is_on(self): """Return true if the sensor state is True.""" return self._state def apply_cmd(self, cmd): """Apply a command for updating the state.""" if cmd == self.cmd_on: self.update_state(True) elif cmd == self.cmd_off: self.update_state(False) def update_state(self, state): """Update the state of the device.""" self._state = state self.schedule_update_ha_state()