Find referenced floors in automations & scripts (#113802)
This commit is contained in:
parent
44211dc761
commit
38d0854b70
6 changed files with 232 additions and 10 deletions
|
@ -233,6 +233,18 @@ def areas_in_automation(hass: HomeAssistant, entity_id: str) -> list[str]:
|
|||
return _x_in_automation(hass, entity_id, "referenced_areas")
|
||||
|
||||
|
||||
@callback
|
||||
def automations_with_floor(hass: HomeAssistant, floor_id: str) -> list[str]:
|
||||
"""Return all automations that reference the floor."""
|
||||
return _automations_with_x(hass, floor_id, "referenced_floors")
|
||||
|
||||
|
||||
@callback
|
||||
def floors_in_automation(hass: HomeAssistant, entity_id: str) -> list[str]:
|
||||
"""Return all floors in an automation."""
|
||||
return _x_in_automation(hass, entity_id, "referenced_floors")
|
||||
|
||||
|
||||
@callback
|
||||
def automations_with_blueprint(hass: HomeAssistant, blueprint_path: str) -> list[str]:
|
||||
"""Return all automations that reference the blueprint."""
|
||||
|
@ -341,6 +353,11 @@ class BaseAutomationEntity(ToggleEntity, ABC):
|
|||
return {CONF_ID: self.unique_id}
|
||||
return None
|
||||
|
||||
@cached_property
|
||||
@abstractmethod
|
||||
def referenced_floors(self) -> set[str]:
|
||||
"""Return a set of referenced floors."""
|
||||
|
||||
@cached_property
|
||||
@abstractmethod
|
||||
def referenced_areas(self) -> set[str]:
|
||||
|
@ -396,6 +413,11 @@ class UnavailableAutomationEntity(BaseAutomationEntity):
|
|||
"""Return the name of the entity."""
|
||||
return self._name
|
||||
|
||||
@cached_property
|
||||
def referenced_floors(self) -> set[str]:
|
||||
"""Return a set of referenced floors."""
|
||||
return set()
|
||||
|
||||
@cached_property
|
||||
def referenced_areas(self) -> set[str]:
|
||||
"""Return a set of referenced areas."""
|
||||
|
@ -483,6 +505,11 @@ class AutomationEntity(BaseAutomationEntity, RestoreEntity):
|
|||
"""Return True if entity is on."""
|
||||
return self._async_detach_triggers is not None or self._is_enabled
|
||||
|
||||
@property
|
||||
def referenced_floors(self) -> set[str]:
|
||||
"""Return a set of referenced floors."""
|
||||
return self.action_script.referenced_floors
|
||||
|
||||
@cached_property
|
||||
def referenced_areas(self) -> set[str]:
|
||||
"""Return a set of referenced areas."""
|
||||
|
|
|
@ -158,6 +158,18 @@ def areas_in_script(hass: HomeAssistant, entity_id: str) -> list[str]:
|
|||
return _x_in_script(hass, entity_id, "referenced_areas")
|
||||
|
||||
|
||||
@callback
|
||||
def scripts_with_floor(hass: HomeAssistant, floor_id: str) -> list[str]:
|
||||
"""Return all scripts that reference the floor."""
|
||||
return _scripts_with_x(hass, floor_id, "referenced_floors")
|
||||
|
||||
|
||||
@callback
|
||||
def floors_in_script(hass: HomeAssistant, entity_id: str) -> list[str]:
|
||||
"""Return all floors in a script."""
|
||||
return _x_in_script(hass, entity_id, "referenced_floors")
|
||||
|
||||
|
||||
@callback
|
||||
def scripts_with_blueprint(hass: HomeAssistant, blueprint_path: str) -> list[str]:
|
||||
"""Return all scripts that reference the blueprint."""
|
||||
|
@ -389,6 +401,11 @@ class BaseScriptEntity(ToggleEntity, ABC):
|
|||
|
||||
raw_config: ConfigType | None
|
||||
|
||||
@cached_property
|
||||
@abstractmethod
|
||||
def referenced_floors(self) -> set[str]:
|
||||
"""Return a set of referenced floors."""
|
||||
|
||||
@cached_property
|
||||
@abstractmethod
|
||||
def referenced_areas(self) -> set[str]:
|
||||
|
@ -434,6 +451,11 @@ class UnavailableScriptEntity(BaseScriptEntity):
|
|||
"""Return the name of the entity."""
|
||||
return self._name
|
||||
|
||||
@cached_property
|
||||
def referenced_floors(self) -> set[str]:
|
||||
"""Return a set of referenced floors."""
|
||||
return set()
|
||||
|
||||
@cached_property
|
||||
def referenced_areas(self) -> set[str]:
|
||||
"""Return a set of referenced areas."""
|
||||
|
@ -517,6 +539,11 @@ class ScriptEntity(BaseScriptEntity, RestoreEntity):
|
|||
"""Return true if script is on."""
|
||||
return self.script.is_running
|
||||
|
||||
@cached_property
|
||||
def referenced_floors(self) -> set[str]:
|
||||
"""Return a set of referenced floors."""
|
||||
return self.script.referenced_floors
|
||||
|
||||
@cached_property
|
||||
def referenced_areas(self) -> set[str]:
|
||||
"""Return a set of referenced areas."""
|
||||
|
|
|
@ -13,7 +13,7 @@ from functools import partial
|
|||
import itertools
|
||||
import logging
|
||||
from types import MappingProxyType
|
||||
from typing import TYPE_CHECKING, Any, TypedDict, TypeVar, cast
|
||||
from typing import TYPE_CHECKING, Any, Literal, TypedDict, TypeVar, cast
|
||||
|
||||
import async_interrupt
|
||||
import voluptuous as vol
|
||||
|
@ -26,6 +26,7 @@ from homeassistant.const import (
|
|||
ATTR_AREA_ID,
|
||||
ATTR_DEVICE_ID,
|
||||
ATTR_ENTITY_ID,
|
||||
ATTR_FLOOR_ID,
|
||||
CONF_ALIAS,
|
||||
CONF_CHOOSE,
|
||||
CONF_CONDITION,
|
||||
|
@ -1380,17 +1381,27 @@ class Script:
|
|||
"""Return true if the current mode support max."""
|
||||
return self.script_mode in (SCRIPT_MODE_PARALLEL, SCRIPT_MODE_QUEUED)
|
||||
|
||||
@cached_property
|
||||
def referenced_floors(self) -> set[str]:
|
||||
"""Return a set of referenced fooors."""
|
||||
referenced_floors: set[str] = set()
|
||||
Script._find_referenced_target(ATTR_FLOOR_ID, referenced_floors, self.sequence)
|
||||
return referenced_floors
|
||||
|
||||
@cached_property
|
||||
def referenced_areas(self) -> set[str]:
|
||||
"""Return a set of referenced areas."""
|
||||
referenced_areas: set[str] = set()
|
||||
Script._find_referenced_areas(referenced_areas, self.sequence)
|
||||
Script._find_referenced_target(ATTR_AREA_ID, referenced_areas, self.sequence)
|
||||
return referenced_areas
|
||||
|
||||
@staticmethod
|
||||
def _find_referenced_areas(
|
||||
referenced: set[str], sequence: Sequence[dict[str, Any]]
|
||||
def _find_referenced_target(
|
||||
target: Literal["area_id", "floor_id"],
|
||||
referenced: set[str],
|
||||
sequence: Sequence[dict[str, Any]],
|
||||
) -> None:
|
||||
"""Find referenced target in a sequence."""
|
||||
for step in sequence:
|
||||
action = cv.determine_script_action(step)
|
||||
|
||||
|
@ -1400,22 +1411,28 @@ class Script:
|
|||
step.get(CONF_SERVICE_DATA),
|
||||
step.get(CONF_SERVICE_DATA_TEMPLATE),
|
||||
):
|
||||
_referenced_extract_ids(data, ATTR_AREA_ID, referenced)
|
||||
_referenced_extract_ids(data, target, referenced)
|
||||
|
||||
elif action == cv.SCRIPT_ACTION_CHOOSE:
|
||||
for choice in step[CONF_CHOOSE]:
|
||||
Script._find_referenced_areas(referenced, choice[CONF_SEQUENCE])
|
||||
Script._find_referenced_target(
|
||||
target, referenced, choice[CONF_SEQUENCE]
|
||||
)
|
||||
if CONF_DEFAULT in step:
|
||||
Script._find_referenced_areas(referenced, step[CONF_DEFAULT])
|
||||
Script._find_referenced_target(
|
||||
target, referenced, step[CONF_DEFAULT]
|
||||
)
|
||||
|
||||
elif action == cv.SCRIPT_ACTION_IF:
|
||||
Script._find_referenced_areas(referenced, step[CONF_THEN])
|
||||
Script._find_referenced_target(target, referenced, step[CONF_THEN])
|
||||
if CONF_ELSE in step:
|
||||
Script._find_referenced_areas(referenced, step[CONF_ELSE])
|
||||
Script._find_referenced_target(target, referenced, step[CONF_ELSE])
|
||||
|
||||
elif action == cv.SCRIPT_ACTION_PARALLEL:
|
||||
for script in step[CONF_PARALLEL]:
|
||||
Script._find_referenced_areas(referenced, script[CONF_SEQUENCE])
|
||||
Script._find_referenced_target(
|
||||
target, referenced, script[CONF_SEQUENCE]
|
||||
)
|
||||
|
||||
@cached_property
|
||||
def referenced_devices(self) -> set[str]:
|
||||
|
|
|
@ -1561,6 +1561,8 @@ async def test_extraction_functions_not_setup(hass: HomeAssistant) -> None:
|
|||
assert automation.devices_in_automation(hass, "automation.test") == []
|
||||
assert automation.automations_with_entity(hass, "light.in_both") == []
|
||||
assert automation.entities_in_automation(hass, "automation.test") == []
|
||||
assert automation.automations_with_floor(hass, "floor-in-both") == []
|
||||
assert automation.floors_in_automation(hass, "automation.test") == []
|
||||
|
||||
|
||||
async def test_extraction_functions_unknown_automation(hass: HomeAssistant) -> None:
|
||||
|
@ -1570,6 +1572,7 @@ async def test_extraction_functions_unknown_automation(hass: HomeAssistant) -> N
|
|||
assert automation.blueprint_in_automation(hass, "automation.unknown") is None
|
||||
assert automation.devices_in_automation(hass, "automation.unknown") == []
|
||||
assert automation.entities_in_automation(hass, "automation.unknown") == []
|
||||
assert automation.floors_in_automation(hass, "automation.unknown") == []
|
||||
|
||||
|
||||
async def test_extraction_functions_unavailable_automation(hass: HomeAssistant) -> None:
|
||||
|
@ -1595,6 +1598,8 @@ async def test_extraction_functions_unavailable_automation(hass: HomeAssistant)
|
|||
assert automation.devices_in_automation(hass, entity_id) == []
|
||||
assert automation.automations_with_entity(hass, "light.in_both") == []
|
||||
assert automation.entities_in_automation(hass, entity_id) == []
|
||||
assert automation.automations_with_floor(hass, "floor-in-both") == []
|
||||
assert automation.floors_in_automation(hass, entity_id) == []
|
||||
|
||||
|
||||
async def test_extraction_functions(
|
||||
|
@ -1694,6 +1699,10 @@ async def test_extraction_functions(
|
|||
"service": "test.test",
|
||||
"target": {"area_id": "area-in-both"},
|
||||
},
|
||||
{
|
||||
"service": "test.test",
|
||||
"target": {"floor_id": "floor-in-both"},
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
|
@ -1813,6 +1822,14 @@ async def test_extraction_functions(
|
|||
"service": "test.test",
|
||||
"target": {"area_id": "area-in-last"},
|
||||
},
|
||||
{
|
||||
"service": "test.test",
|
||||
"target": {"floor_id": "floor-in-both"},
|
||||
},
|
||||
{
|
||||
"service": "test.test",
|
||||
"target": {"floor_id": "floor-in-last"},
|
||||
},
|
||||
],
|
||||
},
|
||||
]
|
||||
|
@ -1855,6 +1872,14 @@ async def test_extraction_functions(
|
|||
"area-in-both",
|
||||
"area-in-last",
|
||||
}
|
||||
assert set(automation.automations_with_floor(hass, "floor-in-both")) == {
|
||||
"automation.test1",
|
||||
"automation.test3",
|
||||
}
|
||||
assert set(automation.floors_in_automation(hass, "automation.test3")) == {
|
||||
"floor-in-both",
|
||||
"floor-in-last",
|
||||
}
|
||||
assert automation.blueprint_in_automation(hass, "automation.test3") is None
|
||||
|
||||
|
||||
|
|
|
@ -684,11 +684,14 @@ async def test_extraction_functions_not_setup(hass: HomeAssistant) -> None:
|
|||
assert script.devices_in_script(hass, "script.test") == []
|
||||
assert script.scripts_with_entity(hass, "light.in_both") == []
|
||||
assert script.entities_in_script(hass, "script.test") == []
|
||||
assert script.scripts_with_floor(hass, "floor-in-both") == []
|
||||
assert script.floors_in_script(hass, "script.test") == []
|
||||
|
||||
|
||||
async def test_extraction_functions_unknown_script(hass: HomeAssistant) -> None:
|
||||
"""Test extraction functions for an unknown script."""
|
||||
assert await async_setup_component(hass, DOMAIN, {})
|
||||
assert script.floors_in_script(hass, "script.unknown") == []
|
||||
assert script.areas_in_script(hass, "script.unknown") == []
|
||||
assert script.blueprint_in_script(hass, "script.unknown") is None
|
||||
assert script.devices_in_script(hass, "script.unknown") == []
|
||||
|
@ -712,6 +715,8 @@ async def test_extraction_functions_unavailable_script(hass: HomeAssistant) -> N
|
|||
assert script.devices_in_script(hass, entity_id) == []
|
||||
assert script.scripts_with_entity(hass, "light.in_both") == []
|
||||
assert script.entities_in_script(hass, entity_id) == []
|
||||
assert script.scripts_with_floor(hass, "floor-in-both") == []
|
||||
assert script.floors_in_script(hass, entity_id) == []
|
||||
|
||||
|
||||
async def test_extraction_functions(
|
||||
|
@ -756,6 +761,10 @@ async def test_extraction_functions(
|
|||
"service": "test.test",
|
||||
"target": {"area_id": "area-in-both"},
|
||||
},
|
||||
{
|
||||
"service": "test.test",
|
||||
"target": {"floor_id": "floor-in-both"},
|
||||
},
|
||||
]
|
||||
},
|
||||
"test2": {
|
||||
|
@ -804,6 +813,14 @@ async def test_extraction_functions(
|
|||
"service": "test.test",
|
||||
"target": {"area_id": "area-in-last"},
|
||||
},
|
||||
{
|
||||
"service": "test.test",
|
||||
"target": {"floor_id": "floor-in-both"},
|
||||
},
|
||||
{
|
||||
"service": "test.test",
|
||||
"target": {"floor_id": "floor-in-last"},
|
||||
},
|
||||
],
|
||||
},
|
||||
}
|
||||
|
@ -835,6 +852,14 @@ async def test_extraction_functions(
|
|||
"area-in-both",
|
||||
"area-in-last",
|
||||
}
|
||||
assert set(script.scripts_with_floor(hass, "floor-in-both")) == {
|
||||
"script.test1",
|
||||
"script.test3",
|
||||
}
|
||||
assert set(script.floors_in_script(hass, "script.test3")) == {
|
||||
"floor-in-both",
|
||||
"floor-in-last",
|
||||
}
|
||||
assert script.blueprint_in_script(hass, "script.test3") is None
|
||||
|
||||
|
||||
|
|
|
@ -3695,6 +3695,107 @@ async def test_propagate_error_service_exception(hass: HomeAssistant) -> None:
|
|||
assert_action_trace(expected_trace, expected_script_execution="error")
|
||||
|
||||
|
||||
async def test_referenced_floors(hass: HomeAssistant) -> None:
|
||||
"""Test referenced floors."""
|
||||
script_obj = script.Script(
|
||||
hass,
|
||||
cv.SCRIPT_SCHEMA(
|
||||
[
|
||||
{
|
||||
"service": "test.script",
|
||||
"data": {"floor_id": "floor_service_not_list"},
|
||||
},
|
||||
{
|
||||
"service": "test.script",
|
||||
"data": {"floor_id": ["floor_service_list"]},
|
||||
},
|
||||
{
|
||||
"service": "test.script",
|
||||
"data": {"floor_id": "{{ 'floor_service_template' }}"},
|
||||
},
|
||||
{
|
||||
"service": "test.script",
|
||||
"target": {"floor_id": "floor_in_target"},
|
||||
},
|
||||
{
|
||||
"service": "test.script",
|
||||
"data_template": {"floor_id": "floor_in_data_template"},
|
||||
},
|
||||
{"service": "test.script", "data": {"without": "floor_id"}},
|
||||
{
|
||||
"choose": [
|
||||
{
|
||||
"conditions": "{{ true == false }}",
|
||||
"sequence": [
|
||||
{
|
||||
"service": "test.script",
|
||||
"data": {"floor_id": "floor_choice_1_seq"},
|
||||
}
|
||||
],
|
||||
},
|
||||
{
|
||||
"conditions": "{{ true == false }}",
|
||||
"sequence": [
|
||||
{
|
||||
"service": "test.script",
|
||||
"data": {"floor_id": "floor_choice_2_seq"},
|
||||
}
|
||||
],
|
||||
},
|
||||
],
|
||||
"default": [
|
||||
{
|
||||
"service": "test.script",
|
||||
"data": {"floor_id": "floor_default_seq"},
|
||||
}
|
||||
],
|
||||
},
|
||||
{"event": "test_event"},
|
||||
{"delay": "{{ delay_period }}"},
|
||||
{
|
||||
"if": [],
|
||||
"then": [
|
||||
{
|
||||
"service": "test.script",
|
||||
"data": {"floor_id": "floor_if_then"},
|
||||
}
|
||||
],
|
||||
"else": [
|
||||
{
|
||||
"service": "test.script",
|
||||
"data": {"floor_id": "floor_if_else"},
|
||||
}
|
||||
],
|
||||
},
|
||||
{
|
||||
"parallel": [
|
||||
{
|
||||
"service": "test.script",
|
||||
"data": {"floor_id": "floor_parallel"},
|
||||
}
|
||||
],
|
||||
},
|
||||
]
|
||||
),
|
||||
"Test Name",
|
||||
"test_domain",
|
||||
)
|
||||
assert script_obj.referenced_floors == {
|
||||
"floor_choice_1_seq",
|
||||
"floor_choice_2_seq",
|
||||
"floor_default_seq",
|
||||
"floor_in_data_template",
|
||||
"floor_in_target",
|
||||
"floor_service_list",
|
||||
"floor_service_not_list",
|
||||
"floor_if_then",
|
||||
"floor_if_else",
|
||||
"floor_parallel",
|
||||
}
|
||||
# Test we cache results.
|
||||
assert script_obj.referenced_floors is script_obj.referenced_floors
|
||||
|
||||
|
||||
async def test_referenced_areas(hass: HomeAssistant) -> None:
|
||||
"""Test referenced areas."""
|
||||
script_obj = script.Script(
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue