Switch rfxtrx to dispatcher (#37271)

* Switch to dispatcher

* Adjust tests for dispatcher

* Store device in sensor

* Move state application into binary sensor entity class

* Move more specifics into per platform classes

* Should not apply event in init of sensor

* Switch to call_later

* Make apply_event public

* No point in slugifying debug logs

* Adjust error in off delay and event

* Make sure we match with masked id
This commit is contained in:
Joakim Plate 2020-07-06 00:10:26 +02:00 committed by GitHub
parent 0067b6a84d
commit 2088092f7c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 255 additions and 170 deletions

View file

@ -44,6 +44,7 @@ CONF_DUMMY = "dummy"
CONF_DEBUG = "debug"
CONF_OFF_DELAY = "off_delay"
EVENT_BUTTON_PRESSED = "button_pressed"
SIGNAL_EVENT = f"{DOMAIN}_event"
DATA_TYPES = OrderedDict(
[
@ -79,7 +80,6 @@ DATA_TYPES = OrderedDict(
]
)
RECEIVED_EVT_SUBSCRIBERS = []
RFX_DEVICES = {}
_LOGGER = logging.getLogger(__name__)
DATA_RFXOBJECT = "rfxobject"
@ -120,8 +120,7 @@ def setup(hass, config):
)
# Callback to HA registered components.
for subscriber in RECEIVED_EVT_SUBSCRIBERS:
subscriber(event)
hass.helpers.dispatcher.dispatcher_send(SIGNAL_EVENT, event)
device = config[DOMAIN].get(ATTR_DEVICE)
host = config[DOMAIN].get(CONF_HOST)
@ -301,69 +300,19 @@ def get_new_device(event, config, device):
return new_device
def apply_received_command(event):
"""Apply command from rfxtrx."""
device_id = slugify(event.device.id_string.lower())
# Check if entity exists or previously added automatically
if device_id not in RFX_DEVICES:
return
_LOGGER.debug(
"Device_id: %s device_update. Command: %s", device_id, event.values["Command"],
def fire_command_event(hass, entity_id, command):
"""Fire a command event."""
hass.bus.fire(
EVENT_BUTTON_PRESSED, {ATTR_ENTITY_ID: entity_id, ATTR_STATE: command.lower()}
)
_LOGGER.debug(
"Rfxtrx fired event: (event_type: %s, %s: %s, %s: %s)",
EVENT_BUTTON_PRESSED,
ATTR_ENTITY_ID,
entity_id,
ATTR_STATE,
command.lower(),
)
if event.values["Command"] in [
"On",
"Off",
"Up",
"Down",
"Stop",
"Open (inline relay)",
"Close (inline relay)",
"Stop (inline relay)",
]:
# Update the rfxtrx device state
command = event.values["Command"]
if command in [
"On",
"Up",
"Stop",
"Open (inline relay)",
"Stop (inline relay)",
]:
is_on = True
elif command in ["Off", "Down", "Close (inline relay)"]:
is_on = False
RFX_DEVICES[device_id].update_state(is_on)
elif (
hasattr(RFX_DEVICES[device_id], "brightness")
and event.values["Command"] == "Set level"
):
_brightness = event.values["Dim level"] * 255 // 100
# Update the rfxtrx device state
is_on = _brightness > 0
RFX_DEVICES[device_id].update_state(is_on, _brightness)
# Fire event
if RFX_DEVICES[device_id].should_fire_event:
RFX_DEVICES[device_id].hass.bus.fire(
EVENT_BUTTON_PRESSED,
{
ATTR_ENTITY_ID: RFX_DEVICES[device_id].entity_id,
ATTR_STATE: event.values["Command"].lower(),
},
)
_LOGGER.debug(
"Rfxtrx fired event: (event_type: %s, %s: %s, %s: %s)",
EVENT_BUTTON_PRESSED,
ATTR_ENTITY_ID,
RFX_DEVICES[device_id].entity_id,
ATTR_STATE,
event.values["Command"].lower(),
)
class RfxtrxDevice(Entity):
@ -379,13 +328,7 @@ class RfxtrxDevice(Entity):
self._event = event
self._state = datas[ATTR_STATE]
self._should_fire_event = datas[ATTR_FIRE_EVENT]
self._brightness = 0
self._unique_id = f"{slugify(self._event.device.type_string.lower())}_{slugify(self._event.device.id_string.lower())}"
self.added_to_hass = False
async def async_added_to_hass(self):
"""Subscribe RFXtrx events."""
self.added_to_hass = True
@property
def should_poll(self):
@ -417,16 +360,8 @@ class RfxtrxDevice(Entity):
"""Return unique identifier of remote device."""
return self._unique_id
def turn_off(self, **kwargs):
"""Turn the device off."""
self._send_command("turn_off")
def update_state(self, state, brightness=0):
"""Update det state of the device."""
self._state = state
self._brightness = brightness
if self.added_to_hass:
self.schedule_update_ha_state()
def _apply_event(self, event):
"""Apply a received event."""
def _send_command(self, command, brightness=0):
if not self._event:
@ -447,7 +382,6 @@ class RfxtrxDevice(Entity):
for _ in range(self.signal_repetitions):
self._event.device.send_off(rfx_object.transport)
self._state = False
self._brightness = 0
elif command == "roll_up":
for _ in range(self.signal_repetitions):
@ -464,5 +398,5 @@ class RfxtrxDevice(Entity):
self._event.device.send_stop(rfx_object.transport)
self._state = True
if self.added_to_hass:
if self.hass:
self.schedule_update_ha_state()

View file

@ -16,7 +16,7 @@ from homeassistant.const import (
CONF_NAME,
)
from homeassistant.helpers import config_validation as cv, event as evt
from homeassistant.util import dt as dt_util, slugify
from homeassistant.util import slugify
from . import (
ATTR_NAME,
@ -25,15 +25,16 @@ from . import (
CONF_DEVICES,
CONF_FIRE_EVENT,
CONF_OFF_DELAY,
RECEIVED_EVT_SUBSCRIBERS,
RFX_DEVICES,
apply_received_command,
SIGNAL_EVENT,
find_possible_pt2262_device,
fire_command_event,
get_pt2262_cmd,
get_pt2262_device,
get_pt2262_deviceid,
get_rfx_object,
)
from .const import COMMAND_OFF_LIST, COMMAND_ON_LIST
_LOGGER = logging.getLogger(__name__)
@ -120,6 +121,7 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
pkt_id = "".join(f"{x:02x}" for x in event.data)
sensor = RfxtrxBinarySensor(event, pkt_id)
sensor.apply_event(event)
RFX_DEVICES[device_id] = sensor
add_entities([sensor])
_LOGGER.info(
@ -130,43 +132,8 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
event.device.subtype,
)
elif not isinstance(sensor, RfxtrxBinarySensor):
return
else:
_LOGGER.debug(
"Binary sensor update (Device ID: %s Class: %s Sub: %s)",
slugify(event.device.id_string.lower()),
event.device.__class__.__name__,
event.device.subtype,
)
if sensor.is_lighting4:
if sensor.data_bits is not None:
cmd = get_pt2262_cmd(device_id, sensor.data_bits)
sensor.apply_cmd(int(cmd, 16))
else:
sensor.update_state(True)
else:
apply_received_command(event)
if (
sensor.is_on
and sensor.off_delay is not None
and sensor.delay_listener is None
):
def off_delay_listener(now):
"""Switch device off after a delay."""
sensor.delay_listener = None
sensor.update_state(False)
sensor.delay_listener = evt.track_point_in_time(
hass, off_delay_listener, dt_util.utcnow() + sensor.off_delay
)
# Subscribe to main RFXtrx events
if binary_sensor_update not in RECEIVED_EVT_SUBSCRIBERS:
RECEIVED_EVT_SUBSCRIBERS.append(binary_sensor_update)
hass.helpers.dispatcher.dispatcher_connect(SIGNAL_EVENT, binary_sensor_update)
class RfxtrxBinarySensor(BinarySensorEntity):
@ -204,6 +171,35 @@ class RfxtrxBinarySensor(BinarySensorEntity):
else:
self._masked_id = None
async def async_added_to_hass(self):
"""Restore RFXtrx switch device state (ON/OFF)."""
await super().async_added_to_hass()
def _handle_event(event):
"""Check if event applies to me and update."""
if self._masked_id:
masked_id = get_pt2262_deviceid(event.device.id_string, self._data_bits)
if masked_id != self._masked_id:
return
else:
if event.device.id_string != self.event.device.id_string:
return
_LOGGER.debug(
"Binary sensor update (Device ID: %s Class: %s Sub: %s)",
event.device.id_string,
event.device.__class__.__name__,
event.device.subtype,
)
self.apply_event(event)
self.async_on_remove(
self.hass.helpers.dispatcher.async_dispatcher_connect(
SIGNAL_EVENT, _handle_event
)
)
@property
def name(self):
"""Return the device name."""
@ -259,15 +255,45 @@ class RfxtrxBinarySensor(BinarySensorEntity):
"""Return unique identifier of remote device."""
return self._unique_id
def apply_cmd(self, cmd):
"""Apply a command for updating the state."""
if cmd == self.cmd_on:
self.update_state(True)
elif cmd == self.cmd_off:
self.update_state(False)
def _apply_event_lighting4(self, event):
"""Apply event for a lighting 4 device."""
if self.data_bits is not None:
cmd = get_pt2262_cmd(event.device.id_string, self.data_bits)
cmd = int(cmd, 16)
if cmd == self.cmd_on:
self._state = True
elif cmd == self.cmd_off:
self._state = False
else:
self._state = True
def _apply_event_standard(self, event):
if event.values["Command"] in COMMAND_ON_LIST:
self._state = True
elif event.values["Command"] in COMMAND_OFF_LIST:
self._state = False
def apply_event(self, event):
"""Apply command from rfxtrx."""
if self.is_lighting4:
self._apply_event_lighting4(event)
else:
self._apply_event_standard(event)
def update_state(self, state):
"""Update the state of the device."""
self._state = state
if self.hass:
self.schedule_update_ha_state()
if self.should_fire_event:
fire_command_event(self.hass, self.entity_id, event.values["Command"])
if self.is_on and self.off_delay is not None and self.delay_listener is None:
def off_delay_listener(now):
"""Switch device off after a delay."""
self.delay_listener = None
self._state = False
if self.hass:
self.schedule_update_ha_state()
self.delay_listener = evt.call_later(
self.hass, self.off_delay.total_seconds(), off_delay_listener
)

View file

@ -0,0 +1,16 @@
"""Constants for RFXtrx integration."""
COMMAND_ON_LIST = [
"On",
"Up",
"Stop",
"Open (inline relay)",
"Stop (inline relay)",
]
COMMAND_OFF_LIST = [
"Off",
"Down",
"Close (inline relay)",
]

View file

@ -13,12 +13,13 @@ from . import (
CONF_FIRE_EVENT,
CONF_SIGNAL_REPETITIONS,
DEFAULT_SIGNAL_REPETITIONS,
RECEIVED_EVT_SUBSCRIBERS,
SIGNAL_EVENT,
RfxtrxDevice,
apply_received_command,
fire_command_event,
get_devices_from_config,
get_new_device,
)
from .const import COMMAND_OFF_LIST, COMMAND_ON_LIST
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
@ -54,13 +55,11 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
new_device = get_new_device(event, config, RfxtrxCover)
if new_device:
new_device.apply_event(event)
add_entities([new_device])
apply_received_command(event)
# Subscribe to main RFXtrx events
if cover_update not in RECEIVED_EVT_SUBSCRIBERS:
RECEIVED_EVT_SUBSCRIBERS.append(cover_update)
hass.helpers.dispatcher.dispatcher_connect(SIGNAL_EVENT, cover_update)
class RfxtrxCover(RfxtrxDevice, CoverEntity, RestoreEntity):
@ -74,6 +73,19 @@ class RfxtrxCover(RfxtrxDevice, CoverEntity, RestoreEntity):
if old_state is not None:
self._state = old_state.state == STATE_OPEN
def _handle_event(event):
"""Check if event applies to me and update."""
if event.device.id_string != self._event.device.id_string:
return
self.apply_event(event)
self.async_on_remove(
self.hass.helpers.dispatcher.async_dispatcher_connect(
SIGNAL_EVENT, _handle_event
)
)
@property
def should_poll(self):
"""Return the polling state. No polling available in RFXtrx cover."""
@ -95,3 +107,15 @@ class RfxtrxCover(RfxtrxDevice, CoverEntity, RestoreEntity):
def stop_cover(self, **kwargs):
"""Stop the cover."""
self._send_command("stop_roll")
def apply_event(self, event):
"""Apply command from rfxtrx."""
if event.values["Command"] in COMMAND_ON_LIST:
self._state = True
elif event.values["Command"] in COMMAND_OFF_LIST:
self._state = False
if self.hass:
self.schedule_update_ha_state()
if self.should_fire_event:
fire_command_event(self.hass, self.entity_id, event.values["Command"])

View file

@ -20,12 +20,13 @@ from . import (
CONF_FIRE_EVENT,
CONF_SIGNAL_REPETITIONS,
DEFAULT_SIGNAL_REPETITIONS,
RECEIVED_EVT_SUBSCRIBERS,
SIGNAL_EVENT,
RfxtrxDevice,
apply_received_command,
fire_command_event,
get_devices_from_config,
get_new_device,
)
from .const import COMMAND_OFF_LIST, COMMAND_ON_LIST
_LOGGER = logging.getLogger(__name__)
@ -64,18 +65,18 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
new_device = get_new_device(event, config, RfxtrxLight)
if new_device:
new_device.apply_event(event)
add_entities([new_device])
apply_received_command(event)
# Subscribe to main RFXtrx events
if light_update not in RECEIVED_EVT_SUBSCRIBERS:
RECEIVED_EVT_SUBSCRIBERS.append(light_update)
hass.helpers.dispatcher.dispatcher_connect(SIGNAL_EVENT, light_update)
class RfxtrxLight(RfxtrxDevice, LightEntity, RestoreEntity):
"""Representation of a RFXtrx light."""
_brightness = 0
async def async_added_to_hass(self):
"""Restore RFXtrx device state (ON/OFF)."""
await super().async_added_to_hass()
@ -91,6 +92,19 @@ class RfxtrxLight(RfxtrxDevice, LightEntity, RestoreEntity):
):
self._brightness = int(old_state.attributes[ATTR_BRIGHTNESS])
def _handle_event(event):
"""Check if event applies to me and update."""
if event.device.id_string != self._event.device.id_string:
return
self.apply_event(event)
self.async_on_remove(
self.hass.helpers.dispatcher.async_dispatcher_connect(
SIGNAL_EVENT, _handle_event
)
)
@property
def brightness(self):
"""Return the brightness of this light between 0..255."""
@ -111,3 +125,23 @@ class RfxtrxLight(RfxtrxDevice, LightEntity, RestoreEntity):
self._brightness = brightness
_brightness = brightness * 100 // 255
self._send_command("dim", _brightness)
def turn_off(self, **kwargs):
"""Turn the device off."""
self._brightness = 0
self._send_command("turn_off")
def apply_event(self, event):
"""Apply command from rfxtrx."""
if event.values["Command"] in COMMAND_ON_LIST:
self._state = True
elif event.values["Command"] in COMMAND_OFF_LIST:
self._state = False
elif event.values["Command"] == "Set level":
self._brightness = event.values["Dim level"] * 255 // 100
self._state = self._brightness > 0
if self.hass:
self.schedule_update_ha_state()
if self.should_fire_event:
fire_command_event(self.hass, self.entity_id, event.values["Command"])

View file

@ -18,8 +18,8 @@ from . import (
CONF_DEVICES,
CONF_FIRE_EVENT,
DATA_TYPES,
RECEIVED_EVT_SUBSCRIBERS,
RFX_DEVICES,
SIGNAL_EVENT,
get_rfx_object,
)
@ -83,22 +83,6 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
device_id = f"sensor_{slugify(event.device.id_string.lower())}"
if device_id in RFX_DEVICES:
sensors = RFX_DEVICES[device_id]
for data_type in sensors:
# Some multi-sensor devices send individual messages for each
# of their sensors. Update only if event contains the
# right data_type for the sensor.
if data_type not in event.values:
continue
sensor = sensors[data_type]
sensor.event = event
if sensor.hass:
sensor.schedule_update_ha_state()
# Fire event
if sensor.should_fire_event:
sensor.hass.bus.fire(
"signal_received", {ATTR_ENTITY_ID: sensor.entity_id}
)
return
# Add entity if not exist and the automatic_add is True
@ -114,13 +98,14 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
data_type = _data_type
break
new_sensor = RfxtrxSensor(event, event.device, pkt_id, data_type)
new_sensor.apply_event(event)
sub_sensors = {}
sub_sensors[new_sensor.data_type] = new_sensor
RFX_DEVICES[device_id] = sub_sensors
add_entities([new_sensor])
if sensor_update not in RECEIVED_EVT_SUBSCRIBERS:
RECEIVED_EVT_SUBSCRIBERS.append(sensor_update)
# Subscribe to main RFXtrx events
hass.helpers.dispatcher.dispatcher_connect(SIGNAL_EVENT, sensor_update)
class RfxtrxSensor(Entity):
@ -129,12 +114,43 @@ class RfxtrxSensor(Entity):
def __init__(self, event, device, name, data_type, should_fire_event=False):
"""Initialize the sensor."""
self.event = event
self._device = device
self._name = name
self.should_fire_event = should_fire_event
self.data_type = data_type
self._unit_of_measurement = DATA_TYPES.get(data_type, "")
self._unique_id = f"{slugify(device.type_string.lower())}_{slugify(device.id_string.lower())}_{slugify(data_type)}"
async def async_added_to_hass(self):
"""Restore RFXtrx switch device state (ON/OFF)."""
await super().async_added_to_hass()
def _handle_event(event):
"""Check if event applies to me and update."""
if not isinstance(event, SensorEvent):
return
if event.device.id_string != self._device.id_string:
return
if self.data_type not in event.values:
return
_LOGGER.debug(
"Sensor update (Device ID: %s Class: %s Sub: %s)",
event.device.id_string,
event.device.__class__.__name__,
event.device.subtype,
)
self.apply_event(event)
self.async_on_remove(
self.hass.helpers.dispatcher.async_dispatcher_connect(
SIGNAL_EVENT, _handle_event
)
)
def __str__(self):
"""Return the name of the sensor."""
return self._name
@ -167,3 +183,11 @@ class RfxtrxSensor(Entity):
def unique_id(self):
"""Return unique identifier of remote device."""
return self._unique_id
def apply_event(self, event):
"""Apply command from rfxtrx."""
self.event = event
if self.hass:
self.schedule_update_ha_state()
if self.should_fire_event:
self.hass.bus.fire("signal_received", {ATTR_ENTITY_ID: self.entity_id})

View file

@ -15,12 +15,13 @@ from . import (
CONF_FIRE_EVENT,
CONF_SIGNAL_REPETITIONS,
DEFAULT_SIGNAL_REPETITIONS,
RECEIVED_EVT_SUBSCRIBERS,
SIGNAL_EVENT,
RfxtrxDevice,
apply_received_command,
fire_command_event,
get_devices_from_config,
get_new_device,
)
from .const import COMMAND_OFF_LIST, COMMAND_ON_LIST
_LOGGER = logging.getLogger(__name__)
@ -59,13 +60,11 @@ def setup_platform(hass, config, add_entities_callback, discovery_info=None):
new_device = get_new_device(event, config, RfxtrxSwitch)
if new_device:
new_device.apply_event(event)
add_entities_callback([new_device])
apply_received_command(event)
# Subscribe to main RFXtrx events
if switch_update not in RECEIVED_EVT_SUBSCRIBERS:
RECEIVED_EVT_SUBSCRIBERS.append(switch_update)
hass.helpers.dispatcher.dispatcher_connect(SIGNAL_EVENT, switch_update)
class RfxtrxSwitch(RfxtrxDevice, SwitchEntity, RestoreEntity):
@ -79,6 +78,35 @@ class RfxtrxSwitch(RfxtrxDevice, SwitchEntity, RestoreEntity):
if old_state is not None:
self._state = old_state.state == STATE_ON
def _handle_event(event):
"""Check if event applies to me and update."""
if event.device.id_string != self._event.device.id_string:
return
self.apply_event(event)
self.async_on_remove(
self.hass.helpers.dispatcher.async_dispatcher_connect(
SIGNAL_EVENT, _handle_event
)
)
def apply_event(self, event):
"""Apply command from rfxtrx."""
if event.values["Command"] in COMMAND_ON_LIST:
self._state = True
elif event.values["Command"] in COMMAND_OFF_LIST:
self._state = False
if self.hass:
self.schedule_update_ha_state()
if self.should_fire_event:
fire_command_event(self.hass, self.entity_id, event.values["Command"])
def turn_on(self, **kwargs):
"""Turn the device on."""
self._send_command("turn_on")
def turn_off(self, **kwargs):
"""Turn the device off."""
self._send_command("turn_off")

View file

@ -4,6 +4,6 @@ from homeassistant.components import rfxtrx
async def _signal_event(hass, packet_id):
event = rfxtrx.get_rfx_object(packet_id)
await hass.async_add_executor_job(rfxtrx.RECEIVED_EVT_SUBSCRIBERS[0], event)
hass.helpers.dispatcher.async_dispatcher_send(rfxtrx.SIGNAL_EVENT, event)
await hass.async_block_till_done()
return event

View file

@ -59,7 +59,6 @@ async def rfxtrx_cleanup():
):
yield
rfxtrx_core.RECEIVED_EVT_SUBSCRIBERS.clear()
rfxtrx_core.RFX_DEVICES.clear()