"""The tests for the Script component.""" # pylint: disable=protected-access import asyncio from contextlib import contextmanager from datetime import timedelta import logging from types import MappingProxyType from unittest import mock from unittest.mock import patch import pytest import voluptuous as vol # Otherwise can't test just this file (import order issue) from homeassistant import exceptions import homeassistant.components.scene as scene from homeassistant.const import ATTR_ENTITY_ID, SERVICE_TURN_ON from homeassistant.core import Context, CoreState, callback from homeassistant.helpers import config_validation as cv, script from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util from tests.common import ( async_capture_events, async_fire_time_changed, async_mock_service, ) ENTITY_ID = "script.test" def async_watch_for_action(script_obj, message): """Watch for message in last_action.""" flag = asyncio.Event() @callback def check_action(): if script_obj.last_action and message in script_obj.last_action: flag.set() script_obj.change_listener = check_action assert script_obj.change_listener is check_action return flag async def test_firing_event_basic(hass, caplog): """Test the firing of events.""" event = "test_event" context = Context() events = async_capture_events(hass, event) sequence = cv.SCRIPT_SCHEMA({"event": event, "event_data": {"hello": "world"}}) script_obj = script.Script( hass, sequence, "Test Name", "test_domain", running_description="test script" ) await script_obj.async_run(context=context) await hass.async_block_till_done() assert len(events) == 1 assert events[0].context is context assert events[0].data.get("hello") == "world" assert ".test_name:" in caplog.text assert "Test Name: Running test script" in caplog.text async def test_firing_event_template(hass): """Test the firing of events.""" event = "test_event" context = Context() events = async_capture_events(hass, event) sequence = cv.SCRIPT_SCHEMA( { "event": event, "event_data": { "dict": { 1: "{{ is_world }}", 2: "{{ is_world }}{{ is_world }}", 3: "{{ is_world }}{{ is_world }}{{ is_world }}", }, "list": ["{{ is_world }}", "{{ is_world }}{{ is_world }}"], }, "event_data_template": { "dict2": { 1: "{{ is_world }}", 2: "{{ is_world }}{{ is_world }}", 3: "{{ is_world }}{{ is_world }}{{ is_world }}", }, "list2": ["{{ is_world }}", "{{ is_world }}{{ is_world }}"], }, } ) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") await script_obj.async_run(MappingProxyType({"is_world": "yes"}), context=context) await hass.async_block_till_done() assert len(events) == 1 assert events[0].context is context assert events[0].data == { "dict": {1: "yes", 2: "yesyes", 3: "yesyesyes"}, "list": ["yes", "yesyes"], "dict2": {1: "yes", 2: "yesyes", 3: "yesyesyes"}, "list2": ["yes", "yesyes"], } async def test_calling_service_basic(hass): """Test the calling of a service.""" context = Context() calls = async_mock_service(hass, "test", "script") sequence = cv.SCRIPT_SCHEMA({"service": "test.script", "data": {"hello": "world"}}) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") await script_obj.async_run(context=context) await hass.async_block_till_done() assert len(calls) == 1 assert calls[0].context is context assert calls[0].data.get("hello") == "world" async def test_calling_service_template(hass): """Test the calling of a service.""" context = Context() calls = async_mock_service(hass, "test", "script") sequence = cv.SCRIPT_SCHEMA( { "service_template": """ {% if True %} test.script {% else %} test.not_script {% endif %}""", "data_template": { "hello": """ {% if is_world == 'yes' %} world {% else %} not world {% endif %} """ }, } ) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") await script_obj.async_run(MappingProxyType({"is_world": "yes"}), context=context) await hass.async_block_till_done() assert len(calls) == 1 assert calls[0].context is context assert calls[0].data.get("hello") == "world" async def test_data_template_with_templated_key(hass): """Test the calling of a service with a data_template with a templated key.""" context = Context() calls = async_mock_service(hass, "test", "script") sequence = cv.SCRIPT_SCHEMA( {"service": "test.script", "data_template": {"{{ hello_var }}": "world"}} ) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") await script_obj.async_run( MappingProxyType({"hello_var": "hello"}), context=context ) await hass.async_block_till_done() assert len(calls) == 1 assert calls[0].context is context assert "hello" in calls[0].data async def test_multiple_runs_no_wait(hass): """Test multiple runs with no wait in script.""" logger = logging.getLogger("TEST") calls = [] heard_event = asyncio.Event() async def async_simulate_long_service(service): """Simulate a service that takes a not insignificant time.""" fire = service.data.get("fire") listen = service.data.get("listen") service_done = asyncio.Event() @callback def service_done_cb(event): logger.debug("simulated service (%s:%s) done", fire, listen) service_done.set() calls.append(service) logger.debug("simulated service (%s:%s) started", fire, listen) unsub = hass.bus.async_listen(str(listen), service_done_cb) hass.bus.async_fire(str(fire)) await service_done.wait() unsub() hass.services.async_register("test", "script", async_simulate_long_service) @callback def heard_event_cb(event): logger.debug("heard: %s", event) heard_event.set() sequence = cv.SCRIPT_SCHEMA( [ { "service": "test.script", "data_template": {"fire": "{{ fire1 }}", "listen": "{{ listen1 }}"}, }, { "service": "test.script", "data_template": {"fire": "{{ fire2 }}", "listen": "{{ listen2 }}"}, }, ] ) script_obj = script.Script( hass, sequence, "Test Name", "test_domain", script_mode="parallel", max_runs=2 ) # Start script twice in such a way that second run will be started while first run # is in the middle of the first service call. unsub = hass.bus.async_listen("1", heard_event_cb) logger.debug("starting 1st script") hass.async_create_task( script_obj.async_run( MappingProxyType( {"fire1": "1", "listen1": "2", "fire2": "3", "listen2": "4"} ), Context(), ) ) await asyncio.wait_for(heard_event.wait(), 1) unsub() logger.debug("starting 2nd script") await script_obj.async_run( MappingProxyType({"fire1": "2", "listen1": "3", "fire2": "4", "listen2": "4"}), Context(), ) await hass.async_block_till_done() assert len(calls) == 4 async def test_activating_scene(hass): """Test the activation of a scene.""" context = Context() calls = async_mock_service(hass, scene.DOMAIN, SERVICE_TURN_ON) sequence = cv.SCRIPT_SCHEMA({"scene": "scene.hello"}) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") await script_obj.async_run(context=context) await hass.async_block_till_done() assert len(calls) == 1 assert calls[0].context is context assert calls[0].data.get(ATTR_ENTITY_ID) == "scene.hello" @pytest.mark.parametrize("count", [1, 3]) async def test_stop_no_wait(hass, count): """Test stopping script.""" service_started_sem = asyncio.Semaphore(0) finish_service_event = asyncio.Event() event = "test_event" events = async_capture_events(hass, event) async def async_simulate_long_service(service): """Simulate a service that takes a not insignificant time.""" service_started_sem.release() await finish_service_event.wait() hass.services.async_register("test", "script", async_simulate_long_service) sequence = cv.SCRIPT_SCHEMA([{"service": "test.script"}, {"event": event}]) script_obj = script.Script( hass, sequence, "Test Name", "test_domain", script_mode="parallel", max_runs=count, ) # Get script started specified number of times and wait until the test.script # service has started for each run. tasks = [] for _ in range(count): hass.async_create_task(script_obj.async_run(context=Context())) tasks.append(hass.async_create_task(service_started_sem.acquire())) await asyncio.wait_for(asyncio.gather(*tasks), 1) # Can't assert just yet because we haven't verified stopping works yet. # If assert fails we can hang test if async_stop doesn't work. script_was_runing = script_obj.is_running were_no_events = len(events) == 0 # Begin the process of stopping the script (which should stop all runs), and then # let the service calls complete. hass.async_create_task(script_obj.async_stop()) finish_service_event.set() await hass.async_block_till_done() assert script_was_runing assert were_no_events assert not script_obj.is_running assert len(events) == 0 async def test_delay_basic(hass): """Test the delay.""" delay_alias = "delay step" sequence = cv.SCRIPT_SCHEMA({"delay": {"seconds": 5}, "alias": delay_alias}) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") delay_started_flag = async_watch_for_action(script_obj, delay_alias) try: hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.wait_for(delay_started_flag.wait(), 1) assert script_obj.is_running assert script_obj.last_action == delay_alias except (AssertionError, asyncio.TimeoutError): await script_obj.async_stop() raise else: async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5)) await hass.async_block_till_done() assert not script_obj.is_running assert script_obj.last_action is None async def test_multiple_runs_delay(hass): """Test multiple runs with delay in script.""" event = "test_event" events = async_capture_events(hass, event) delay = timedelta(seconds=5) sequence = cv.SCRIPT_SCHEMA( [ {"event": event, "event_data": {"value": 1}}, {"delay": delay}, {"event": event, "event_data": {"value": 2}}, ] ) script_obj = script.Script( hass, sequence, "Test Name", "test_domain", script_mode="parallel", max_runs=2 ) delay_started_flag = async_watch_for_action(script_obj, "delay") try: hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.wait_for(delay_started_flag.wait(), 1) assert script_obj.is_running assert len(events) == 1 assert events[-1].data["value"] == 1 except (AssertionError, asyncio.TimeoutError): await script_obj.async_stop() raise else: # Start second run of script while first run is in a delay. script_obj.sequence[1]["alias"] = "delay run 2" delay_started_flag = async_watch_for_action(script_obj, "delay run 2") hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.wait_for(delay_started_flag.wait(), 1) async_fire_time_changed(hass, dt_util.utcnow() + delay) await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == 4 assert events[-3].data["value"] == 1 assert events[-2].data["value"] == 2 assert events[-1].data["value"] == 2 async def test_delay_template_ok(hass): """Test the delay as a template.""" sequence = cv.SCRIPT_SCHEMA({"delay": "00:00:{{ 5 }}"}) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") delay_started_flag = async_watch_for_action(script_obj, "delay") try: hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.wait_for(delay_started_flag.wait(), 1) assert script_obj.is_running except (AssertionError, asyncio.TimeoutError): await script_obj.async_stop() raise else: async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5)) await hass.async_block_till_done() assert not script_obj.is_running async def test_delay_template_invalid(hass, caplog): """Test the delay as a template that fails.""" event = "test_event" events = async_capture_events(hass, event) sequence = cv.SCRIPT_SCHEMA( [ {"event": event}, {"delay": "{{ invalid_delay }}"}, {"delay": {"seconds": 5}}, {"event": event}, ] ) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") start_idx = len(caplog.records) await script_obj.async_run(context=Context()) await hass.async_block_till_done() assert any( rec.levelname == "ERROR" and "Error rendering" in rec.message for rec in caplog.records[start_idx:] ) assert not script_obj.is_running assert len(events) == 1 async def test_delay_template_complex_ok(hass): """Test the delay with a working complex template.""" sequence = cv.SCRIPT_SCHEMA({"delay": {"seconds": "{{ 5 }}"}}) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") delay_started_flag = async_watch_for_action(script_obj, "delay") try: hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.wait_for(delay_started_flag.wait(), 1) assert script_obj.is_running except (AssertionError, asyncio.TimeoutError): await script_obj.async_stop() raise else: async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5)) await hass.async_block_till_done() assert not script_obj.is_running async def test_delay_template_complex_invalid(hass, caplog): """Test the delay with a complex template that fails.""" event = "test_event" events = async_capture_events(hass, event) sequence = cv.SCRIPT_SCHEMA( [ {"event": event}, {"delay": {"seconds": "{{ invalid_delay }}"}}, {"delay": {"seconds": 5}}, {"event": event}, ] ) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") start_idx = len(caplog.records) await script_obj.async_run(context=Context()) await hass.async_block_till_done() assert any( rec.levelname == "ERROR" and "Error rendering" in rec.message for rec in caplog.records[start_idx:] ) assert not script_obj.is_running assert len(events) == 1 async def test_cancel_delay(hass): """Test the cancelling while the delay is present.""" event = "test_event" events = async_capture_events(hass, event) sequence = cv.SCRIPT_SCHEMA([{"delay": {"seconds": 5}}, {"event": event}]) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") delay_started_flag = async_watch_for_action(script_obj, "delay") try: hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.wait_for(delay_started_flag.wait(), 1) assert script_obj.is_running assert len(events) == 0 except (AssertionError, asyncio.TimeoutError): await script_obj.async_stop() raise else: await script_obj.async_stop() assert not script_obj.is_running # Make sure the script is really stopped. async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5)) await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == 0 @pytest.mark.parametrize("action_type", ["template", "trigger"]) async def test_wait_basic(hass, action_type): """Test wait actions.""" wait_alias = "wait step" action = {"alias": wait_alias} if action_type == "template": action["wait_template"] = "{{ states.switch.test.state == 'off' }}" else: action["wait_for_trigger"] = { "platform": "state", "entity_id": "switch.test", "to": "off", } sequence = cv.SCRIPT_SCHEMA(action) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") wait_started_flag = async_watch_for_action(script_obj, wait_alias) try: hass.states.async_set("switch.test", "on") hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.wait_for(wait_started_flag.wait(), 1) assert script_obj.is_running assert script_obj.last_action == wait_alias except (AssertionError, asyncio.TimeoutError): await script_obj.async_stop() raise else: hass.states.async_set("switch.test", "off") await hass.async_block_till_done() assert not script_obj.is_running assert script_obj.last_action is None @pytest.mark.parametrize("action_type", ["template", "trigger"]) async def test_multiple_runs_wait(hass, action_type): """Test multiple runs with wait in script.""" event = "test_event" events = async_capture_events(hass, event) if action_type == "template": action = {"wait_template": "{{ states.switch.test.state == 'off' }}"} else: action = { "wait_for_trigger": { "platform": "state", "entity_id": "switch.test", "to": "off", } } sequence = cv.SCRIPT_SCHEMA( [ {"event": event, "event_data": {"value": 1}}, action, {"event": event, "event_data": {"value": 2}}, ] ) script_obj = script.Script( hass, sequence, "Test Name", "test_domain", script_mode="parallel", max_runs=2 ) wait_started_flag = async_watch_for_action(script_obj, "wait") try: hass.states.async_set("switch.test", "on") hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.wait_for(wait_started_flag.wait(), 1) assert script_obj.is_running assert len(events) == 1 assert events[-1].data["value"] == 1 # Start second run of script while first run is in wait_template. wait_started_flag.clear() hass.async_create_task(script_obj.async_run()) await asyncio.wait_for(wait_started_flag.wait(), 1) except (AssertionError, asyncio.TimeoutError): await script_obj.async_stop() raise else: hass.states.async_set("switch.test", "off") await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == 4 assert events[-3].data["value"] == 1 assert events[-2].data["value"] == 2 assert events[-1].data["value"] == 2 @pytest.mark.parametrize("action_type", ["template", "trigger"]) async def test_cancel_wait(hass, action_type): """Test the cancelling while wait is present.""" event = "test_event" events = async_capture_events(hass, event) if action_type == "template": action = {"wait_template": "{{ states.switch.test.state == 'off' }}"} else: action = { "wait_for_trigger": { "platform": "state", "entity_id": "switch.test", "to": "off", } } sequence = cv.SCRIPT_SCHEMA([action, {"event": event}]) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") wait_started_flag = async_watch_for_action(script_obj, "wait") try: hass.states.async_set("switch.test", "on") hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.wait_for(wait_started_flag.wait(), 1) assert script_obj.is_running assert len(events) == 0 except (AssertionError, asyncio.TimeoutError): await script_obj.async_stop() raise else: await script_obj.async_stop() assert not script_obj.is_running # Make sure the script is really stopped. hass.states.async_set("switch.test", "off") await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == 0 async def test_wait_template_not_schedule(hass): """Test the wait template with correct condition.""" event = "test_event" events = async_capture_events(hass, event) sequence = cv.SCRIPT_SCHEMA( [ {"event": event}, {"wait_template": "{{ states.switch.test.state == 'on' }}"}, {"event": event}, ] ) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") hass.states.async_set("switch.test", "on") await script_obj.async_run(context=Context()) await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == 2 @pytest.mark.parametrize( "timeout_param", [5, "{{ 5 }}", {"seconds": 5}, {"seconds": "{{ 5 }}"}] ) @pytest.mark.parametrize("action_type", ["template", "trigger"]) async def test_wait_timeout(hass, caplog, timeout_param, action_type): """Test the wait timeout option.""" event = "test_event" events = async_capture_events(hass, event) if action_type == "template": action = {"wait_template": "{{ states.switch.test.state == 'off' }}"} else: action = { "wait_for_trigger": { "platform": "state", "entity_id": "switch.test", "to": "off", } } action["timeout"] = timeout_param action["continue_on_timeout"] = True sequence = cv.SCRIPT_SCHEMA([action, {"event": event}]) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") wait_started_flag = async_watch_for_action(script_obj, "wait") try: hass.states.async_set("switch.test", "on") hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.wait_for(wait_started_flag.wait(), 1) assert script_obj.is_running assert len(events) == 0 except (AssertionError, asyncio.TimeoutError): await script_obj.async_stop() raise else: cur_time = dt_util.utcnow() async_fire_time_changed(hass, cur_time + timedelta(seconds=4)) await asyncio.sleep(0) assert len(events) == 0 async_fire_time_changed(hass, cur_time + timedelta(seconds=5)) await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == 1 assert "(timeout: 0:00:05)" in caplog.text @pytest.mark.parametrize( "continue_on_timeout,n_events", [(False, 0), (True, 1), (None, 1)] ) @pytest.mark.parametrize("action_type", ["template", "trigger"]) async def test_wait_continue_on_timeout( hass, continue_on_timeout, n_events, action_type ): """Test the wait continue_on_timeout option.""" event = "test_event" events = async_capture_events(hass, event) if action_type == "template": action = {"wait_template": "{{ states.switch.test.state == 'off' }}"} else: action = { "wait_for_trigger": { "platform": "state", "entity_id": "switch.test", "to": "off", } } action["timeout"] = 5 if continue_on_timeout is not None: action["continue_on_timeout"] = continue_on_timeout sequence = cv.SCRIPT_SCHEMA([action, {"event": event}]) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") wait_started_flag = async_watch_for_action(script_obj, "wait") try: hass.states.async_set("switch.test", "on") hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.wait_for(wait_started_flag.wait(), 1) assert script_obj.is_running assert len(events) == 0 except (AssertionError, asyncio.TimeoutError): await script_obj.async_stop() raise else: async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5)) await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == n_events async def test_wait_template_variables_in(hass): """Test the wait template with input variables.""" sequence = cv.SCRIPT_SCHEMA({"wait_template": "{{ is_state(data, 'off') }}"}) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") wait_started_flag = async_watch_for_action(script_obj, "wait") try: hass.states.async_set("switch.test", "on") hass.async_create_task( script_obj.async_run(MappingProxyType({"data": "switch.test"}), Context()) ) await asyncio.wait_for(wait_started_flag.wait(), 1) assert script_obj.is_running except (AssertionError, asyncio.TimeoutError): await script_obj.async_stop() raise else: hass.states.async_set("switch.test", "off") await hass.async_block_till_done() assert not script_obj.is_running async def test_wait_template_with_utcnow(hass): """Test the wait template with utcnow.""" sequence = cv.SCRIPT_SCHEMA({"wait_template": "{{ utcnow().hours == 12 }}"}) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") wait_started_flag = async_watch_for_action(script_obj, "wait") start_time = dt_util.utcnow() + timedelta(hours=24) try: hass.async_create_task(script_obj.async_run(context=Context())) async_fire_time_changed(hass, start_time.replace(hour=5)) assert not script_obj.is_running async_fire_time_changed(hass, start_time.replace(hour=12)) await asyncio.wait_for(wait_started_flag.wait(), 1) assert script_obj.is_running except (AssertionError, asyncio.TimeoutError): await script_obj.async_stop() raise else: async_fire_time_changed(hass, start_time.replace(hour=3)) await hass.async_block_till_done() assert not script_obj.is_running @pytest.mark.parametrize("mode", ["no_timeout", "timeout_finish", "timeout_not_finish"]) @pytest.mark.parametrize("action_type", ["template", "trigger"]) async def test_wait_variables_out(hass, mode, action_type): """Test the wait output variable.""" event = "test_event" events = async_capture_events(hass, event) if action_type == "template": action = {"wait_template": "{{ states.switch.test.state == 'off' }}"} event_key = "completed" else: action = { "wait_for_trigger": { "platform": "state", "entity_id": "switch.test", "to": "off", } } event_key = "trigger" if mode != "no_timeout": action["timeout"] = 5 action["continue_on_timeout"] = True sequence = [ action, { "event": event, "event_data_template": { event_key: f"{{{{ wait.{event_key} }}}}", "remaining": "{{ wait.remaining }}", }, }, ] sequence = cv.SCRIPT_SCHEMA(sequence) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") wait_started_flag = async_watch_for_action(script_obj, "wait") try: hass.states.async_set("switch.test", "on") hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.wait_for(wait_started_flag.wait(), 1) assert script_obj.is_running assert len(events) == 0 except (AssertionError, asyncio.TimeoutError): await script_obj.async_stop() raise else: if mode == "timeout_not_finish": async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5)) else: hass.states.async_set("switch.test", "off") await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == 1 if action_type == "template": assert events[0].data["completed"] == (mode != "timeout_not_finish") elif mode != "timeout_not_finish": assert "'to_state': default.""" max_runs = script.DEFAULT_MAX + 1 script_obj = script.Script( hass, cv.SCRIPT_SCHEMA(action), "Test Name", "test_domain", script_mode="parallel", max_runs=max_runs, ) events = async_capture_events(hass, "abc") for _ in range(max_runs): hass.async_create_task(script_obj.async_run(context=Context())) await hass.async_block_till_done() assert "WARNING" not in caplog.text assert "ERROR" not in caplog.text assert len(events) == max_runs async def test_last_triggered(hass): """Test the last_triggered.""" event = "test_event" sequence = cv.SCRIPT_SCHEMA({"event": event}) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") assert script_obj.last_triggered is None time = dt_util.utcnow() with mock.patch("homeassistant.helpers.script.utcnow", return_value=time): await script_obj.async_run(context=Context()) await hass.async_block_till_done() assert script_obj.last_triggered == time async def test_propagate_error_service_not_found(hass): """Test that a script aborts when a service is not found.""" event = "test_event" events = async_capture_events(hass, event) sequence = cv.SCRIPT_SCHEMA([{"service": "test.script"}, {"event": event}]) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") with pytest.raises(exceptions.ServiceNotFound): await script_obj.async_run(context=Context()) assert len(events) == 0 assert not script_obj.is_running async def test_propagate_error_invalid_service_data(hass): """Test that a script aborts when we send invalid service data.""" event = "test_event" events = async_capture_events(hass, event) calls = async_mock_service(hass, "test", "script", vol.Schema({"text": str})) sequence = cv.SCRIPT_SCHEMA( [{"service": "test.script", "data": {"text": 1}}, {"event": event}] ) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") with pytest.raises(vol.Invalid): await script_obj.async_run(context=Context()) assert len(events) == 0 assert len(calls) == 0 assert not script_obj.is_running async def test_propagate_error_service_exception(hass): """Test that a script aborts when a service throws an exception.""" event = "test_event" events = async_capture_events(hass, event) @callback def record_call(service): """Add recorded event to set.""" raise ValueError("BROKEN") hass.services.async_register("test", "script", record_call) sequence = cv.SCRIPT_SCHEMA([{"service": "test.script"}, {"event": event}]) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") with pytest.raises(ValueError): await script_obj.async_run(context=Context()) assert len(events) == 0 assert not script_obj.is_running async def test_referenced_entities(hass): """Test referenced entities.""" script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [ { "service": "test.script", "data": {"entity_id": "light.service_not_list"}, }, { "service": "test.script", "data": {"entity_id": ["light.service_list"]}, }, { "service": "test.script", "data": {"entity_id": "{{ 'light.service_template' }}"}, }, { "service": "test.script", "entity_id": "light.direct_entity_referenced", }, { "service": "test.script", "target": {"entity_id": "light.entity_in_target"}, }, { "service": "test.script", "data_template": {"entity_id": "light.entity_in_data_template"}, }, { "condition": "state", "entity_id": "sensor.condition", "state": "100", }, {"service": "test.script", "data": {"without": "entity_id"}}, {"scene": "scene.hello"}, {"event": "test_event"}, {"delay": "{{ delay_period }}"}, ] ), "Test Name", "test_domain", ) assert script_obj.referenced_entities == { "light.service_not_list", "light.service_list", "sensor.condition", "scene.hello", "light.direct_entity_referenced", "light.entity_in_target", "light.entity_in_data_template", } # Test we cache results. assert script_obj.referenced_entities is script_obj.referenced_entities async def test_referenced_devices(hass): """Test referenced entities.""" script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [ {"domain": "light", "device_id": "script-dev-id"}, { "condition": "device", "device_id": "condition-dev-id", "domain": "switch", }, { "service": "test.script", "data": {"device_id": "data-string-id"}, }, { "service": "test.script", "data_template": {"device_id": "data-template-string-id"}, }, { "service": "test.script", "target": {"device_id": "target-string-id"}, }, { "service": "test.script", "target": {"device_id": ["target-list-id-1", "target-list-id-2"]}, }, ] ), "Test Name", "test_domain", ) assert script_obj.referenced_devices == { "script-dev-id", "condition-dev-id", "data-string-id", "data-template-string-id", "target-string-id", "target-list-id-1", "target-list-id-2", } # Test we cache results. assert script_obj.referenced_devices is script_obj.referenced_devices @contextmanager def does_not_raise(): """Indicate no exception is expected.""" yield async def test_script_mode_single(hass, caplog): """Test overlapping runs with max_runs = 1.""" event = "test_event" events = async_capture_events(hass, event) sequence = cv.SCRIPT_SCHEMA( [ {"event": event, "event_data": {"value": 1}}, {"wait_template": "{{ states.switch.test.state == 'off' }}"}, {"event": event, "event_data": {"value": 2}}, ] ) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") wait_started_flag = async_watch_for_action(script_obj, "wait") try: hass.states.async_set("switch.test", "on") hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.wait_for(wait_started_flag.wait(), 1) assert script_obj.is_running assert len(events) == 1 assert events[0].data["value"] == 1 # Start second run of script while first run is suspended in wait_template. await script_obj.async_run(context=Context()) assert "Already running" in caplog.text assert script_obj.is_running except (AssertionError, asyncio.TimeoutError): await script_obj.async_stop() raise else: hass.states.async_set("switch.test", "off") await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == 2 assert events[1].data["value"] == 2 @pytest.mark.parametrize("max_exceeded", [None, "WARNING", "INFO", "ERROR", "SILENT"]) @pytest.mark.parametrize( "script_mode,max_runs", [("single", 1), ("parallel", 2), ("queued", 2)] ) async def test_max_exceeded(hass, caplog, max_exceeded, script_mode, max_runs): """Test max_exceeded option.""" sequence = cv.SCRIPT_SCHEMA( {"wait_template": "{{ states.switch.test.state == 'off' }}"} ) if max_exceeded is None: script_obj = script.Script( hass, sequence, "Test Name", "test_domain", script_mode=script_mode, max_runs=max_runs, ) else: script_obj = script.Script( hass, sequence, "Test Name", "test_domain", script_mode=script_mode, max_runs=max_runs, max_exceeded=max_exceeded, ) hass.states.async_set("switch.test", "on") for _ in range(max_runs + 1): hass.async_create_task(script_obj.async_run(context=Context())) hass.states.async_set("switch.test", "off") await hass.async_block_till_done() if max_exceeded is None: max_exceeded = "WARNING" if max_exceeded == "SILENT": assert not any( any( message in rec.message for message in ("Already running", "Maximum number of runs exceeded") ) for rec in caplog.records ) else: assert any( rec.levelname == max_exceeded and any( message in rec.message for message in ("Already running", "Maximum number of runs exceeded") ) for rec in caplog.records ) @pytest.mark.parametrize( "script_mode,messages,last_events", [("restart", ["Restarting"], [2]), ("parallel", [], [2, 2])], ) async def test_script_mode_2(hass, caplog, script_mode, messages, last_events): """Test overlapping runs with max_runs > 1.""" event = "test_event" events = async_capture_events(hass, event) sequence = cv.SCRIPT_SCHEMA( [ {"event": event, "event_data": {"value": 1}}, {"wait_template": "{{ states.switch.test.state == 'off' }}"}, {"event": event, "event_data": {"value": 2}}, ] ) logger = logging.getLogger("TEST") max_runs = 1 if script_mode == "restart" else 2 script_obj = script.Script( hass, sequence, "Test Name", "test_domain", script_mode=script_mode, max_runs=max_runs, logger=logger, ) wait_started_flag = async_watch_for_action(script_obj, "wait") try: hass.states.async_set("switch.test", "on") hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.wait_for(wait_started_flag.wait(), 1) assert script_obj.is_running assert len(events) == 1 assert events[0].data["value"] == 1 # Start second run of script while first run is suspended in wait_template. wait_started_flag.clear() hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.wait_for(wait_started_flag.wait(), 1) assert script_obj.is_running assert len(events) == 2 assert events[1].data["value"] == 1 assert all( any( rec.levelname == "INFO" and rec.name == "TEST" and message in rec.message for rec in caplog.records ) for message in messages ) except (AssertionError, asyncio.TimeoutError): await script_obj.async_stop() raise else: hass.states.async_set("switch.test", "off") await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == 2 + len(last_events) for idx, value in enumerate(last_events, start=2): assert events[idx].data["value"] == value async def test_script_mode_queued(hass): """Test overlapping runs with script_mode = 'queued' & max_runs > 1.""" event = "test_event" events = async_capture_events(hass, event) sequence = cv.SCRIPT_SCHEMA( [ {"event": event, "event_data": {"value": 1}}, { "wait_template": "{{ states.switch.test.state == 'off' }}", "alias": "wait_1", }, {"event": event, "event_data": {"value": 2}}, { "wait_template": "{{ states.switch.test.state == 'on' }}", "alias": "wait_2", }, ] ) logger = logging.getLogger("TEST") script_obj = script.Script( hass, sequence, "Test Name", "test_domain", script_mode="queued", max_runs=2, logger=logger, ) watch_messages = [] @callback def check_action(): for message, flag in watch_messages: if script_obj.last_action and message in script_obj.last_action: flag.set() script_obj.change_listener = check_action wait_started_flag_1 = asyncio.Event() watch_messages.append(("wait_1", wait_started_flag_1)) wait_started_flag_2 = asyncio.Event() watch_messages.append(("wait_2", wait_started_flag_2)) try: assert not script_obj.is_running assert script_obj.runs == 0 hass.states.async_set("switch.test", "on") hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.wait_for(wait_started_flag_1.wait(), 1) assert script_obj.is_running assert script_obj.runs == 1 assert len(events) == 1 assert events[0].data["value"] == 1 # Start second run of script while first run is suspended in wait_template. # This second run should not start until the first run has finished. hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.sleep(0) assert script_obj.is_running assert script_obj.runs == 2 assert len(events) == 1 hass.states.async_set("switch.test", "off") await asyncio.wait_for(wait_started_flag_2.wait(), 1) assert script_obj.is_running assert script_obj.runs == 2 assert len(events) == 2 assert events[1].data["value"] == 2 wait_started_flag_1.clear() hass.states.async_set("switch.test", "on") await asyncio.wait_for(wait_started_flag_1.wait(), 1) assert script_obj.is_running assert script_obj.runs == 1 assert len(events) == 3 assert events[2].data["value"] == 1 except (AssertionError, asyncio.TimeoutError): await script_obj.async_stop() raise else: hass.states.async_set("switch.test", "off") await asyncio.sleep(0) hass.states.async_set("switch.test", "on") await hass.async_block_till_done() assert not script_obj.is_running assert script_obj.runs == 0 assert len(events) == 4 assert events[3].data["value"] == 2 async def test_script_mode_queued_cancel(hass): """Test canceling with a queued run.""" script_obj = script.Script( hass, cv.SCRIPT_SCHEMA({"wait_template": "{{ false }}"}), "Test Name", "test_domain", script_mode="queued", max_runs=2, ) wait_started_flag = async_watch_for_action(script_obj, "wait") try: assert not script_obj.is_running assert script_obj.runs == 0 task1 = hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.wait_for(wait_started_flag.wait(), 1) task2 = hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.sleep(0) assert script_obj.is_running assert script_obj.runs == 2 with pytest.raises(asyncio.CancelledError): task2.cancel() await task2 assert script_obj.is_running assert script_obj.runs == 1 with pytest.raises(asyncio.CancelledError): task1.cancel() await task1 assert not script_obj.is_running assert script_obj.runs == 0 except (AssertionError, asyncio.TimeoutError): await script_obj.async_stop() raise async def test_script_logging(hass, caplog): """Test script logging.""" script_obj = script.Script(hass, [], "Script with % Name", "test_domain") script_obj._log("Test message with name %s", 1) assert "Script with % Name: Test message with name 1" in caplog.text async def test_shutdown_at(hass, caplog): """Test stopping scripts at shutdown.""" delay_alias = "delay step" sequence = cv.SCRIPT_SCHEMA({"delay": {"seconds": 120}, "alias": delay_alias}) script_obj = script.Script(hass, sequence, "test script", "test_domain") delay_started_flag = async_watch_for_action(script_obj, delay_alias) try: hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.wait_for(delay_started_flag.wait(), 1) assert script_obj.is_running assert script_obj.last_action == delay_alias except (AssertionError, asyncio.TimeoutError): await script_obj.async_stop() raise else: hass.bus.async_fire("homeassistant_stop") await hass.async_block_till_done() assert not script_obj.is_running assert "Stopping scripts running at shutdown: test script" in caplog.text async def test_shutdown_after(hass, caplog): """Test stopping scripts at shutdown.""" delay_alias = "delay step" sequence = cv.SCRIPT_SCHEMA({"delay": {"seconds": 120}, "alias": delay_alias}) script_obj = script.Script(hass, sequence, "test script", "test_domain") delay_started_flag = async_watch_for_action(script_obj, delay_alias) hass.state = CoreState.stopping hass.bus.async_fire("homeassistant_stop") await hass.async_block_till_done() try: hass.async_create_task(script_obj.async_run(context=Context())) await asyncio.wait_for(delay_started_flag.wait(), 1) assert script_obj.is_running assert script_obj.last_action == delay_alias except (AssertionError, asyncio.TimeoutError): await script_obj.async_stop() raise else: async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=60)) await hass.async_block_till_done() assert not script_obj.is_running assert ( "Stopping scripts running too long after shutdown: test script" in caplog.text ) async def test_update_logger(hass, caplog): """Test updating logger.""" sequence = cv.SCRIPT_SCHEMA({"event": "test_event"}) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") await script_obj.async_run(context=Context()) await hass.async_block_till_done() assert script.__name__ in caplog.text log_name = "testing.123" script_obj.update_logger(logging.getLogger(log_name)) await script_obj.async_run(context=Context()) await hass.async_block_till_done() assert log_name in caplog.text async def test_started_action(hass, caplog): """Test the callback of started_action.""" event = "test_event" log_message = "The script started!" logger = logging.getLogger("TEST") sequence = cv.SCRIPT_SCHEMA({"event": event}) script_obj = script.Script(hass, sequence, "Test Name", "test_domain") @callback def started_action(): logger.info(log_message) await script_obj.async_run(context=Context(), started_action=started_action) await hass.async_block_till_done() assert log_message in caplog.text async def test_set_variable(hass, caplog): """Test setting variables in scripts.""" sequence = cv.SCRIPT_SCHEMA( [ {"variables": {"variable": "value"}}, {"service": "test.script", "data": {"value": "{{ variable }}"}}, ] ) script_obj = script.Script(hass, sequence, "test script", "test_domain") mock_calls = async_mock_service(hass, "test", "script") await script_obj.async_run(context=Context()) await hass.async_block_till_done() assert mock_calls[0].data["value"] == "value" async def test_set_redefines_variable(hass, caplog): """Test setting variables based on their current value.""" sequence = cv.SCRIPT_SCHEMA( [ {"variables": {"variable": "1"}}, {"service": "test.script", "data": {"value": "{{ variable }}"}}, {"variables": {"variable": "{{ variable | int + 1 }}"}}, {"service": "test.script", "data": {"value": "{{ variable }}"}}, ] ) script_obj = script.Script(hass, sequence, "test script", "test_domain") mock_calls = async_mock_service(hass, "test", "script") await script_obj.async_run(context=Context()) await hass.async_block_till_done() assert mock_calls[0].data["value"] == 1 assert mock_calls[1].data["value"] == 2 async def test_validate_action_config(hass): """Validate action config.""" configs = { cv.SCRIPT_ACTION_CALL_SERVICE: {"service": "light.turn_on"}, cv.SCRIPT_ACTION_DELAY: {"delay": 5}, cv.SCRIPT_ACTION_WAIT_TEMPLATE: { "wait_template": "{{ states.light.kitchen.state == 'on' }}" }, cv.SCRIPT_ACTION_FIRE_EVENT: {"event": "my_event"}, cv.SCRIPT_ACTION_CHECK_CONDITION: { "condition": "{{ states.light.kitchen.state == 'on' }}" }, cv.SCRIPT_ACTION_DEVICE_AUTOMATION: { "domain": "light", "entity_id": "light.kitchen", "device_id": "abcd", "type": "turn_on", }, cv.SCRIPT_ACTION_ACTIVATE_SCENE: {"scene": "scene.relax"}, cv.SCRIPT_ACTION_REPEAT: { "repeat": {"count": 3, "sequence": [{"event": "repeat_event"}]} }, cv.SCRIPT_ACTION_CHOOSE: { "choose": [ { "condition": "{{ states.light.kitchen.state == 'on' }}", "sequence": [{"event": "choose_event"}], } ], "default": [{"event": "choose_default_event"}], }, cv.SCRIPT_ACTION_WAIT_FOR_TRIGGER: { "wait_for_trigger": [ {"platform": "event", "event_type": "wait_for_trigger_event"} ] }, cv.SCRIPT_ACTION_VARIABLES: {"variables": {"hello": "world"}}, } for key in cv.ACTION_TYPE_SCHEMAS: assert key in configs, f"No validate config test found for {key}" # Verify we raise if we don't know the action type with patch( "homeassistant.helpers.config_validation.determine_script_action", return_value="non-existing", ), pytest.raises(ValueError): await script.async_validate_action_config(hass, {}) for action_type, config in configs.items(): assert cv.determine_script_action(config) == action_type try: await script.async_validate_action_config(hass, config) except vol.Invalid as err: assert False, f"{action_type} config invalid: {err}" async def test_embedded_wait_for_trigger_in_automation(hass): """Test an embedded wait for trigger.""" assert await async_setup_component( hass, "automation", { "automation": { "trigger": {"platform": "event", "event_type": "test_event"}, "action": { "repeat": { "while": [ { "condition": "template", "value_template": '{{ is_state("test.value1", "trigger-while") }}', } ], "sequence": [ {"event": "trigger_wait_event"}, { "wait_for_trigger": [ { "platform": "template", "value_template": '{{ is_state("test.value2", "trigger-wait") }}', } ] }, {"service": "test.script"}, ], } }, } }, ) hass.states.async_set("test.value1", "trigger-while") hass.states.async_set("test.value2", "not-trigger-wait") mock_calls = async_mock_service(hass, "test", "script") async def trigger_wait_event(_): # give script the time to attach the trigger. await asyncio.sleep(0) hass.states.async_set("test.value1", "not-trigger-while") hass.states.async_set("test.value2", "trigger-wait") hass.bus.async_listen("trigger_wait_event", trigger_wait_event) # Start automation hass.bus.async_fire("test_event") await hass.async_block_till_done() assert len(mock_calls) == 1