Add python_script response (#97937)

* Add response to python_script

* Reset output to empty dict if not valid dict

* Add tests for python_script response

* Raise Exceptions on service execution

* Add info on exception type

* Raise ServiceValidationError instead of ValueError

* Raise only on return_response=True

* Fix exception logger if no service response

* Create issue if exception is not raised

* Revert "Create issue if exception is not raised"

This reverts commit a61dd8619f.

---------

Co-authored-by: rikroe <rikroe@users.noreply.github.com>
This commit is contained in:
Richard Kroegel 2024-01-05 14:30:15 +01:00 committed by GitHub
parent 8645d9c717
commit e7573c3ed4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 188 additions and 8 deletions

View file

@ -20,8 +20,13 @@ from RestrictedPython.Guards import (
import voluptuous as vol
from homeassistant.const import CONF_DESCRIPTION, CONF_NAME, SERVICE_RELOAD
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.exceptions import HomeAssistantError
from homeassistant.core import (
HomeAssistant,
ServiceCall,
ServiceResponse,
SupportsResponse,
)
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
from homeassistant.helpers.service import async_set_service_schema
from homeassistant.helpers.typing import ConfigType
from homeassistant.loader import bind_hass
@ -107,9 +112,9 @@ def discover_scripts(hass):
_LOGGER.warning("Folder %s not found in configuration folder", FOLDER)
return False
def python_script_service_handler(call: ServiceCall) -> None:
def python_script_service_handler(call: ServiceCall) -> ServiceResponse:
"""Handle python script service calls."""
execute_script(hass, call.service, call.data)
return execute_script(hass, call.service, call.data, call.return_response)
existing = hass.services.services.get(DOMAIN, {}).keys()
for existing_service in existing:
@ -126,7 +131,12 @@ def discover_scripts(hass):
for fil in glob.iglob(os.path.join(path, "*.py")):
name = os.path.splitext(os.path.basename(fil))[0]
hass.services.register(DOMAIN, name, python_script_service_handler)
hass.services.register(
DOMAIN,
name,
python_script_service_handler,
supports_response=SupportsResponse.OPTIONAL,
)
service_desc = {
CONF_NAME: services_dict.get(name, {}).get("name", name),
@ -137,17 +147,17 @@ def discover_scripts(hass):
@bind_hass
def execute_script(hass, name, data=None):
def execute_script(hass, name, data=None, return_response=False):
"""Execute a script."""
filename = f"{name}.py"
raise_if_invalid_filename(filename)
with open(hass.config.path(FOLDER, filename), encoding="utf8") as fil:
source = fil.read()
execute(hass, filename, source, data)
return execute(hass, filename, source, data, return_response=return_response)
@bind_hass
def execute(hass, filename, source, data=None):
def execute(hass, filename, source, data=None, return_response=False):
"""Execute Python source."""
compiled = compile_restricted_exec(source, filename=filename)
@ -216,16 +226,39 @@ def execute(hass, filename, source, data=None):
"hass": hass,
"data": data or {},
"logger": logger,
"output": {},
}
try:
_LOGGER.info("Executing %s: %s", filename, data)
# pylint: disable-next=exec-used
exec(compiled.code, restricted_globals) # noqa: S102
_LOGGER.debug(
"Output of python_script: `%s`:\n%s",
filename,
restricted_globals["output"],
)
# Ensure that we're always returning a dictionary
if not isinstance(restricted_globals["output"], dict):
output_type = type(restricted_globals["output"])
restricted_globals["output"] = {}
raise ScriptError(
f"Expected `output` to be a dictionary, was {output_type}"
)
except ScriptError as err:
if return_response:
raise ServiceValidationError(f"Error executing script: {err}") from err
logger.error("Error executing script: %s", err)
return None
except Exception as err: # pylint: disable=broad-except
if return_response:
raise HomeAssistantError(
f"Error executing script ({type(err).__name__}): {err}"
) from err
logger.exception("Error executing script: %s", err)
return None
return restricted_globals["output"]
class StubPrinter: