Add new repeat loop for scripts and automations (#37589)

This commit is contained in:
Phil Bruckner 2020-07-10 13:37:19 -05:00 committed by GitHub
parent b187b17a4f
commit 91271f388c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 337 additions and 62 deletions

View file

@ -822,6 +822,101 @@ async def test_condition_all_cached(hass, script_mode):
assert len(script_obj._config_cache) == 2
async def test_repeat_count(hass):
"""Test repeat action w/ count option."""
event = "test_event"
events = async_capture_events(hass, event)
count = 3
sequence = cv.SCRIPT_SCHEMA(
{
"repeat": {
"count": count,
"sequence": {
"event": event,
"event_data_template": {
"first": "{{ repeat.first }}",
"index": "{{ repeat.index }}",
"last": "{{ repeat.last }}",
},
},
}
}
)
script_obj = script.Script(hass, sequence, script_mode="ignore")
await script_obj.async_run()
await hass.async_block_till_done()
assert len(events) == count
for index, event in enumerate(events):
assert event.data.get("first") == str(index == 0)
assert event.data.get("index") == str(index + 1)
assert event.data.get("last") == str(index == count - 1)
@pytest.mark.parametrize("condition", ["while", "until"])
async def test_repeat_conditional(hass, condition):
"""Test repeat action w/ while option."""
event = "test_event"
events = async_capture_events(hass, event)
count = 3
sequence = {
"repeat": {
"sequence": [
{
"event": event,
"event_data_template": {
"first": "{{ repeat.first }}",
"index": "{{ repeat.index }}",
},
},
{"wait_template": "{{ is_state('sensor.test', 'next') }}"},
{"wait_template": "{{ not is_state('sensor.test', 'next') }}"},
],
}
}
if condition == "while":
sequence["repeat"]["while"] = {
"condition": "template",
"value_template": "{{ not is_state('sensor.test', 'done') }}",
}
else:
sequence["repeat"]["until"] = {
"condition": "template",
"value_template": "{{ is_state('sensor.test', 'done') }}",
}
script_obj = script.Script(hass, cv.SCRIPT_SCHEMA(sequence), script_mode="ignore")
wait_started = async_watch_for_action(script_obj, "wait")
hass.states.async_set("sensor.test", "1")
hass.async_create_task(script_obj.async_run())
try:
for index in range(2, count + 1):
await asyncio.wait_for(wait_started.wait(), 1)
wait_started.clear()
hass.states.async_set("sensor.test", "next")
await asyncio.wait_for(wait_started.wait(), 1)
wait_started.clear()
hass.states.async_set("sensor.test", index)
await asyncio.wait_for(wait_started.wait(), 1)
hass.states.async_set("sensor.test", "next")
await asyncio.wait_for(wait_started.wait(), 1)
wait_started.clear()
hass.states.async_set("sensor.test", "done")
await asyncio.wait_for(hass.async_block_till_done(), 1)
except asyncio.TimeoutError:
await script_obj.async_stop()
raise
assert len(events) == count
for index, event in enumerate(events):
assert event.data.get("first") == str(index == 0)
assert event.data.get("index") == str(index + 1)
@pytest.mark.parametrize("script_mode", _BASIC_SCRIPT_MODES)
async def test_last_triggered(hass, script_mode):
"""Test the last_triggered."""