Make sensor and binary_sensor inherit from base class (#37946)

* Make sensor and binary_sensor inherit from base class

* Drop several pointless public properties

* Make sure base function has same parameters

* Drop pass

* Missed one

* Adjust inherit order
This commit is contained in:
Joakim Plate 2020-07-18 13:43:38 +02:00 committed by GitHub
parent b030ed1adf
commit f173805c2f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 68 additions and 216 deletions

View file

@ -23,6 +23,7 @@ from homeassistant.const import (
UNIT_PERCENTAGE,
UV_INDEX,
)
from homeassistant.core import callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.restore_state import RestoreEntity
@ -343,35 +344,36 @@ def get_device_id(device, data_bits=None):
return (f"{device.packettype:x}", f"{device.subtype:x}", id_string)
class RfxtrxDevice(RestoreEntity):
class RfxtrxEntity(RestoreEntity):
"""Represents a Rfxtrx device.
Contains the common logic for Rfxtrx lights and switches.
"""
def __init__(self, device, device_id, signal_repetitions, event=None):
def __init__(self, device, device_id, event=None):
"""Initialize the device."""
self.signal_repetitions = signal_repetitions
self._name = f"{device.type_string} {device.id_string}"
self._device = device
self._event = None
self._state = None
self._event = event
self._device_id = device_id
self._unique_id = "_".join(x for x in self._device_id)
if event:
self._apply_event(event)
async def async_added_to_hass(self):
"""Restore RFXtrx device state (ON/OFF)."""
if self._event:
return
self._apply_event(self._event)
else:
old_state = await self.async_get_last_state()
if old_state is not None:
event = old_state.attributes.get(ATTR_EVENT)
if event:
self._apply_event(get_rfx_object(event))
old_state = await self.async_get_last_state()
if old_state is not None:
event = old_state.attributes.get(ATTR_EVENT)
if event:
self._apply_event(get_rfx_object(event))
self.async_on_remove(
self.hass.helpers.dispatcher.async_dispatcher_connect(
SIGNAL_EVENT, self._handle_event
)
)
@property
def should_poll(self):
@ -390,11 +392,6 @@ class RfxtrxDevice(RestoreEntity):
return None
return {ATTR_EVENT: "".join(f"{x:02x}" for x in self._event.data)}
@property
def is_on(self):
"""Return true if device is on."""
return self._state
@property
def assumed_state(self):
"""Return true if unable to access real state of entity."""
@ -418,6 +415,23 @@ class RfxtrxDevice(RestoreEntity):
"""Apply a received event."""
self._event = event
@callback
def _handle_event(self, event, device_id):
"""Handle a reception of data, overridden by other classes."""
class RfxtrxCommandEntity(RfxtrxEntity):
"""Represents a Rfxtrx device.
Contains the common logic for Rfxtrx lights and switches.
"""
def __init__(self, device, device_id, signal_repetitions=1, event=None):
"""Initialzie a switch or light device."""
super().__init__(device, device_id, event=event)
self.signal_repetitions = signal_repetitions
self._state = None
def _send_command(self, command, brightness=0):
rfx_object = self.hass.data[DATA_RFXOBJECT]

View file

@ -12,21 +12,19 @@ from homeassistant.const import (
)
from homeassistant.core import callback
from homeassistant.helpers import event as evt
from homeassistant.helpers.restore_state import RestoreEntity
from . import (
CONF_AUTOMATIC_ADD,
CONF_DATA_BITS,
CONF_OFF_DELAY,
DOMAIN,
SIGNAL_EVENT,
RfxtrxEntity,
find_possible_pt2262_device,
get_device_id,
get_pt2262_cmd,
get_rfx_object,
)
from .const import (
ATTR_EVENT,
COMMAND_OFF_LIST,
COMMAND_ON_LIST,
DATA_RFXTRX_CONFIG,
@ -107,7 +105,7 @@ async def async_setup_entry(
)
class RfxtrxBinarySensor(BinarySensorEntity, RestoreEntity):
class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity):
"""A representation of a RFXtrx binary sensor."""
def __init__(
@ -122,72 +120,15 @@ class RfxtrxBinarySensor(BinarySensorEntity, RestoreEntity):
event=None,
):
"""Initialize the RFXtrx sensor."""
self._event = None
self._device = device
self._name = f"{device.type_string} {device.id_string}"
super().__init__(device, device_id, event=event)
self._device_class = device_class
self._data_bits = data_bits
self._off_delay = off_delay
self._state = False
self.delay_listener = None
self._state = None
self._delay_listener = None
self._cmd_on = cmd_on
self._cmd_off = cmd_off
self._device_id = device_id
self._unique_id = "_".join(x for x in self._device_id)
if event:
self._apply_event(event)
async def async_added_to_hass(self):
"""Restore RFXtrx switch device state (ON/OFF)."""
await super().async_added_to_hass()
if self._event is None:
old_state = await self.async_get_last_state()
if old_state is not None:
event = old_state.attributes.get(ATTR_EVENT)
if event:
self._apply_event(get_rfx_object(event))
self.async_on_remove(
self.hass.helpers.dispatcher.async_dispatcher_connect(
SIGNAL_EVENT, self._handle_event
)
)
@property
def name(self):
"""Return the device name."""
return self._name
@property
def device_state_attributes(self):
"""Return the device state attributes."""
if not self._event:
return None
return {ATTR_EVENT: "".join(f"{x:02x}" for x in self._event.data)}
@property
def data_bits(self):
"""Return the number of data bits."""
return self._data_bits
@property
def cmd_on(self):
"""Return the value of the 'On' command."""
return self._cmd_on
@property
def cmd_off(self):
"""Return the value of the 'Off' command."""
return self._cmd_off
@property
def should_poll(self):
"""No polling needed."""
return False
@property
def force_update(self) -> bool:
"""We should force updates. Repeated states have meaning."""
@ -198,38 +139,19 @@ class RfxtrxBinarySensor(BinarySensorEntity, RestoreEntity):
"""Return the sensor class."""
return self._device_class
@property
def off_delay(self):
"""Return the off_delay attribute value."""
return self._off_delay
@property
def is_on(self):
"""Return true if the sensor state is True."""
return self._state
@property
def unique_id(self):
"""Return unique identifier of remote device."""
return self._unique_id
@property
def device_info(self):
"""Return the device info."""
return {
"identifiers": {(DOMAIN, *self._device_id)},
"name": f"{self._device.type_string} {self._device.id_string}",
"model": self._device.type_string,
}
def _apply_event_lighting4(self, event):
"""Apply event for a lighting 4 device."""
if self.data_bits is not None:
cmd = get_pt2262_cmd(event.device.id_string, self.data_bits)
if self._data_bits is not None:
cmd = get_pt2262_cmd(event.device.id_string, self._data_bits)
cmd = int(cmd, 16)
if cmd == self.cmd_on:
if cmd == self._cmd_on:
self._state = True
elif cmd == self.cmd_off:
elif cmd == self._cmd_off:
self._state = False
else:
self._state = True
@ -242,7 +164,7 @@ class RfxtrxBinarySensor(BinarySensorEntity, RestoreEntity):
def _apply_event(self, event):
"""Apply command from rfxtrx."""
self._event = event
super()._apply_event(event)
if event.device.packettype == DEVICE_PACKET_TYPE_LIGHTING4:
self._apply_event_lighting4(event)
else:
@ -265,15 +187,15 @@ class RfxtrxBinarySensor(BinarySensorEntity, RestoreEntity):
self.async_write_ha_state()
if self.is_on and self.off_delay is not None and self.delay_listener is None:
if self.is_on and self._off_delay is not None and self._delay_listener is None:
@callback
def off_delay_listener(now):
"""Switch device off after a delay."""
self.delay_listener = None
self._delay_listener = None
self._state = False
self.async_write_ha_state()
self.delay_listener = evt.async_call_later(
self.hass, self.off_delay.total_seconds(), off_delay_listener
self._delay_listener = evt.async_call_later(
self.hass, self._off_delay.total_seconds(), off_delay_listener
)

View file

@ -4,14 +4,13 @@ import logging
from homeassistant.components.cover import CoverEntity
from homeassistant.const import CONF_DEVICES
from homeassistant.core import callback
from homeassistant.helpers.restore_state import RestoreEntity
from . import (
CONF_AUTOMATIC_ADD,
CONF_SIGNAL_REPETITIONS,
DEFAULT_SIGNAL_REPETITIONS,
SIGNAL_EVENT,
RfxtrxDevice,
RfxtrxCommandEntity,
get_device_id,
get_rfx_object,
)
@ -79,24 +78,9 @@ async def async_setup_entry(
hass.helpers.dispatcher.async_dispatcher_connect(SIGNAL_EVENT, cover_update)
class RfxtrxCover(RfxtrxDevice, CoverEntity, RestoreEntity):
class RfxtrxCover(RfxtrxCommandEntity, CoverEntity):
"""Representation of a RFXtrx cover."""
async def async_added_to_hass(self):
"""Restore RFXtrx cover device state (OPEN/CLOSE)."""
await super().async_added_to_hass()
self.async_on_remove(
self.hass.helpers.dispatcher.async_dispatcher_connect(
SIGNAL_EVENT, self._handle_event
)
)
@property
def should_poll(self):
"""Return the polling state. No polling available in RFXtrx cover."""
return False
@property
def is_closed(self):
"""Return if the cover is closed."""

View file

@ -16,7 +16,7 @@ from . import (
CONF_SIGNAL_REPETITIONS,
DEFAULT_SIGNAL_REPETITIONS,
SIGNAL_EVENT,
RfxtrxDevice,
RfxtrxCommandEntity,
get_device_id,
get_rfx_object,
)
@ -92,21 +92,11 @@ async def async_setup_entry(
hass.helpers.dispatcher.async_dispatcher_connect(SIGNAL_EVENT, light_update)
class RfxtrxLight(RfxtrxDevice, LightEntity):
class RfxtrxLight(RfxtrxCommandEntity, LightEntity):
"""Representation of a RFXtrx light."""
_brightness = 0
async def async_added_to_hass(self):
"""Restore RFXtrx device state (ON/OFF)."""
await super().async_added_to_hass()
self.async_on_remove(
self.hass.helpers.dispatcher.async_dispatcher_connect(
SIGNAL_EVENT, self._handle_event
)
)
@property
def brightness(self):
"""Return the brightness of this light between 0..255."""
@ -117,6 +107,11 @@ class RfxtrxLight(RfxtrxDevice, LightEntity):
"""Flag supported features."""
return SUPPORT_RFXTRX
@property
def is_on(self):
"""Return true if device is on."""
return self._state
def turn_on(self, **kwargs):
"""Turn the light on."""
brightness = kwargs.get(ATTR_BRIGHTNESS)

View file

@ -11,17 +11,16 @@ from homeassistant.components.sensor import (
)
from homeassistant.const import CONF_DEVICES
from homeassistant.core import callback
from homeassistant.helpers.restore_state import RestoreEntity
from . import (
CONF_AUTOMATIC_ADD,
DATA_TYPES,
DOMAIN,
SIGNAL_EVENT,
RfxtrxEntity,
get_device_id,
get_rfx_object,
)
from .const import ATTR_EVENT, DATA_RFXTRX_CONFIG
from .const import DATA_RFXTRX_CONFIG
_LOGGER = logging.getLogger(__name__)
@ -113,46 +112,20 @@ async def async_setup_entry(
hass.helpers.dispatcher.async_dispatcher_connect(SIGNAL_EVENT, sensor_update)
class RfxtrxSensor(RestoreEntity):
class RfxtrxSensor(RfxtrxEntity):
"""Representation of a RFXtrx sensor."""
def __init__(self, device, device_id, data_type, event=None):
"""Initialize the sensor."""
self._event = None
self._device = device
self._name = f"{device.type_string} {device.id_string} {data_type}"
super().__init__(device, device_id, event=event)
self.data_type = data_type
self._unit_of_measurement = DATA_TYPES.get(data_type, "")
self._device_id = device_id
self._name = f"{device.type_string} {device.id_string} {data_type}"
self._unique_id = "_".join(x for x in (*self._device_id, data_type))
self._device_class = DEVICE_CLASSES.get(data_type)
self._convert_fun = CONVERT_FUNCTIONS.get(data_type, lambda x: x)
if event:
self._apply_event(event)
async def async_added_to_hass(self):
"""Restore RFXtrx switch device state (ON/OFF)."""
await super().async_added_to_hass()
if self._event is None:
old_state = await self.async_get_last_state()
if old_state is not None:
event = old_state.attributes.get(ATTR_EVENT)
if event:
self._apply_event(get_rfx_object(event))
self.async_on_remove(
self.hass.helpers.dispatcher.async_dispatcher_connect(
SIGNAL_EVENT, self._handle_event
)
)
def __str__(self):
"""Return the name of the sensor."""
return self._name
@property
def state(self):
"""Return the state of the sensor."""
@ -161,18 +134,6 @@ class RfxtrxSensor(RestoreEntity):
value = self._event.values.get(self.data_type)
return self._convert_fun(value)
@property
def name(self):
"""Get the name of the sensor."""
return self._name
@property
def device_state_attributes(self):
"""Return the device state attributes."""
if not self._event:
return None
return {ATTR_EVENT: "".join(f"{x:02x}" for x in self._event.data)}
@property
def unit_of_measurement(self):
"""Return the unit this state is expressed in."""
@ -193,24 +154,6 @@ class RfxtrxSensor(RestoreEntity):
"""Return a device class for sensor."""
return self._device_class
@property
def unique_id(self):
"""Return unique identifier of remote device."""
return self._unique_id
@property
def device_info(self):
"""Return the device info."""
return {
"identifiers": {(DOMAIN, *self._device_id)},
"name": f"{self._device.type_string} {self._device.id_string}",
"model": self._device.type_string,
}
def _apply_event(self, event):
"""Apply command from rfxtrx."""
self._event = event
@callback
def _handle_event(self, event, device_id):
"""Check if event applies to me and update."""

View file

@ -6,7 +6,6 @@ import RFXtrx as rfxtrxmod
from homeassistant.components.switch import SwitchEntity
from homeassistant.const import CONF_DEVICES
from homeassistant.core import callback
from homeassistant.helpers.restore_state import RestoreEntity
from . import (
CONF_AUTOMATIC_ADD,
@ -14,7 +13,7 @@ from . import (
DEFAULT_SIGNAL_REPETITIONS,
DOMAIN,
SIGNAL_EVENT,
RfxtrxDevice,
RfxtrxCommandEntity,
get_device_id,
get_rfx_object,
)
@ -89,19 +88,9 @@ async def async_setup_entry(
hass.helpers.dispatcher.async_dispatcher_connect(SIGNAL_EVENT, switch_update)
class RfxtrxSwitch(RfxtrxDevice, SwitchEntity, RestoreEntity):
class RfxtrxSwitch(RfxtrxCommandEntity, SwitchEntity):
"""Representation of a RFXtrx switch."""
async def async_added_to_hass(self):
"""Restore RFXtrx switch device state (ON/OFF)."""
await super().async_added_to_hass()
self.async_on_remove(
self.hass.helpers.dispatcher.async_dispatcher_connect(
SIGNAL_EVENT, self._handle_event
)
)
def _apply_event(self, event):
"""Apply command from rfxtrx."""
super()._apply_event(event)
@ -120,6 +109,11 @@ class RfxtrxSwitch(RfxtrxDevice, SwitchEntity, RestoreEntity):
self.async_write_ha_state()
@property
def is_on(self):
"""Return true if device is on."""
return self._state
def turn_on(self, **kwargs):
"""Turn the device on."""
self._send_command("turn_on")