Clean up mobile app webhooks (#30123)

This commit is contained in:
Paulus Schoutsen 2019-12-21 22:45:06 +01:00 committed by GitHub
parent fb3bb8220b
commit 834929a14e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 356 additions and 341 deletions

View file

@ -20,6 +20,9 @@ class RequestDataValidator:
def __init__(self, schema, allow_empty=False): def __init__(self, schema, allow_empty=False):
"""Initialize the decorator.""" """Initialize the decorator."""
if isinstance(schema, dict):
schema = vol.Schema(schema)
self._schema = schema self._schema = schema
self._allow_empty = allow_empty self._allow_empty = allow_empty

View file

@ -1,19 +1,4 @@
"""Constants for mobile_app.""" """Constants for mobile_app."""
import voluptuous as vol
from homeassistant.components.binary_sensor import (
DEVICE_CLASSES as BINARY_SENSOR_CLASSES,
)
from homeassistant.components.device_tracker import (
ATTR_BATTERY,
ATTR_GPS,
ATTR_GPS_ACCURACY,
ATTR_LOCATION_NAME,
)
from homeassistant.components.sensor import DEVICE_CLASSES as SENSOR_CLASSES
from homeassistant.const import ATTR_DOMAIN, ATTR_SERVICE, ATTR_SERVICE_DATA
from homeassistant.helpers import config_validation as cv
DOMAIN = "mobile_app" DOMAIN = "mobile_app"
STORAGE_KEY = DOMAIN STORAGE_KEY = DOMAIN
@ -71,100 +56,6 @@ ERR_ENCRYPTION_REQUIRED = "encryption_required"
ERR_SENSOR_NOT_REGISTERED = "not_registered" ERR_SENSOR_NOT_REGISTERED = "not_registered"
ERR_SENSOR_DUPLICATE_UNIQUE_ID = "duplicate_unique_id" ERR_SENSOR_DUPLICATE_UNIQUE_ID = "duplicate_unique_id"
WEBHOOK_TYPE_CALL_SERVICE = "call_service"
WEBHOOK_TYPE_FIRE_EVENT = "fire_event"
WEBHOOK_TYPE_GET_CONFIG = "get_config"
WEBHOOK_TYPE_GET_ZONES = "get_zones"
WEBHOOK_TYPE_REGISTER_SENSOR = "register_sensor"
WEBHOOK_TYPE_RENDER_TEMPLATE = "render_template"
WEBHOOK_TYPE_UPDATE_LOCATION = "update_location"
WEBHOOK_TYPE_UPDATE_REGISTRATION = "update_registration"
WEBHOOK_TYPE_UPDATE_SENSOR_STATES = "update_sensor_states"
WEBHOOK_TYPES = [
WEBHOOK_TYPE_CALL_SERVICE,
WEBHOOK_TYPE_FIRE_EVENT,
WEBHOOK_TYPE_GET_CONFIG,
WEBHOOK_TYPE_GET_ZONES,
WEBHOOK_TYPE_REGISTER_SENSOR,
WEBHOOK_TYPE_RENDER_TEMPLATE,
WEBHOOK_TYPE_UPDATE_LOCATION,
WEBHOOK_TYPE_UPDATE_REGISTRATION,
WEBHOOK_TYPE_UPDATE_SENSOR_STATES,
]
REGISTRATION_SCHEMA = vol.Schema(
{
vol.Optional(ATTR_APP_DATA, default={}): dict,
vol.Required(ATTR_APP_ID): cv.string,
vol.Required(ATTR_APP_NAME): cv.string,
vol.Required(ATTR_APP_VERSION): cv.string,
vol.Required(ATTR_DEVICE_NAME): cv.string,
vol.Required(ATTR_MANUFACTURER): cv.string,
vol.Required(ATTR_MODEL): cv.string,
vol.Required(ATTR_OS_NAME): cv.string,
vol.Optional(ATTR_OS_VERSION): cv.string,
vol.Required(ATTR_SUPPORTS_ENCRYPTION, default=False): cv.boolean,
}
)
UPDATE_REGISTRATION_SCHEMA = vol.Schema(
{
vol.Optional(ATTR_APP_DATA, default={}): dict,
vol.Required(ATTR_APP_VERSION): cv.string,
vol.Required(ATTR_DEVICE_NAME): cv.string,
vol.Required(ATTR_MANUFACTURER): cv.string,
vol.Required(ATTR_MODEL): cv.string,
vol.Optional(ATTR_OS_VERSION): cv.string,
}
)
WEBHOOK_PAYLOAD_SCHEMA = vol.Schema(
{
vol.Required(ATTR_WEBHOOK_TYPE): cv.string, # vol.In(WEBHOOK_TYPES)
vol.Required(ATTR_WEBHOOK_DATA, default={}): vol.Any(dict, list),
vol.Optional(ATTR_WEBHOOK_ENCRYPTED, default=False): cv.boolean,
vol.Optional(ATTR_WEBHOOK_ENCRYPTED_DATA): cv.string,
}
)
CALL_SERVICE_SCHEMA = vol.Schema(
{
vol.Required(ATTR_DOMAIN): cv.string,
vol.Required(ATTR_SERVICE): cv.string,
vol.Optional(ATTR_SERVICE_DATA, default={}): dict,
}
)
FIRE_EVENT_SCHEMA = vol.Schema(
{
vol.Required(ATTR_EVENT_TYPE): cv.string,
vol.Optional(ATTR_EVENT_DATA, default={}): dict,
}
)
RENDER_TEMPLATE_SCHEMA = vol.Schema(
{
str: {
vol.Required(ATTR_TEMPLATE): cv.template,
vol.Optional(ATTR_TEMPLATE_VARIABLES, default={}): dict,
}
}
)
UPDATE_LOCATION_SCHEMA = vol.Schema(
{
vol.Optional(ATTR_LOCATION_NAME): cv.string,
vol.Required(ATTR_GPS): cv.gps,
vol.Required(ATTR_GPS_ACCURACY): cv.positive_int,
vol.Optional(ATTR_BATTERY): cv.positive_int,
vol.Optional(ATTR_SPEED): cv.positive_int,
vol.Optional(ATTR_ALTITUDE): vol.Coerce(float),
vol.Optional(ATTR_COURSE): cv.positive_int,
vol.Optional(ATTR_VERTICAL_ACCURACY): cv.positive_int,
}
)
ATTR_SENSOR_ATTRIBUTES = "attributes" ATTR_SENSOR_ATTRIBUTES = "attributes"
ATTR_SENSOR_DEVICE_CLASS = "device_class" ATTR_SENSOR_DEVICE_CLASS = "device_class"
@ -177,49 +68,5 @@ ATTR_SENSOR_TYPE_SENSOR = "sensor"
ATTR_SENSOR_UNIQUE_ID = "unique_id" ATTR_SENSOR_UNIQUE_ID = "unique_id"
ATTR_SENSOR_UOM = "unit_of_measurement" ATTR_SENSOR_UOM = "unit_of_measurement"
SENSOR_TYPES = [ATTR_SENSOR_TYPE_BINARY_SENSOR, ATTR_SENSOR_TYPE_SENSOR]
COMBINED_CLASSES = sorted(set(BINARY_SENSOR_CLASSES + SENSOR_CLASSES))
SIGNAL_SENSOR_UPDATE = DOMAIN + "_sensor_update" SIGNAL_SENSOR_UPDATE = DOMAIN + "_sensor_update"
SIGNAL_LOCATION_UPDATE = DOMAIN + "_location_update_{}" SIGNAL_LOCATION_UPDATE = DOMAIN + "_location_update_{}"
REGISTER_SENSOR_SCHEMA = vol.Schema(
{
vol.Optional(ATTR_SENSOR_ATTRIBUTES, default={}): dict,
vol.Optional(ATTR_SENSOR_DEVICE_CLASS): vol.All(
vol.Lower, vol.In(COMBINED_CLASSES)
),
vol.Required(ATTR_SENSOR_NAME): cv.string,
vol.Required(ATTR_SENSOR_TYPE): vol.In(SENSOR_TYPES),
vol.Required(ATTR_SENSOR_UNIQUE_ID): cv.string,
vol.Optional(ATTR_SENSOR_UOM): cv.string,
vol.Required(ATTR_SENSOR_STATE): vol.Any(bool, str, int, float),
vol.Optional(ATTR_SENSOR_ICON, default="mdi:cellphone"): cv.icon,
}
)
UPDATE_SENSOR_STATE_SCHEMA = vol.All(
cv.ensure_list,
[
vol.Schema(
{
vol.Optional(ATTR_SENSOR_ATTRIBUTES, default={}): dict,
vol.Optional(ATTR_SENSOR_ICON, default="mdi:cellphone"): cv.icon,
vol.Required(ATTR_SENSOR_STATE): vol.Any(bool, str, int, float),
vol.Required(ATTR_SENSOR_TYPE): vol.In(SENSOR_TYPES),
vol.Required(ATTR_SENSOR_UNIQUE_ID): cv.string,
}
)
],
)
WEBHOOK_SCHEMAS = {
WEBHOOK_TYPE_CALL_SERVICE: CALL_SERVICE_SCHEMA,
WEBHOOK_TYPE_FIRE_EVENT: FIRE_EVENT_SCHEMA,
WEBHOOK_TYPE_REGISTER_SENSOR: REGISTER_SENSOR_SCHEMA,
WEBHOOK_TYPE_RENDER_TEMPLATE: RENDER_TEMPLATE_SCHEMA,
WEBHOOK_TYPE_UPDATE_LOCATION: UPDATE_LOCATION_SCHEMA,
WEBHOOK_TYPE_UPDATE_REGISTRATION: UPDATE_REGISTRATION_SCHEMA,
WEBHOOK_TYPE_UPDATE_SENSOR_STATES: UPDATE_SENSOR_STATE_SCHEMA,
}

View file

@ -1,6 +1,12 @@
"""Device tracker platform that adds support for OwnTracks over MQTT.""" """Device tracker platform that adds support for OwnTracks over MQTT."""
import logging import logging
from homeassistant.components.device_tracker import (
ATTR_BATTERY,
ATTR_GPS,
ATTR_GPS_ACCURACY,
ATTR_LOCATION_NAME,
)
from homeassistant.components.device_tracker.config_entry import TrackerEntity from homeassistant.components.device_tracker.config_entry import TrackerEntity
from homeassistant.components.device_tracker.const import SOURCE_TYPE_GPS from homeassistant.components.device_tracker.const import SOURCE_TYPE_GPS
from homeassistant.const import ATTR_BATTERY_LEVEL, ATTR_LATITUDE, ATTR_LONGITUDE from homeassistant.const import ATTR_BATTERY_LEVEL, ATTR_LATITUDE, ATTR_LONGITUDE
@ -9,13 +15,9 @@ from homeassistant.helpers.restore_state import RestoreEntity
from .const import ( from .const import (
ATTR_ALTITUDE, ATTR_ALTITUDE,
ATTR_BATTERY,
ATTR_COURSE, ATTR_COURSE,
ATTR_DEVICE_ID, ATTR_DEVICE_ID,
ATTR_DEVICE_NAME, ATTR_DEVICE_NAME,
ATTR_GPS,
ATTR_GPS_ACCURACY,
ATTR_LOCATION_NAME,
ATTR_SPEED, ATTR_SPEED,
ATTR_VERTICAL_ACCURACY, ATTR_VERTICAL_ACCURACY,
SIGNAL_LOCATION_UPDATE, SIGNAL_LOCATION_UPDATE,

View file

@ -5,20 +5,30 @@ import uuid
from aiohttp.web import Request, Response from aiohttp.web import Request, Response
from nacl.secret import SecretBox from nacl.secret import SecretBox
import voluptuous as vol
from homeassistant.components.http import HomeAssistantView from homeassistant.components.http import HomeAssistantView
from homeassistant.components.http.data_validator import RequestDataValidator from homeassistant.components.http.data_validator import RequestDataValidator
from homeassistant.const import CONF_WEBHOOK_ID, HTTP_CREATED from homeassistant.const import CONF_WEBHOOK_ID, HTTP_CREATED
from homeassistant.helpers import config_validation as cv
from .const import ( from .const import (
ATTR_APP_DATA,
ATTR_APP_ID,
ATTR_APP_NAME,
ATTR_APP_VERSION,
ATTR_DEVICE_ID, ATTR_DEVICE_ID,
ATTR_DEVICE_NAME,
ATTR_MANUFACTURER,
ATTR_MODEL,
ATTR_OS_NAME,
ATTR_OS_VERSION,
ATTR_SUPPORTS_ENCRYPTION, ATTR_SUPPORTS_ENCRYPTION,
CONF_CLOUDHOOK_URL, CONF_CLOUDHOOK_URL,
CONF_REMOTE_UI_URL, CONF_REMOTE_UI_URL,
CONF_SECRET, CONF_SECRET,
CONF_USER_ID, CONF_USER_ID,
DOMAIN, DOMAIN,
REGISTRATION_SCHEMA,
) )
from .helpers import supports_encryption from .helpers import supports_encryption
@ -29,7 +39,20 @@ class RegistrationsView(HomeAssistantView):
url = "/api/mobile_app/registrations" url = "/api/mobile_app/registrations"
name = "api:mobile_app:register" name = "api:mobile_app:register"
@RequestDataValidator(REGISTRATION_SCHEMA) @RequestDataValidator(
{
vol.Optional(ATTR_APP_DATA, default={}): dict,
vol.Required(ATTR_APP_ID): cv.string,
vol.Required(ATTR_APP_NAME): cv.string,
vol.Required(ATTR_APP_VERSION): cv.string,
vol.Required(ATTR_DEVICE_NAME): cv.string,
vol.Required(ATTR_MANUFACTURER): cv.string,
vol.Required(ATTR_MODEL): cv.string,
vol.Required(ATTR_OS_NAME): cv.string,
vol.Optional(ATTR_OS_VERSION): cv.string,
vol.Required(ATTR_SUPPORTS_ENCRYPTION, default=False): cv.boolean,
}
)
async def post(self, request: Request, data: Dict) -> Response: async def post(self, request: Request, data: Dict) -> Response:
"""Handle the POST request for registration.""" """Handle the POST request for registration."""
hass = request.app["hass"] hass = request.app["hass"]

View file

@ -1,10 +1,21 @@
"""Webhook handlers for mobile_app.""" """Webhook handlers for mobile_app."""
from functools import wraps
import logging import logging
from aiohttp.web import HTTPBadRequest, Request, Response from aiohttp.web import HTTPBadRequest, Request, Response
import voluptuous as vol import voluptuous as vol
from homeassistant.components.binary_sensor import (
DEVICE_CLASSES as BINARY_SENSOR_CLASSES,
)
from homeassistant.components.device_tracker import (
ATTR_BATTERY,
ATTR_GPS,
ATTR_GPS_ACCURACY,
ATTR_LOCATION_NAME,
)
from homeassistant.components.frontend import MANIFEST_JSON from homeassistant.components.frontend import MANIFEST_JSON
from homeassistant.components.sensor import DEVICE_CLASSES as SENSOR_CLASSES
from homeassistant.components.zone.const import DOMAIN as ZONE_DOMAIN from homeassistant.components.zone.const import DOMAIN as ZONE_DOMAIN
from homeassistant.const import ( from homeassistant.const import (
ATTR_DOMAIN, ATTR_DOMAIN,
@ -16,12 +27,17 @@ from homeassistant.const import (
) )
from homeassistant.core import EventOrigin from homeassistant.core import EventOrigin
from homeassistant.exceptions import HomeAssistantError, ServiceNotFound, TemplateError from homeassistant.exceptions import HomeAssistantError, ServiceNotFound, TemplateError
from homeassistant.helpers import device_registry as dr from homeassistant.helpers import config_validation as cv, device_registry as dr
from homeassistant.helpers.dispatcher import async_dispatcher_send from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.template import attach from homeassistant.helpers.template import attach
from homeassistant.helpers.typing import HomeAssistantType from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.util.decorator import Registry
from .const import ( from .const import (
ATTR_ALTITUDE,
ATTR_APP_DATA,
ATTR_APP_VERSION,
ATTR_COURSE,
ATTR_DEVICE_ID, ATTR_DEVICE_ID,
ATTR_DEVICE_NAME, ATTR_DEVICE_NAME,
ATTR_EVENT_DATA, ATTR_EVENT_DATA,
@ -29,11 +45,21 @@ from .const import (
ATTR_MANUFACTURER, ATTR_MANUFACTURER,
ATTR_MODEL, ATTR_MODEL,
ATTR_OS_VERSION, ATTR_OS_VERSION,
ATTR_SENSOR_ATTRIBUTES,
ATTR_SENSOR_DEVICE_CLASS,
ATTR_SENSOR_ICON,
ATTR_SENSOR_NAME,
ATTR_SENSOR_STATE,
ATTR_SENSOR_TYPE, ATTR_SENSOR_TYPE,
ATTR_SENSOR_TYPE_BINARY_SENSOR,
ATTR_SENSOR_TYPE_SENSOR,
ATTR_SENSOR_UNIQUE_ID, ATTR_SENSOR_UNIQUE_ID,
ATTR_SENSOR_UOM,
ATTR_SPEED,
ATTR_SUPPORTS_ENCRYPTION, ATTR_SUPPORTS_ENCRYPTION,
ATTR_TEMPLATE, ATTR_TEMPLATE,
ATTR_TEMPLATE_VARIABLES, ATTR_TEMPLATE_VARIABLES,
ATTR_VERTICAL_ACCURACY,
ATTR_WEBHOOK_DATA, ATTR_WEBHOOK_DATA,
ATTR_WEBHOOK_ENCRYPTED, ATTR_WEBHOOK_ENCRYPTED,
ATTR_WEBHOOK_ENCRYPTED_DATA, ATTR_WEBHOOK_ENCRYPTED_DATA,
@ -50,18 +76,6 @@ from .const import (
ERR_SENSOR_NOT_REGISTERED, ERR_SENSOR_NOT_REGISTERED,
SIGNAL_LOCATION_UPDATE, SIGNAL_LOCATION_UPDATE,
SIGNAL_SENSOR_UPDATE, SIGNAL_SENSOR_UPDATE,
WEBHOOK_PAYLOAD_SCHEMA,
WEBHOOK_SCHEMAS,
WEBHOOK_TYPE_CALL_SERVICE,
WEBHOOK_TYPE_FIRE_EVENT,
WEBHOOK_TYPE_GET_CONFIG,
WEBHOOK_TYPE_GET_ZONES,
WEBHOOK_TYPE_REGISTER_SENSOR,
WEBHOOK_TYPE_RENDER_TEMPLATE,
WEBHOOK_TYPE_UPDATE_LOCATION,
WEBHOOK_TYPE_UPDATE_REGISTRATION,
WEBHOOK_TYPE_UPDATE_SENSOR_STATES,
WEBHOOK_TYPES,
) )
from .helpers import ( from .helpers import (
_decrypt_payload, _decrypt_payload,
@ -76,6 +90,46 @@ from .helpers import (
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
WEBHOOK_COMMANDS = Registry()
COMBINED_CLASSES = set(BINARY_SENSOR_CLASSES + SENSOR_CLASSES)
SENSOR_TYPES = [ATTR_SENSOR_TYPE_BINARY_SENSOR, ATTR_SENSOR_TYPE_SENSOR]
WEBHOOK_PAYLOAD_SCHEMA = vol.Schema(
{
vol.Required(ATTR_WEBHOOK_TYPE): cv.string,
vol.Required(ATTR_WEBHOOK_DATA, default={}): vol.Any(dict, list),
vol.Optional(ATTR_WEBHOOK_ENCRYPTED, default=False): cv.boolean,
vol.Optional(ATTR_WEBHOOK_ENCRYPTED_DATA): cv.string,
}
)
def validate_schema(schema):
"""Decorate a webhook function with a schema."""
if isinstance(schema, dict):
schema = vol.Schema(schema)
def wrapper(func):
"""Wrap function so we validate schema."""
@wraps(func)
async def validate_and_run(hass, config_entry, data):
"""Validate input and call handler."""
try:
data = schema(data)
except vol.Invalid as ex:
err = vol.humanize.humanize_error(data, ex)
_LOGGER.error("Received invalid webhook payload: %s", err)
return empty_okay_response()
return await func(hass, config_entry, data)
return validate_and_run
return wrapper
async def handle_webhook( async def handle_webhook(
hass: HomeAssistantType, webhook_id: str, request: Request hass: HomeAssistantType, webhook_id: str, request: Request
) -> Response: ) -> Response:
@ -83,12 +137,8 @@ async def handle_webhook(
if webhook_id in hass.data[DOMAIN][DATA_DELETED_IDS]: if webhook_id in hass.data[DOMAIN][DATA_DELETED_IDS]:
return Response(status=410) return Response(status=410)
headers = {}
config_entry = hass.data[DOMAIN][DATA_CONFIG_ENTRIES][webhook_id] config_entry = hass.data[DOMAIN][DATA_CONFIG_ENTRIES][webhook_id]
registration = config_entry.data
try: try:
req_data = await request.json() req_data = await request.json()
except ValueError: except ValueError:
@ -97,11 +147,11 @@ async def handle_webhook(
if ( if (
ATTR_WEBHOOK_ENCRYPTED not in req_data ATTR_WEBHOOK_ENCRYPTED not in req_data
and registration[ATTR_SUPPORTS_ENCRYPTION] and config_entry.data[ATTR_SUPPORTS_ENCRYPTION]
): ):
_LOGGER.warning( _LOGGER.warning(
"Refusing to accept unencrypted webhook from %s", "Refusing to accept unencrypted webhook from %s",
registration[ATTR_DEVICE_NAME], config_entry.data[ATTR_DEVICE_NAME],
) )
return error_response(ERR_ENCRYPTION_REQUIRED, "Encryption required") return error_response(ERR_ENCRYPTION_REQUIRED, "Encryption required")
@ -118,197 +168,286 @@ async def handle_webhook(
if req_data[ATTR_WEBHOOK_ENCRYPTED]: if req_data[ATTR_WEBHOOK_ENCRYPTED]:
enc_data = req_data[ATTR_WEBHOOK_ENCRYPTED_DATA] enc_data = req_data[ATTR_WEBHOOK_ENCRYPTED_DATA]
webhook_payload = _decrypt_payload(registration[CONF_SECRET], enc_data) webhook_payload = _decrypt_payload(config_entry.data[CONF_SECRET], enc_data)
if webhook_type not in WEBHOOK_TYPES: if webhook_type not in WEBHOOK_COMMANDS:
_LOGGER.error("Received invalid webhook type: %s", webhook_type) _LOGGER.error("Received invalid webhook type: %s", webhook_type)
return empty_okay_response() return empty_okay_response()
data = webhook_payload _LOGGER.debug(
"Received webhook payload for type %s: %s", webhook_type, webhook_payload
)
_LOGGER.debug("Received webhook payload for type %s: %s", webhook_type, data) return await WEBHOOK_COMMANDS[webhook_type](hass, config_entry, webhook_payload)
if webhook_type in WEBHOOK_SCHEMAS:
@WEBHOOK_COMMANDS.register("call_service")
@validate_schema(
{
vol.Required(ATTR_DOMAIN): cv.string,
vol.Required(ATTR_SERVICE): cv.string,
vol.Optional(ATTR_SERVICE_DATA, default={}): dict,
}
)
async def webhook_call_service(hass, config_entry, data):
"""Handle a call service webhook."""
try:
await hass.services.async_call(
data[ATTR_DOMAIN],
data[ATTR_SERVICE],
data[ATTR_SERVICE_DATA],
blocking=True,
context=registration_context(config_entry.data),
)
except (vol.Invalid, ServiceNotFound, Exception) as ex:
_LOGGER.error(
"Error when calling service during mobile_app "
"webhook (device name: %s): %s",
config_entry.data[ATTR_DEVICE_NAME],
ex,
)
raise HTTPBadRequest()
return empty_okay_response()
@WEBHOOK_COMMANDS.register("fire_event")
@validate_schema(
{
vol.Required(ATTR_EVENT_TYPE): cv.string,
vol.Optional(ATTR_EVENT_DATA, default={}): dict,
}
)
async def webhook_fire_event(hass, config_entry, data):
"""Handle a fire event webhook."""
event_type = data[ATTR_EVENT_TYPE]
hass.bus.async_fire(
event_type,
data[ATTR_EVENT_DATA],
EventOrigin.remote,
context=registration_context(config_entry.data),
)
return empty_okay_response()
@WEBHOOK_COMMANDS.register("render_template")
@validate_schema(
{
str: {
vol.Required(ATTR_TEMPLATE): cv.template,
vol.Optional(ATTR_TEMPLATE_VARIABLES, default={}): dict,
}
}
)
async def webhook_render_template(hass, config_entry, data):
"""Handle a render template webhook."""
resp = {}
for key, item in data.items():
try: try:
data = WEBHOOK_SCHEMAS[webhook_type](webhook_payload) tpl = item[ATTR_TEMPLATE]
except vol.Invalid as ex: attach(hass, tpl)
err = vol.humanize.humanize_error(webhook_payload, ex) resp[key] = tpl.async_render(item.get(ATTR_TEMPLATE_VARIABLES))
_LOGGER.error("Received invalid webhook payload: %s", err) except TemplateError as ex:
return empty_okay_response(headers=headers) resp[key] = {"error": str(ex)}
context = registration_context(registration) return webhook_response(resp, registration=config_entry.data)
if webhook_type == WEBHOOK_TYPE_CALL_SERVICE:
try: @WEBHOOK_COMMANDS.register("update_location")
await hass.services.async_call( @validate_schema(
data[ATTR_DOMAIN], {
data[ATTR_SERVICE], vol.Optional(ATTR_LOCATION_NAME): cv.string,
data[ATTR_SERVICE_DATA], vol.Required(ATTR_GPS): cv.gps,
blocking=True, vol.Required(ATTR_GPS_ACCURACY): cv.positive_int,
context=context, vol.Optional(ATTR_BATTERY): cv.positive_int,
vol.Optional(ATTR_SPEED): cv.positive_int,
vol.Optional(ATTR_ALTITUDE): vol.Coerce(float),
vol.Optional(ATTR_COURSE): cv.positive_int,
vol.Optional(ATTR_VERTICAL_ACCURACY): cv.positive_int,
}
)
async def webhook_update_location(hass, config_entry, data):
"""Handle an update location webhook."""
hass.helpers.dispatcher.async_dispatcher_send(
SIGNAL_LOCATION_UPDATE.format(config_entry.entry_id), data
)
return empty_okay_response()
@WEBHOOK_COMMANDS.register("update_registration")
@validate_schema(
{
vol.Optional(ATTR_APP_DATA, default={}): dict,
vol.Required(ATTR_APP_VERSION): cv.string,
vol.Required(ATTR_DEVICE_NAME): cv.string,
vol.Required(ATTR_MANUFACTURER): cv.string,
vol.Required(ATTR_MODEL): cv.string,
vol.Optional(ATTR_OS_VERSION): cv.string,
}
)
async def webhook_update_registration(hass, config_entry, data):
"""Handle an update registration webhook."""
new_registration = {**config_entry.data, **data}
device_registry = await dr.async_get_registry(hass)
device_registry.async_get_or_create(
config_entry_id=config_entry.entry_id,
identifiers={(DOMAIN, config_entry.data[ATTR_DEVICE_ID])},
manufacturer=new_registration[ATTR_MANUFACTURER],
model=new_registration[ATTR_MODEL],
name=new_registration[ATTR_DEVICE_NAME],
sw_version=new_registration[ATTR_OS_VERSION],
)
hass.config_entries.async_update_entry(config_entry, data=new_registration)
return webhook_response(
safe_registration(new_registration), registration=new_registration,
)
@WEBHOOK_COMMANDS.register("register_sensor")
@validate_schema(
{
vol.Optional(ATTR_SENSOR_ATTRIBUTES, default={}): dict,
vol.Optional(ATTR_SENSOR_DEVICE_CLASS): vol.All(
vol.Lower, vol.In(COMBINED_CLASSES)
),
vol.Required(ATTR_SENSOR_NAME): cv.string,
vol.Required(ATTR_SENSOR_TYPE): vol.In(SENSOR_TYPES),
vol.Required(ATTR_SENSOR_UNIQUE_ID): cv.string,
vol.Optional(ATTR_SENSOR_UOM): cv.string,
vol.Required(ATTR_SENSOR_STATE): vol.Any(bool, str, int, float),
vol.Optional(ATTR_SENSOR_ICON, default="mdi:cellphone"): cv.icon,
}
)
async def webhook_register_sensor(hass, config_entry, data):
"""Handle a register sensor webhook."""
entity_type = data[ATTR_SENSOR_TYPE]
unique_id = data[ATTR_SENSOR_UNIQUE_ID]
unique_store_key = f"{config_entry.data[CONF_WEBHOOK_ID]}_{unique_id}"
if unique_store_key in hass.data[DOMAIN][entity_type]:
_LOGGER.error("Refusing to re-register existing sensor %s!", unique_id)
return error_response(
ERR_SENSOR_DUPLICATE_UNIQUE_ID,
f"{entity_type} {unique_id} already exists!",
status=409,
)
data[CONF_WEBHOOK_ID] = config_entry.data[CONF_WEBHOOK_ID]
hass.data[DOMAIN][entity_type][unique_store_key] = data
try:
await hass.data[DOMAIN][DATA_STORE].async_save(savable_state(hass))
except HomeAssistantError as ex:
_LOGGER.error("Error registering sensor: %s", ex)
return empty_okay_response()
register_signal = "{}_{}_register".format(DOMAIN, data[ATTR_SENSOR_TYPE])
async_dispatcher_send(hass, register_signal, data)
return webhook_response(
{"success": True}, registration=config_entry.data, status=HTTP_CREATED,
)
@WEBHOOK_COMMANDS.register("update_sensor_states")
@validate_schema(
vol.All(
cv.ensure_list,
[
vol.Schema(
{
vol.Optional(ATTR_SENSOR_ATTRIBUTES, default={}): dict,
vol.Optional(ATTR_SENSOR_ICON, default="mdi:cellphone"): cv.icon,
vol.Required(ATTR_SENSOR_STATE): vol.Any(bool, str, int, float),
vol.Required(ATTR_SENSOR_TYPE): vol.In(SENSOR_TYPES),
vol.Required(ATTR_SENSOR_UNIQUE_ID): cv.string,
}
) )
except (vol.Invalid, ServiceNotFound, Exception) as ex: ],
)
)
async def webhook_update_sensor_states(hass, config_entry, data):
"""Handle an update sensor states webhook."""
resp = {}
for sensor in data:
entity_type = sensor[ATTR_SENSOR_TYPE]
unique_id = sensor[ATTR_SENSOR_UNIQUE_ID]
unique_store_key = f"{config_entry.data[CONF_WEBHOOK_ID]}_{unique_id}"
if unique_store_key not in hass.data[DOMAIN][entity_type]:
_LOGGER.error( _LOGGER.error(
"Error when calling service during mobile_app " "Refusing to update non-registered sensor: %s", unique_store_key
"webhook (device name: %s): %s",
registration[ATTR_DEVICE_NAME],
ex,
) )
raise HTTPBadRequest() err_msg = f"{entity_type} {unique_id} is not registered"
resp[unique_id] = {
"success": False,
"error": {"code": ERR_SENSOR_NOT_REGISTERED, "message": err_msg},
}
continue
return empty_okay_response(headers=headers) entry = hass.data[DOMAIN][entity_type][unique_store_key]
if webhook_type == WEBHOOK_TYPE_FIRE_EVENT: new_state = {**entry, **sensor}
event_type = data[ATTR_EVENT_TYPE]
hass.bus.async_fire(
event_type, data[ATTR_EVENT_DATA], EventOrigin.remote, context=context
)
return empty_okay_response(headers=headers)
if webhook_type == WEBHOOK_TYPE_RENDER_TEMPLATE: hass.data[DOMAIN][entity_type][unique_store_key] = new_state
resp = {}
for key, item in data.items():
try:
tpl = item[ATTR_TEMPLATE]
attach(hass, tpl)
resp[key] = tpl.async_render(item.get(ATTR_TEMPLATE_VARIABLES))
except TemplateError as ex:
resp[key] = {"error": str(ex)}
return webhook_response(resp, registration=registration, headers=headers) safe = savable_state(hass)
if webhook_type == WEBHOOK_TYPE_UPDATE_LOCATION:
hass.helpers.dispatcher.async_dispatcher_send(
SIGNAL_LOCATION_UPDATE.format(config_entry.entry_id), data
)
return empty_okay_response(headers=headers)
if webhook_type == WEBHOOK_TYPE_UPDATE_REGISTRATION:
new_registration = {**registration, **data}
device_registry = await dr.async_get_registry(hass)
device_registry.async_get_or_create(
config_entry_id=config_entry.entry_id,
identifiers={(DOMAIN, registration[ATTR_DEVICE_ID])},
manufacturer=new_registration[ATTR_MANUFACTURER],
model=new_registration[ATTR_MODEL],
name=new_registration[ATTR_DEVICE_NAME],
sw_version=new_registration[ATTR_OS_VERSION],
)
hass.config_entries.async_update_entry(config_entry, data=new_registration)
return webhook_response(
safe_registration(new_registration),
registration=registration,
headers=headers,
)
if webhook_type == WEBHOOK_TYPE_REGISTER_SENSOR:
entity_type = data[ATTR_SENSOR_TYPE]
unique_id = data[ATTR_SENSOR_UNIQUE_ID]
unique_store_key = f"{webhook_id}_{unique_id}"
if unique_store_key in hass.data[DOMAIN][entity_type]:
_LOGGER.error("Refusing to re-register existing sensor %s!", unique_id)
return error_response(
ERR_SENSOR_DUPLICATE_UNIQUE_ID,
f"{entity_type} {unique_id} already exists!",
status=409,
)
data[CONF_WEBHOOK_ID] = webhook_id
hass.data[DOMAIN][entity_type][unique_store_key] = data
try: try:
await hass.data[DOMAIN][DATA_STORE].async_save(savable_state(hass)) await hass.data[DOMAIN][DATA_STORE].async_save(safe)
except HomeAssistantError as ex: except HomeAssistantError as ex:
_LOGGER.error("Error registering sensor: %s", ex) _LOGGER.error("Error updating mobile_app registration: %s", ex)
return empty_okay_response() return empty_okay_response()
register_signal = "{}_{}_register".format(DOMAIN, data[ATTR_SENSOR_TYPE]) async_dispatcher_send(hass, SIGNAL_SENSOR_UPDATE, new_state)
async_dispatcher_send(hass, register_signal, data)
return webhook_response( resp[unique_id] = {"success": True}
{"success": True},
registration=registration,
status=HTTP_CREATED,
headers=headers,
)
if webhook_type == WEBHOOK_TYPE_UPDATE_SENSOR_STATES: return webhook_response(resp, registration=config_entry.data)
resp = {}
for sensor in data:
entity_type = sensor[ATTR_SENSOR_TYPE]
unique_id = sensor[ATTR_SENSOR_UNIQUE_ID]
unique_store_key = f"{webhook_id}_{unique_id}" @WEBHOOK_COMMANDS.register("get_zones")
async def webhook_get_zones(hass, config_entry, data):
"""Handle a get zones webhook."""
zones = [
hass.states.get(entity_id)
for entity_id in sorted(hass.states.async_entity_ids(ZONE_DOMAIN))
]
return webhook_response(zones, registration=config_entry.data)
if unique_store_key not in hass.data[DOMAIN][entity_type]:
_LOGGER.error(
"Refusing to update non-registered sensor: %s", unique_store_key
)
err_msg = f"{entity_type} {unique_id} is not registered"
resp[unique_id] = {
"success": False,
"error": {"code": ERR_SENSOR_NOT_REGISTERED, "message": err_msg},
}
continue
entry = hass.data[DOMAIN][entity_type][unique_store_key] @WEBHOOK_COMMANDS.register("get_config")
async def webhook_get_config(hass, config_entry, data):
"""Handle a get config webhook."""
hass_config = hass.config.as_dict()
new_state = {**entry, **sensor} resp = {
"latitude": hass_config["latitude"],
"longitude": hass_config["longitude"],
"elevation": hass_config["elevation"],
"unit_system": hass_config["unit_system"],
"location_name": hass_config["location_name"],
"time_zone": hass_config["time_zone"],
"components": hass_config["components"],
"version": hass_config["version"],
"theme_color": MANIFEST_JSON["theme_color"],
}
hass.data[DOMAIN][entity_type][unique_store_key] = new_state if CONF_CLOUDHOOK_URL in config_entry.data:
resp[CONF_CLOUDHOOK_URL] = config_entry.data[CONF_CLOUDHOOK_URL]
safe = savable_state(hass) try:
resp[CONF_REMOTE_UI_URL] = hass.components.cloud.async_remote_ui_url()
except hass.components.cloud.CloudNotAvailable:
pass
try: return webhook_response(resp, registration=config_entry.data)
await hass.data[DOMAIN][DATA_STORE].async_save(safe)
except HomeAssistantError as ex:
_LOGGER.error("Error updating mobile_app registration: %s", ex)
return empty_okay_response()
async_dispatcher_send(hass, SIGNAL_SENSOR_UPDATE, new_state)
resp[unique_id] = {"success": True}
return webhook_response(resp, registration=registration, headers=headers)
if webhook_type == WEBHOOK_TYPE_GET_ZONES:
zones = (
hass.states.get(entity_id)
for entity_id in sorted(hass.states.async_entity_ids(ZONE_DOMAIN))
)
return webhook_response(list(zones), registration=registration, headers=headers)
if webhook_type == WEBHOOK_TYPE_GET_CONFIG:
hass_config = hass.config.as_dict()
resp = {
"latitude": hass_config["latitude"],
"longitude": hass_config["longitude"],
"elevation": hass_config["elevation"],
"unit_system": hass_config["unit_system"],
"location_name": hass_config["location_name"],
"time_zone": hass_config["time_zone"],
"components": hass_config["components"],
"version": hass_config["version"],
"theme_color": MANIFEST_JSON["theme_color"],
}
if CONF_CLOUDHOOK_URL in registration:
resp[CONF_CLOUDHOOK_URL] = registration[CONF_CLOUDHOOK_URL]
try:
resp[CONF_REMOTE_UI_URL] = hass.components.cloud.async_remote_ui_url()
except hass.components.cloud.CloudNotAvailable:
pass
return webhook_response(resp, registration=registration, headers=headers)

View file

@ -130,6 +130,7 @@ IGNORE_VIOLATIONS = [
"prometheus", "prometheus",
"conversation", "conversation",
"logbook", "logbook",
"mobile_app",
# These should be extracted to external package # These should be extracted to external package
"pvoutput", "pvoutput",
"dwd_weather_warnings", "dwd_weather_warnings",