Support trigger-based template binary sensors (#49504)

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
Paulus Schoutsen 2021-04-22 14:54:28 -07:00 committed by GitHub
parent 9410aefd0d
commit 1016d4ea28
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 289 additions and 22 deletions

View file

@ -1,14 +1,19 @@
"""Support for exposing a templated binary sensor."""
from __future__ import annotations
from datetime import timedelta
import logging
import voluptuous as vol
from homeassistant.components.binary_sensor import (
DEVICE_CLASSES_SCHEMA,
DOMAIN as BINARY_SENSOR_DOMAIN,
ENTITY_ID_FORMAT,
PLATFORM_SCHEMA,
BinarySensorEntity,
)
from homeassistant.components.template import TriggerUpdateCoordinator
from homeassistant.const import (
ATTR_ENTITY_ID,
ATTR_FRIENDLY_NAME,
@ -40,6 +45,7 @@ from .const import (
CONF_PICTURE,
)
from .template_entity import TemplateEntity
from .trigger_entity import TriggerEntity
CONF_DELAY_ON = "delay_on"
CONF_DELAY_OFF = "delay_off"
@ -168,8 +174,23 @@ def _async_create_template_tracking_entities(async_add_entities, hass, definitio
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
"""Set up the template binary sensors."""
if discovery_info is None:
_async_create_template_tracking_entities(
async_add_entities,
hass,
rewrite_legacy_to_modern_conf(config[CONF_SENSORS]),
)
return
if "coordinator" in discovery_info:
async_add_entities(
TriggerBinarySensorEntity(hass, discovery_info["coordinator"], config)
for config in discovery_info["entities"]
)
return
_async_create_template_tracking_entities(
async_add_entities, hass, rewrite_legacy_to_modern_conf(config[CONF_SENSORS])
async_add_entities, hass, discovery_info["entities"]
)
@ -283,7 +304,7 @@ class BinarySensorTemplate(TemplateEntity, BinarySensorEntity):
self._state = state
self.async_write_ha_state()
delay = (self._delay_on if state else self._delay_off).seconds
delay = (self._delay_on if state else self._delay_off).total_seconds()
# state with delay. Cancelled if template result changes.
self._delay_cancel = async_call_later(self.hass, delay, _set_state)
@ -306,3 +327,83 @@ class BinarySensorTemplate(TemplateEntity, BinarySensorEntity):
def device_class(self):
"""Return the sensor class of the binary sensor."""
return self._device_class
class TriggerBinarySensorEntity(TriggerEntity, BinarySensorEntity):
"""Sensor entity based on trigger data."""
domain = BINARY_SENSOR_DOMAIN
extra_template_keys = (CONF_STATE,)
def __init__(
self,
hass: HomeAssistant,
coordinator: TriggerUpdateCoordinator,
config: dict,
) -> None:
"""Initialize the entity."""
super().__init__(hass, coordinator, config)
if isinstance(config.get(CONF_DELAY_ON), template.Template):
self._to_render.append(CONF_DELAY_ON)
self._parse_result.add(CONF_DELAY_ON)
if isinstance(config.get(CONF_DELAY_OFF), template.Template):
self._to_render.append(CONF_DELAY_OFF)
self._parse_result.add(CONF_DELAY_OFF)
self._delay_cancel = None
self._state = False
@property
def is_on(self) -> bool:
"""Return state of the sensor."""
return self._state
@callback
def _handle_coordinator_update(self) -> None:
"""Handle update of the data."""
self._process_data()
if self._delay_cancel:
self._delay_cancel()
self._delay_cancel = None
if not self.available:
return
raw = self._rendered.get(CONF_STATE)
state = template.result_as_boolean(raw)
if state == self._state:
return
key = CONF_DELAY_ON if state else CONF_DELAY_OFF
delay = self._rendered.get(key) or self._config.get(key)
# state without delay. None means rendering failed.
if state is None or delay is None:
self._state = state
self.async_write_ha_state()
return
if not isinstance(delay, timedelta):
try:
delay = cv.positive_time_period(delay)
except vol.Invalid as err:
logging.getLogger(__name__).warning(
"Error rendering %s template: %s", key, err
)
return
@callback
def _set_state(_):
"""Set state of template binary sensor."""
self._state = state
self.async_set_context(self.coordinator.data["context"])
self.async_write_ha_state()
# state with delay. Cancelled if new trigger received
self._delay_cancel = async_call_later(
self.hass, delay.total_seconds(), _set_state
)

View file

@ -3,13 +3,14 @@ import logging
import voluptuous as vol
from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR_DOMAIN
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.config import async_log_exception, config_without_domain
from homeassistant.const import CONF_SENSORS, CONF_UNIQUE_ID
from homeassistant.const import CONF_BINARY_SENSORS, CONF_SENSORS, CONF_UNIQUE_ID
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.trigger import async_validate_trigger_config
from . import sensor as sensor_platform
from . import binary_sensor as binary_sensor_platform, sensor as sensor_platform
from .const import CONF_TRIGGER, DOMAIN
CONFIG_SECTION_SCHEMA = vol.Schema(
@ -22,6 +23,12 @@ CONFIG_SECTION_SCHEMA = vol.Schema(
vol.Optional(CONF_SENSORS): cv.schema_with_slug_keys(
sensor_platform.LEGACY_SENSOR_SCHEMA
),
vol.Optional(BINARY_SENSOR_DOMAIN): vol.All(
cv.ensure_list, [binary_sensor_platform.BINARY_SENSOR_SCHEMA]
),
vol.Optional(CONF_BINARY_SENSORS): cv.schema_with_slug_keys(
binary_sensor_platform.LEGACY_BINARY_SENSOR_SCHEMA
),
}
)
@ -45,17 +52,34 @@ async def async_validate_config(hass, config):
async_log_exception(err, DOMAIN, cfg, hass)
continue
if CONF_SENSORS in cfg:
logging.getLogger(__name__).warning(
"The entity definition format under template: differs from the platform "
"configuration format. See "
"https://www.home-assistant.io/integrations/template#configuration-for-trigger-based-template-sensors"
)
sensors = list(cfg[SENSOR_DOMAIN]) if SENSOR_DOMAIN in cfg else []
sensors.extend(
sensor_platform.rewrite_legacy_to_modern_conf(cfg[CONF_SENSORS])
)
cfg = {**cfg, "sensor": sensors}
legacy_warn_printed = False
for old_key, new_key, transform in (
(
CONF_SENSORS,
SENSOR_DOMAIN,
sensor_platform.rewrite_legacy_to_modern_conf,
),
(
CONF_BINARY_SENSORS,
BINARY_SENSOR_DOMAIN,
binary_sensor_platform.rewrite_legacy_to_modern_conf,
),
):
if old_key not in cfg:
continue
if not legacy_warn_printed:
legacy_warn_printed = True
logging.getLogger(__name__).warning(
"The entity definition format under template: differs from the platform "
"configuration format. See "
"https://www.home-assistant.io/integrations/template#configuration-for-trigger-based-template-sensors"
)
definitions = list(cfg[new_key]) if new_key in cfg else []
definitions.extend(transform(cfg[old_key]))
cfg = {**cfg, new_key: definitions}
config_sections.append(cfg)

View file

@ -64,6 +64,7 @@ class TriggerEntity(update_coordinator.CoordinatorEntity):
# We make a copy so our initial render is 'unknown' and not 'unavailable'
self._rendered = dict(self._static_rendered)
self._parse_result = set()
@property
def name(self):
@ -115,17 +116,18 @@ class TriggerEntity(update_coordinator.CoordinatorEntity):
template.attach(self.hass, self._config)
await super().async_added_to_hass()
if self.coordinator.data is not None:
self._handle_coordinator_update()
self._process_data()
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
def _process_data(self) -> None:
"""Process new data."""
try:
rendered = dict(self._static_rendered)
for key in self._to_render:
rendered[key] = self._config[key].async_render(
self.coordinator.data["run_variables"], parse_result=False
self.coordinator.data["run_variables"],
parse_result=key in self._parse_result,
)
if CONF_ATTRIBUTES in self._config:
@ -142,4 +144,9 @@ class TriggerEntity(update_coordinator.CoordinatorEntity):
self._rendered = self._static_rendered
self.async_set_context(self.coordinator.data["context"])
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
self._process_data()
self.async_write_ha_state()

View file

@ -858,6 +858,9 @@ def result_as_boolean(template_result: str | None) -> bool:
False/0/None/'0'/'false'/'no'/'off'/'disable' are considered falsy
"""
if template_result is None:
return False
try:
# Import here, not at top-level to avoid circular import
from homeassistant.helpers import ( # pylint: disable=import-outside-toplevel

View file

@ -12,13 +12,14 @@ from homeassistant.const import (
STATE_ON,
STATE_UNAVAILABLE,
)
from homeassistant.core import CoreState
from homeassistant.core import Context, CoreState
from homeassistant.helpers import entity_registry
import homeassistant.util.dt as dt_util
from tests.common import async_fire_time_changed
async def test_setup(hass):
async def test_setup_legacy(hass):
"""Test the setup."""
config = {
"binary_sensor": {
@ -906,3 +907,128 @@ async def test_template_validation_error(hass, caplog):
state = hass.states.get("binary_sensor.test")
assert state.attributes.get("icon") is None
async def test_trigger_entity(hass):
"""Test trigger entity works."""
assert await setup.async_setup_component(
hass,
"template",
{
"template": [
{"invalid": "config"},
# Config after invalid should still be set up
{
"unique_id": "listening-test-event",
"trigger": {"platform": "event", "event_type": "test_event"},
"binary_sensors": {
"hello": {
"friendly_name": "Hello Name",
"unique_id": "hello_name-id",
"device_class": "battery",
"value_template": "{{ trigger.event.data.beer == 2 }}",
"entity_picture_template": "{{ '/local/dogs.png' }}",
"icon_template": "{{ 'mdi:pirate' }}",
"attribute_templates": {
"plus_one": "{{ trigger.event.data.beer + 1 }}"
},
},
},
"binary_sensor": [
{
"name": "via list",
"unique_id": "via_list-id",
"device_class": "battery",
"state": "{{ trigger.event.data.beer == 2 }}",
"picture": "{{ '/local/dogs.png' }}",
"icon": "{{ 'mdi:pirate' }}",
"attributes": {
"plus_one": "{{ trigger.event.data.beer + 1 }}"
},
}
],
},
{
"trigger": [],
"binary_sensors": {
"bare_minimum": {
"value_template": "{{ trigger.event.data.beer == 1 }}"
},
},
},
],
},
)
await hass.async_block_till_done()
state = hass.states.get("binary_sensor.hello_name")
assert state is not None
assert state.state == "off"
state = hass.states.get("binary_sensor.bare_minimum")
assert state is not None
assert state.state == "off"
context = Context()
hass.bus.async_fire("test_event", {"beer": 2}, context=context)
await hass.async_block_till_done()
state = hass.states.get("binary_sensor.hello_name")
assert state.state == "on"
assert state.attributes.get("device_class") == "battery"
assert state.attributes.get("icon") == "mdi:pirate"
assert state.attributes.get("entity_picture") == "/local/dogs.png"
assert state.attributes.get("plus_one") == 3
assert state.context is context
ent_reg = entity_registry.async_get(hass)
assert len(ent_reg.entities) == 2
assert (
ent_reg.entities["binary_sensor.hello_name"].unique_id
== "listening-test-event-hello_name-id"
)
assert (
ent_reg.entities["binary_sensor.via_list"].unique_id
== "listening-test-event-via_list-id"
)
state = hass.states.get("binary_sensor.via_list")
assert state.state == "on"
assert state.attributes.get("device_class") == "battery"
assert state.attributes.get("icon") == "mdi:pirate"
assert state.attributes.get("entity_picture") == "/local/dogs.png"
assert state.attributes.get("plus_one") == 3
assert state.context is context
async def test_template_with_trigger_templated_delay_on(hass):
"""Test binary sensor template with template delay on."""
config = {
"template": {
"trigger": {"platform": "event", "event_type": "test_event"},
"binary_sensor": {
"name": "test",
"state": "{{ trigger.event.data.beer == 2 }}",
"device_class": "motion",
"delay_on": '{{ ({ "seconds": 6 / 2 }) }}',
},
}
}
await setup.async_setup_component(hass, "template", config)
await hass.async_block_till_done()
await hass.async_start()
state = hass.states.get("binary_sensor.test")
assert state.state == "off"
context = Context()
hass.bus.async_fire("test_event", {"beer": 2}, context=context)
await hass.async_block_till_done()
future = dt_util.utcnow() + timedelta(seconds=3)
async_fire_time_changed(hass, future)
await hass.async_block_till_done()
state = hass.states.get("binary_sensor.test")
assert state.state == "on"

View file

@ -41,6 +41,10 @@ async def test_reloadable(hass):
"name": "top level state",
"state": "{{ states.sensor.top_level.state }} + 2",
},
"binary_sensor": {
"name": "top level state",
"state": "{{ states.sensor.top_level.state == 'init' }}",
},
},
],
},
@ -50,15 +54,17 @@ async def test_reloadable(hass):
await hass.async_start()
await hass.async_block_till_done()
assert hass.states.get("sensor.top_level_state").state == "unknown + 2"
assert hass.states.get("binary_sensor.top_level_state").state == "off"
hass.bus.async_fire("event_1", {"source": "init"})
await hass.async_block_till_done()
assert len(hass.states.async_all()) == 4
assert len(hass.states.async_all()) == 5
assert hass.states.get("sensor.state").state == "mytest"
assert hass.states.get("sensor.top_level").state == "init"
await hass.async_block_till_done()
assert hass.states.get("sensor.top_level_state").state == "init + 2"
assert hass.states.get("binary_sensor.top_level_state").state == "on"
yaml_path = path.join(
_get_fixtures_base_path(),