Add support for rfxtrx sirens and chimes (#66416)
* Add support for sirens and chimes * Fixup testing * Fixup comments * Hook up existing off delay * Add docs for off delay. * Rename mixin
This commit is contained in:
parent
ad4409bcb0
commit
8a74295d6f
3 changed files with 385 additions and 0 deletions
|
@ -49,6 +49,7 @@ from .const import (
|
|||
DOMAIN = "rfxtrx"
|
||||
|
||||
DEFAULT_SIGNAL_REPETITIONS = 1
|
||||
DEFAULT_OFF_DELAY = 2.0
|
||||
|
||||
SIGNAL_EVENT = f"{DOMAIN}_event"
|
||||
|
||||
|
@ -81,6 +82,7 @@ PLATFORMS = [
|
|||
Platform.LIGHT,
|
||||
Platform.BINARY_SENSOR,
|
||||
Platform.COVER,
|
||||
Platform.SIREN,
|
||||
]
|
||||
|
||||
|
||||
|
|
245
homeassistant/components/rfxtrx/siren.py
Normal file
245
homeassistant/components/rfxtrx/siren.py
Normal file
|
@ -0,0 +1,245 @@
|
|||
"""Support for RFXtrx sirens."""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import RFXtrx as rfxtrxmod
|
||||
|
||||
from homeassistant.components.siren import (
|
||||
SUPPORT_TONES,
|
||||
SUPPORT_TURN_OFF,
|
||||
SUPPORT_TURN_ON,
|
||||
SirenEntity,
|
||||
)
|
||||
from homeassistant.components.siren.const import ATTR_TONE
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
|
||||
from homeassistant.helpers.entity import Entity
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from homeassistant.helpers.event import async_call_later
|
||||
|
||||
from . import (
|
||||
DEFAULT_OFF_DELAY,
|
||||
DEFAULT_SIGNAL_REPETITIONS,
|
||||
DeviceTuple,
|
||||
RfxtrxCommandEntity,
|
||||
async_setup_platform_entry,
|
||||
)
|
||||
from .const import CONF_OFF_DELAY, CONF_SIGNAL_REPETITIONS
|
||||
|
||||
SUPPORT_RFXTRX = SUPPORT_TURN_ON | SUPPORT_TONES
|
||||
|
||||
SECURITY_PANIC_ON = "Panic"
|
||||
SECURITY_PANIC_OFF = "End Panic"
|
||||
SECURITY_PANIC_ALL = {SECURITY_PANIC_ON, SECURITY_PANIC_OFF}
|
||||
|
||||
|
||||
def supported(event: rfxtrxmod.RFXtrxEvent):
|
||||
"""Return whether an event supports sirens."""
|
||||
device = event.device
|
||||
|
||||
if isinstance(device, rfxtrxmod.ChimeDevice):
|
||||
return True
|
||||
|
||||
if isinstance(device, rfxtrxmod.SecurityDevice) and isinstance(
|
||||
event, rfxtrxmod.SensorEvent
|
||||
):
|
||||
if event.values["Sensor Status"] in SECURITY_PANIC_ALL:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def get_first_key(data: dict[int, str], entry: str) -> int:
|
||||
"""Find a key based on the items value."""
|
||||
return next((key for key, value in data.items() if value == entry))
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
hass: HomeAssistant,
|
||||
config_entry: ConfigEntry,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
) -> None:
|
||||
"""Set up config entry."""
|
||||
|
||||
def _constructor(
|
||||
event: rfxtrxmod.RFXtrxEvent,
|
||||
auto: rfxtrxmod.RFXtrxEvent | None,
|
||||
device_id: DeviceTuple,
|
||||
entity_info: dict,
|
||||
):
|
||||
"""Construct a entity from an event."""
|
||||
device = event.device
|
||||
|
||||
if isinstance(device, rfxtrxmod.ChimeDevice):
|
||||
return [
|
||||
RfxtrxChime(
|
||||
event.device,
|
||||
device_id,
|
||||
entity_info.get(
|
||||
CONF_SIGNAL_REPETITIONS, DEFAULT_SIGNAL_REPETITIONS
|
||||
),
|
||||
entity_info.get(CONF_OFF_DELAY, DEFAULT_OFF_DELAY),
|
||||
auto,
|
||||
)
|
||||
]
|
||||
|
||||
if isinstance(device, rfxtrxmod.SecurityDevice) and isinstance(
|
||||
event, rfxtrxmod.SensorEvent
|
||||
):
|
||||
if event.values["Sensor Status"] in SECURITY_PANIC_ALL:
|
||||
return [
|
||||
RfxtrxSecurityPanic(
|
||||
event.device,
|
||||
device_id,
|
||||
entity_info.get(
|
||||
CONF_SIGNAL_REPETITIONS, DEFAULT_SIGNAL_REPETITIONS
|
||||
),
|
||||
entity_info.get(CONF_OFF_DELAY, DEFAULT_OFF_DELAY),
|
||||
auto,
|
||||
)
|
||||
]
|
||||
|
||||
await async_setup_platform_entry(
|
||||
hass, config_entry, async_add_entities, supported, _constructor
|
||||
)
|
||||
|
||||
|
||||
class RfxtrxOffDelayMixin(Entity):
|
||||
"""Mixin to support timeouts on data.
|
||||
|
||||
Many 433 devices only send data when active. They will
|
||||
repeatedly (every x seconds) send a command to indicate
|
||||
being active and stop sending this command when inactive.
|
||||
This mixin allow us to keep track of the timeout once
|
||||
they go inactive.
|
||||
"""
|
||||
|
||||
_timeout: CALLBACK_TYPE | None = None
|
||||
_off_delay: float | None = None
|
||||
|
||||
def _setup_timeout(self):
|
||||
@callback
|
||||
def _done(_):
|
||||
self._timeout = None
|
||||
self.async_write_ha_state()
|
||||
|
||||
if self._off_delay:
|
||||
self._timeout = async_call_later(self.hass, self._off_delay, _done)
|
||||
|
||||
def _cancel_timeout(self):
|
||||
if self._timeout:
|
||||
self._timeout()
|
||||
self._timeout = None
|
||||
|
||||
|
||||
class RfxtrxChime(RfxtrxCommandEntity, SirenEntity, RfxtrxOffDelayMixin):
|
||||
"""Representation of a RFXtrx chime."""
|
||||
|
||||
_device: rfxtrxmod.ChimeDevice
|
||||
|
||||
def __init__(
|
||||
self, device, device_id, signal_repetitions=1, off_delay=None, event=None
|
||||
):
|
||||
"""Initialize the entity."""
|
||||
super().__init__(device, device_id, signal_repetitions, event)
|
||||
self._attr_available_tones = list(self._device.COMMANDS.values())
|
||||
self._attr_supported_features = SUPPORT_TURN_ON | SUPPORT_TONES
|
||||
self._default_tone = next(iter(self._device.COMMANDS))
|
||||
self._off_delay = off_delay
|
||||
|
||||
@property
|
||||
def is_on(self):
|
||||
"""Return true if device is on."""
|
||||
return self._timeout is not None
|
||||
|
||||
async def async_turn_on(self, **kwargs):
|
||||
"""Turn the device on."""
|
||||
self._cancel_timeout()
|
||||
|
||||
if tone := kwargs.get(ATTR_TONE):
|
||||
command = get_first_key(self._device.COMMANDS, tone)
|
||||
else:
|
||||
command = self._default_tone
|
||||
|
||||
await self._async_send(self._device.send_command, command)
|
||||
|
||||
self._setup_timeout()
|
||||
|
||||
self.async_write_ha_state()
|
||||
|
||||
def _apply_event(self, event: rfxtrxmod.ControlEvent):
|
||||
"""Apply a received event."""
|
||||
super()._apply_event(event)
|
||||
|
||||
sound = event.values.get("Sound")
|
||||
if sound is not None:
|
||||
self._cancel_timeout()
|
||||
self._setup_timeout()
|
||||
|
||||
@callback
|
||||
def _handle_event(self, event, device_id):
|
||||
"""Check if event applies to me and update."""
|
||||
if self._event_applies(event, device_id):
|
||||
self._apply_event(event)
|
||||
|
||||
self.async_write_ha_state()
|
||||
|
||||
|
||||
class RfxtrxSecurityPanic(RfxtrxCommandEntity, SirenEntity, RfxtrxOffDelayMixin):
|
||||
"""Representation of a security device."""
|
||||
|
||||
_device: rfxtrxmod.SecurityDevice
|
||||
|
||||
def __init__(
|
||||
self, device, device_id, signal_repetitions=1, off_delay=None, event=None
|
||||
):
|
||||
"""Initialize the entity."""
|
||||
super().__init__(device, device_id, signal_repetitions, event)
|
||||
self._attr_supported_features = SUPPORT_TURN_ON | SUPPORT_TURN_OFF
|
||||
self._on_value = get_first_key(self._device.STATUS, SECURITY_PANIC_ON)
|
||||
self._off_value = get_first_key(self._device.STATUS, SECURITY_PANIC_OFF)
|
||||
self._off_delay = off_delay
|
||||
|
||||
@property
|
||||
def is_on(self):
|
||||
"""Return true if device is on."""
|
||||
return self._timeout is not None
|
||||
|
||||
async def async_turn_on(self, **kwargs: Any):
|
||||
"""Turn the device on."""
|
||||
self._cancel_timeout()
|
||||
|
||||
await self._async_send(self._device.send_status, self._on_value)
|
||||
|
||||
self._setup_timeout()
|
||||
|
||||
self.async_write_ha_state()
|
||||
|
||||
async def async_turn_off(self, **kwargs: Any) -> None:
|
||||
"""Turn the device off."""
|
||||
self._cancel_timeout()
|
||||
|
||||
await self._async_send(self._device.send_status, self._off_value)
|
||||
|
||||
self.async_write_ha_state()
|
||||
|
||||
def _apply_event(self, event: rfxtrxmod.SensorEvent):
|
||||
"""Apply a received event."""
|
||||
super()._apply_event(event)
|
||||
|
||||
status = event.values.get("Sensor Status")
|
||||
|
||||
if status == SECURITY_PANIC_ON:
|
||||
self._cancel_timeout()
|
||||
self._setup_timeout()
|
||||
elif status == SECURITY_PANIC_OFF:
|
||||
self._cancel_timeout()
|
||||
|
||||
@callback
|
||||
def _handle_event(self, event, device_id):
|
||||
"""Check if event applies to me and update."""
|
||||
if self._event_applies(event, device_id):
|
||||
self._apply_event(event)
|
||||
|
||||
self.async_write_ha_state()
|
138
tests/components/rfxtrx/test_siren.py
Normal file
138
tests/components/rfxtrx/test_siren.py
Normal file
|
@ -0,0 +1,138 @@
|
|||
"""The tests for the Rfxtrx siren platform."""
|
||||
from unittest.mock import call
|
||||
|
||||
from homeassistant.components.rfxtrx import DOMAIN
|
||||
|
||||
from .conftest import create_rfx_test_cfg
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
|
||||
async def test_one_chime(hass, rfxtrx, timestep):
|
||||
"""Test with 1 entity."""
|
||||
entry_data = create_rfx_test_cfg(
|
||||
devices={"0a16000000000000000000": {"signal_repetitions": 1, "off_delay": 2.0}}
|
||||
)
|
||||
mock_entry = MockConfigEntry(domain="rfxtrx", unique_id=DOMAIN, data=entry_data)
|
||||
|
||||
mock_entry.add_to_hass(hass)
|
||||
|
||||
entity_id = "siren.byron_sx_00_00"
|
||||
|
||||
await hass.config_entries.async_setup(mock_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get(entity_id)
|
||||
assert state
|
||||
assert state.state == "off"
|
||||
assert state.attributes.get("friendly_name") == "Byron SX 00:00"
|
||||
|
||||
await hass.services.async_call(
|
||||
"siren", "turn_on", {"entity_id": entity_id}, blocking=True
|
||||
)
|
||||
state = hass.states.get(entity_id)
|
||||
assert state.state == "on"
|
||||
|
||||
await timestep(5)
|
||||
|
||||
state = hass.states.get(entity_id)
|
||||
assert state.state == "off"
|
||||
|
||||
await hass.services.async_call(
|
||||
"siren", "turn_on", {"entity_id": entity_id, "tone": "Sound 1"}, blocking=True
|
||||
)
|
||||
state = hass.states.get(entity_id)
|
||||
assert state.state == "on"
|
||||
|
||||
await timestep(3)
|
||||
|
||||
state = hass.states.get(entity_id)
|
||||
assert state.state == "off"
|
||||
|
||||
await rfxtrx.signal("0a16000000000000000000")
|
||||
state = hass.states.get(entity_id)
|
||||
assert state.state == "on"
|
||||
|
||||
await timestep(3)
|
||||
|
||||
state = hass.states.get(entity_id)
|
||||
assert state.state == "off"
|
||||
|
||||
assert rfxtrx.transport.send.mock_calls == [
|
||||
call(bytearray(b"\x07\x16\x00\x00\x00\x00\x00\x00")),
|
||||
call(bytearray(b"\x07\x16\x00\x00\x00\x00\x01\x00")),
|
||||
]
|
||||
|
||||
|
||||
async def test_one_security1(hass, rfxtrx, timestep):
|
||||
"""Test with 1 entity."""
|
||||
entry_data = create_rfx_test_cfg(
|
||||
devices={"08200300a109000670": {"signal_repetitions": 1, "off_delay": 2.0}}
|
||||
)
|
||||
mock_entry = MockConfigEntry(domain="rfxtrx", unique_id=DOMAIN, data=entry_data)
|
||||
|
||||
mock_entry.add_to_hass(hass)
|
||||
|
||||
entity_id = "siren.kd101_smoke_detector_a10900_32"
|
||||
|
||||
await hass.config_entries.async_setup(mock_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get(entity_id)
|
||||
assert state
|
||||
assert state.state == "off"
|
||||
assert state.attributes.get("friendly_name") == "KD101 Smoke Detector a10900:32"
|
||||
|
||||
await hass.services.async_call(
|
||||
"siren", "turn_on", {"entity_id": entity_id}, blocking=True
|
||||
)
|
||||
state = hass.states.get(entity_id)
|
||||
assert state.state == "on"
|
||||
|
||||
await hass.services.async_call(
|
||||
"siren", "turn_off", {"entity_id": entity_id}, blocking=True
|
||||
)
|
||||
state = hass.states.get(entity_id)
|
||||
assert state.state == "off"
|
||||
|
||||
await hass.services.async_call(
|
||||
"siren", "turn_on", {"entity_id": entity_id}, blocking=True
|
||||
)
|
||||
state = hass.states.get(entity_id)
|
||||
assert state.state == "on"
|
||||
|
||||
await timestep(11)
|
||||
|
||||
state = hass.states.get(entity_id)
|
||||
assert state.state == "off"
|
||||
|
||||
await rfxtrx.signal("08200300a109000670")
|
||||
state = hass.states.get(entity_id)
|
||||
assert state.state == "on"
|
||||
|
||||
await rfxtrx.signal("08200300a109000770")
|
||||
state = hass.states.get(entity_id)
|
||||
assert state.state == "off"
|
||||
|
||||
assert rfxtrx.transport.send.mock_calls == [
|
||||
call(bytearray(b"\x08\x20\x03\x00\xa1\x09\x00\x06\x00")),
|
||||
call(bytearray(b"\x08\x20\x03\x01\xa1\x09\x00\x07\x00")),
|
||||
call(bytearray(b"\x08\x20\x03\x02\xa1\x09\x00\x06\x00")),
|
||||
]
|
||||
|
||||
|
||||
async def test_discover_siren(hass, rfxtrx_automatic):
|
||||
"""Test with discovery."""
|
||||
rfxtrx = rfxtrx_automatic
|
||||
|
||||
await rfxtrx.signal("0a16000000000000000000")
|
||||
state = hass.states.get("siren.byron_sx_00_00")
|
||||
assert state
|
||||
assert state.state == "on"
|
||||
assert state.attributes.get("friendly_name") == "Byron SX 00:00"
|
||||
|
||||
await rfxtrx.signal("0a16010000000000000000")
|
||||
state = hass.states.get("siren.byron_mp001_00_00")
|
||||
assert state
|
||||
assert state.state == "on"
|
||||
assert state.attributes.get("friendly_name") == "Byron MP001 00:00"
|
Loading…
Add table
Reference in a new issue