mobile_app improvements (#21607)
* First webhook commands for getting and deleting single registrations * Keep a list of deleted webhook IDs so we can 410 if the webhook receives traffic in the future * Return a empty JSON object instead of None * Split up mobile_app bits into individual files * Add typing * Sort keys * Remove unused async_setup_entry * New decorator method of registering webhooks * Add tests for cloud hook forwarding and improve error handling for cloud hooks * Initial implementation of platform specific logic * Add get registrations by user ID websocket call, minor style fixes * Stop using resp dictionary during registration * Move mobile_app/ios.py to ios/mobile_app.py * Log any errors encountered during webhook * Improve update registration call * Split up mobile_app tests to match split up component * Fix tests * Remove integration_map in favor of component name in registration * Add a few helper functions for custom logic components to use * Load the app_component platform at device registration or component setup time * Remove extraneous function * Use guard function for checking if component is in device * Inline websocket schemas * Rename ATTR_s used in storage to DATA_ prefix * squash flake8 and pylint issues * Remove ios.mobile_app platform * Dont mark websocket_api as a dependency * Return standard empty_okay_response with 400 if no JSON sent * Ensure deleted webhook IDs are registered at launch * Remove the creation of cloudhooks during handle_webhook * Rename device to registration everywhere applicable * Dont check if cloud is logged in, just check if cloud is in components * Dont ever use cloudhook_id * Remove component loading logic for a later PR * Cast exception to string * Remove unused functions
This commit is contained in:
parent
49eaa34e03
commit
9ab0753cf7
12 changed files with 1022 additions and 613 deletions
|
@ -1,355 +1,37 @@
|
|||
"""Support for native mobile apps."""
|
||||
import logging
|
||||
import json
|
||||
from functools import partial
|
||||
"""Integrates Native Apps to Home Assistant."""
|
||||
from homeassistant.helpers.typing import ConfigType, HomeAssistantType
|
||||
|
||||
import voluptuous as vol
|
||||
from aiohttp.web import json_response, Response
|
||||
from aiohttp.web_exceptions import HTTPBadRequest
|
||||
from .const import (DATA_DELETED_IDS, DATA_REGISTRATIONS, DATA_STORE, DOMAIN,
|
||||
STORAGE_KEY, STORAGE_VERSION)
|
||||
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.auth.util import generate_secret
|
||||
import homeassistant.core as ha
|
||||
from homeassistant.core import Context
|
||||
from homeassistant.components import webhook
|
||||
from homeassistant.components.http import HomeAssistantView
|
||||
from homeassistant.components.http.data_validator import RequestDataValidator
|
||||
from homeassistant.components.device_tracker import (
|
||||
DOMAIN as DEVICE_TRACKER_DOMAIN, SERVICE_SEE as DEVICE_TRACKER_SEE,
|
||||
SERVICE_SEE_PAYLOAD_SCHEMA as SEE_SCHEMA)
|
||||
from homeassistant.const import (ATTR_DOMAIN, ATTR_SERVICE, ATTR_SERVICE_DATA,
|
||||
HTTP_BAD_REQUEST, HTTP_CREATED,
|
||||
HTTP_INTERNAL_SERVER_ERROR, CONF_WEBHOOK_ID)
|
||||
from homeassistant.exceptions import (HomeAssistantError, ServiceNotFound,
|
||||
TemplateError)
|
||||
from homeassistant.helpers import config_validation as cv, template
|
||||
from homeassistant.helpers.typing import HomeAssistantType
|
||||
|
||||
REQUIREMENTS = ['PyNaCl==1.3.0']
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
DOMAIN = 'mobile_app'
|
||||
from .http_api import register_http_handlers
|
||||
from .webhook import register_deleted_webhooks, setup_registration
|
||||
from .websocket_api import register_websocket_handlers
|
||||
|
||||
DEPENDENCIES = ['device_tracker', 'http', 'webhook']
|
||||
|
||||
STORAGE_KEY = DOMAIN
|
||||
STORAGE_VERSION = 1
|
||||
|
||||
CONF_SECRET = 'secret'
|
||||
CONF_USER_ID = 'user_id'
|
||||
|
||||
ATTR_APP_DATA = 'app_data'
|
||||
ATTR_APP_ID = 'app_id'
|
||||
ATTR_APP_NAME = 'app_name'
|
||||
ATTR_APP_VERSION = 'app_version'
|
||||
ATTR_DEVICE_NAME = 'device_name'
|
||||
ATTR_MANUFACTURER = 'manufacturer'
|
||||
ATTR_MODEL = 'model'
|
||||
ATTR_OS_VERSION = 'os_version'
|
||||
ATTR_SUPPORTS_ENCRYPTION = 'supports_encryption'
|
||||
|
||||
ATTR_EVENT_DATA = 'event_data'
|
||||
ATTR_EVENT_TYPE = 'event_type'
|
||||
|
||||
ATTR_TEMPLATE = 'template'
|
||||
ATTR_TEMPLATE_VARIABLES = 'variables'
|
||||
|
||||
ATTR_WEBHOOK_DATA = 'data'
|
||||
ATTR_WEBHOOK_ENCRYPTED = 'encrypted'
|
||||
ATTR_WEBHOOK_ENCRYPTED_DATA = 'encrypted_data'
|
||||
ATTR_WEBHOOK_TYPE = 'type'
|
||||
|
||||
WEBHOOK_TYPE_CALL_SERVICE = 'call_service'
|
||||
WEBHOOK_TYPE_FIRE_EVENT = 'fire_event'
|
||||
WEBHOOK_TYPE_RENDER_TEMPLATE = 'render_template'
|
||||
WEBHOOK_TYPE_UPDATE_LOCATION = 'update_location'
|
||||
WEBHOOK_TYPE_UPDATE_REGISTRATION = 'update_registration'
|
||||
|
||||
WEBHOOK_TYPES = [WEBHOOK_TYPE_CALL_SERVICE, WEBHOOK_TYPE_FIRE_EVENT,
|
||||
WEBHOOK_TYPE_RENDER_TEMPLATE, WEBHOOK_TYPE_UPDATE_LOCATION,
|
||||
WEBHOOK_TYPE_UPDATE_REGISTRATION]
|
||||
|
||||
REGISTER_DEVICE_SCHEMA = vol.Schema({
|
||||
vol.Optional(ATTR_APP_DATA, default={}): dict,
|
||||
vol.Required(ATTR_APP_ID): cv.string,
|
||||
vol.Optional(ATTR_APP_NAME): cv.string,
|
||||
vol.Required(ATTR_APP_VERSION): cv.string,
|
||||
vol.Required(ATTR_DEVICE_NAME): cv.string,
|
||||
vol.Required(ATTR_MANUFACTURER): cv.string,
|
||||
vol.Required(ATTR_MODEL): cv.string,
|
||||
vol.Optional(ATTR_OS_VERSION): cv.string,
|
||||
vol.Required(ATTR_SUPPORTS_ENCRYPTION, default=False): cv.boolean,
|
||||
})
|
||||
|
||||
UPDATE_DEVICE_SCHEMA = vol.Schema({
|
||||
vol.Optional(ATTR_APP_DATA, default={}): dict,
|
||||
vol.Required(ATTR_APP_VERSION): cv.string,
|
||||
vol.Required(ATTR_DEVICE_NAME): cv.string,
|
||||
vol.Required(ATTR_MANUFACTURER): cv.string,
|
||||
vol.Required(ATTR_MODEL): cv.string,
|
||||
vol.Optional(ATTR_OS_VERSION): cv.string,
|
||||
})
|
||||
|
||||
WEBHOOK_PAYLOAD_SCHEMA = vol.Schema({
|
||||
vol.Required(ATTR_WEBHOOK_TYPE): vol.In(WEBHOOK_TYPES),
|
||||
vol.Required(ATTR_WEBHOOK_DATA, default={}): dict,
|
||||
vol.Optional(ATTR_WEBHOOK_ENCRYPTED, default=False): cv.boolean,
|
||||
vol.Optional(ATTR_WEBHOOK_ENCRYPTED_DATA): cv.string,
|
||||
})
|
||||
|
||||
CALL_SERVICE_SCHEMA = vol.Schema({
|
||||
vol.Required(ATTR_DOMAIN): cv.string,
|
||||
vol.Required(ATTR_SERVICE): cv.string,
|
||||
vol.Optional(ATTR_SERVICE_DATA, default={}): dict,
|
||||
})
|
||||
|
||||
FIRE_EVENT_SCHEMA = vol.Schema({
|
||||
vol.Required(ATTR_EVENT_TYPE): cv.string,
|
||||
vol.Optional(ATTR_EVENT_DATA, default={}): dict,
|
||||
})
|
||||
|
||||
RENDER_TEMPLATE_SCHEMA = vol.Schema({
|
||||
vol.Required(ATTR_TEMPLATE): cv.string,
|
||||
vol.Optional(ATTR_TEMPLATE_VARIABLES, default={}): dict,
|
||||
})
|
||||
|
||||
WEBHOOK_SCHEMAS = {
|
||||
WEBHOOK_TYPE_CALL_SERVICE: CALL_SERVICE_SCHEMA,
|
||||
WEBHOOK_TYPE_FIRE_EVENT: FIRE_EVENT_SCHEMA,
|
||||
WEBHOOK_TYPE_RENDER_TEMPLATE: RENDER_TEMPLATE_SCHEMA,
|
||||
WEBHOOK_TYPE_UPDATE_LOCATION: SEE_SCHEMA,
|
||||
WEBHOOK_TYPE_UPDATE_REGISTRATION: UPDATE_DEVICE_SCHEMA,
|
||||
}
|
||||
REQUIREMENTS = ['PyNaCl==1.3.0']
|
||||
|
||||
|
||||
def get_cipher():
|
||||
"""Return decryption function and length of key.
|
||||
|
||||
Async friendly.
|
||||
"""
|
||||
from nacl.secret import SecretBox
|
||||
from nacl.encoding import Base64Encoder
|
||||
|
||||
def decrypt(ciphertext, key):
|
||||
"""Decrypt ciphertext using key."""
|
||||
return SecretBox(key).decrypt(ciphertext, encoder=Base64Encoder)
|
||||
return (SecretBox.KEY_SIZE, decrypt)
|
||||
|
||||
|
||||
def _decrypt_payload(key, ciphertext):
|
||||
"""Decrypt encrypted payload."""
|
||||
try:
|
||||
keylen, decrypt = get_cipher()
|
||||
except OSError:
|
||||
_LOGGER.warning(
|
||||
"Ignoring encrypted payload because libsodium not installed")
|
||||
return None
|
||||
|
||||
if key is None:
|
||||
_LOGGER.warning(
|
||||
"Ignoring encrypted payload because no decryption key known")
|
||||
return None
|
||||
|
||||
key = key.encode("utf-8")
|
||||
key = key[:keylen]
|
||||
key = key.ljust(keylen, b'\0')
|
||||
|
||||
try:
|
||||
message = decrypt(ciphertext, key)
|
||||
message = json.loads(message.decode("utf-8"))
|
||||
_LOGGER.debug("Successfully decrypted mobile_app payload")
|
||||
return message
|
||||
except ValueError:
|
||||
_LOGGER.warning("Ignoring encrypted payload because unable to decrypt")
|
||||
return None
|
||||
|
||||
|
||||
def context(device):
|
||||
"""Generate a context from a request."""
|
||||
return Context(user_id=device[CONF_USER_ID])
|
||||
|
||||
|
||||
async def handle_webhook(store, hass: HomeAssistantType, webhook_id: str,
|
||||
request):
|
||||
"""Handle webhook callback."""
|
||||
device = hass.data[DOMAIN][webhook_id]
|
||||
|
||||
try:
|
||||
req_data = await request.json()
|
||||
except ValueError:
|
||||
_LOGGER.warning('Received invalid JSON from mobile_app')
|
||||
return json_response([], status=HTTP_BAD_REQUEST)
|
||||
|
||||
try:
|
||||
req_data = WEBHOOK_PAYLOAD_SCHEMA(req_data)
|
||||
except vol.Invalid as ex:
|
||||
err = vol.humanize.humanize_error(req_data, ex)
|
||||
_LOGGER.error('Received invalid webhook payload: %s', err)
|
||||
return Response(status=200)
|
||||
|
||||
webhook_type = req_data[ATTR_WEBHOOK_TYPE]
|
||||
|
||||
webhook_payload = req_data.get(ATTR_WEBHOOK_DATA, {})
|
||||
|
||||
if req_data[ATTR_WEBHOOK_ENCRYPTED]:
|
||||
enc_data = req_data[ATTR_WEBHOOK_ENCRYPTED_DATA]
|
||||
webhook_payload = _decrypt_payload(device[CONF_SECRET], enc_data)
|
||||
|
||||
try:
|
||||
data = WEBHOOK_SCHEMAS[webhook_type](webhook_payload)
|
||||
except vol.Invalid as ex:
|
||||
err = vol.humanize.humanize_error(webhook_payload, ex)
|
||||
_LOGGER.error('Received invalid webhook payload: %s', err)
|
||||
return Response(status=200)
|
||||
|
||||
if webhook_type == WEBHOOK_TYPE_CALL_SERVICE:
|
||||
try:
|
||||
await hass.services.async_call(data[ATTR_DOMAIN],
|
||||
data[ATTR_SERVICE],
|
||||
data[ATTR_SERVICE_DATA],
|
||||
blocking=True,
|
||||
context=context(device))
|
||||
except (vol.Invalid, ServiceNotFound):
|
||||
raise HTTPBadRequest()
|
||||
|
||||
return Response(status=200)
|
||||
|
||||
if webhook_type == WEBHOOK_TYPE_FIRE_EVENT:
|
||||
event_type = data[ATTR_EVENT_TYPE]
|
||||
hass.bus.async_fire(event_type, data[ATTR_EVENT_DATA],
|
||||
ha.EventOrigin.remote, context=context(device))
|
||||
return Response(status=200)
|
||||
|
||||
if webhook_type == WEBHOOK_TYPE_RENDER_TEMPLATE:
|
||||
try:
|
||||
tpl = template.Template(data[ATTR_TEMPLATE], hass)
|
||||
rendered = tpl.async_render(data.get(ATTR_TEMPLATE_VARIABLES))
|
||||
return json_response({"rendered": rendered})
|
||||
except (ValueError, TemplateError) as ex:
|
||||
return json_response(({"error": ex}), status=HTTP_BAD_REQUEST)
|
||||
|
||||
if webhook_type == WEBHOOK_TYPE_UPDATE_LOCATION:
|
||||
await hass.services.async_call(DEVICE_TRACKER_DOMAIN,
|
||||
DEVICE_TRACKER_SEE, data,
|
||||
blocking=True, context=context(device))
|
||||
return Response(status=200)
|
||||
|
||||
if webhook_type == WEBHOOK_TYPE_UPDATE_REGISTRATION:
|
||||
data[ATTR_APP_ID] = device[ATTR_APP_ID]
|
||||
data[ATTR_APP_NAME] = device[ATTR_APP_NAME]
|
||||
data[ATTR_SUPPORTS_ENCRYPTION] = device[ATTR_SUPPORTS_ENCRYPTION]
|
||||
data[CONF_SECRET] = device[CONF_SECRET]
|
||||
data[CONF_USER_ID] = device[CONF_USER_ID]
|
||||
data[CONF_WEBHOOK_ID] = device[CONF_WEBHOOK_ID]
|
||||
|
||||
hass.data[DOMAIN][webhook_id] = data
|
||||
|
||||
try:
|
||||
await store.async_save(hass.data[DOMAIN])
|
||||
except HomeAssistantError as ex:
|
||||
_LOGGER.error("Error updating mobile_app registration: %s", ex)
|
||||
return Response(status=200)
|
||||
|
||||
return json_response(safe_device(data))
|
||||
|
||||
|
||||
def supports_encryption():
|
||||
"""Test if we support encryption."""
|
||||
try:
|
||||
import nacl # noqa pylint: disable=unused-import
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
|
||||
def safe_device(device: dict):
|
||||
"""Return a device without webhook_id or secret."""
|
||||
return {
|
||||
ATTR_APP_DATA: device[ATTR_APP_DATA],
|
||||
ATTR_APP_ID: device[ATTR_APP_ID],
|
||||
ATTR_APP_NAME: device[ATTR_APP_NAME],
|
||||
ATTR_APP_VERSION: device[ATTR_APP_VERSION],
|
||||
ATTR_DEVICE_NAME: device[ATTR_DEVICE_NAME],
|
||||
ATTR_MANUFACTURER: device[ATTR_MANUFACTURER],
|
||||
ATTR_MODEL: device[ATTR_MODEL],
|
||||
ATTR_OS_VERSION: device[ATTR_OS_VERSION],
|
||||
ATTR_SUPPORTS_ENCRYPTION: device[ATTR_SUPPORTS_ENCRYPTION],
|
||||
}
|
||||
|
||||
|
||||
def register_device_webhook(hass: HomeAssistantType, store, device):
|
||||
"""Register the webhook for a device."""
|
||||
device_name = 'Mobile App: {}'.format(device[ATTR_DEVICE_NAME])
|
||||
webhook_id = device[CONF_WEBHOOK_ID]
|
||||
webhook.async_register(hass, DOMAIN, device_name, webhook_id,
|
||||
partial(handle_webhook, store))
|
||||
|
||||
|
||||
async def async_setup(hass, config):
|
||||
async def async_setup(hass: HomeAssistantType, config: ConfigType):
|
||||
"""Set up the mobile app component."""
|
||||
conf = config.get(DOMAIN)
|
||||
|
||||
store = hass.helpers.storage.Store(STORAGE_VERSION, STORAGE_KEY)
|
||||
app_config = await store.async_load()
|
||||
if app_config is None:
|
||||
app_config = {}
|
||||
app_config = {DATA_DELETED_IDS: [], DATA_REGISTRATIONS: {}}
|
||||
|
||||
hass.data[DOMAIN] = app_config
|
||||
if hass.data.get(DOMAIN) is None:
|
||||
hass.data[DOMAIN] = {DATA_DELETED_IDS: [], DATA_REGISTRATIONS: {}}
|
||||
|
||||
for device in app_config.values():
|
||||
register_device_webhook(hass, store, device)
|
||||
hass.data[DOMAIN][DATA_DELETED_IDS] = app_config[DATA_DELETED_IDS]
|
||||
hass.data[DOMAIN][DATA_REGISTRATIONS] = app_config[DATA_REGISTRATIONS]
|
||||
hass.data[DOMAIN][DATA_STORE] = store
|
||||
|
||||
if conf is not None:
|
||||
hass.async_create_task(hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={'source': config_entries.SOURCE_IMPORT}))
|
||||
for registration in app_config[DATA_REGISTRATIONS].values():
|
||||
setup_registration(hass, store, registration)
|
||||
|
||||
hass.http.register_view(DevicesView(store))
|
||||
register_http_handlers(hass, store)
|
||||
register_websocket_handlers(hass)
|
||||
register_deleted_webhooks(hass, store)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
async def async_setup_entry(hass, entry):
|
||||
"""Set up an mobile_app entry."""
|
||||
return True
|
||||
|
||||
|
||||
class DevicesView(HomeAssistantView):
|
||||
"""A view that accepts device registration requests."""
|
||||
|
||||
url = '/api/mobile_app/devices'
|
||||
name = 'api:mobile_app:register-device'
|
||||
|
||||
def __init__(self, store):
|
||||
"""Initialize the view."""
|
||||
self._store = store
|
||||
|
||||
@RequestDataValidator(REGISTER_DEVICE_SCHEMA)
|
||||
async def post(self, request, data):
|
||||
"""Handle the POST request for device registration."""
|
||||
hass = request.app['hass']
|
||||
|
||||
resp = {}
|
||||
|
||||
webhook_id = generate_secret()
|
||||
|
||||
data[CONF_WEBHOOK_ID] = resp[CONF_WEBHOOK_ID] = webhook_id
|
||||
|
||||
if data[ATTR_SUPPORTS_ENCRYPTION] and supports_encryption():
|
||||
secret = generate_secret(16)
|
||||
|
||||
data[CONF_SECRET] = resp[CONF_SECRET] = secret
|
||||
|
||||
data[CONF_USER_ID] = request['hass_user'].id
|
||||
|
||||
hass.data[DOMAIN][webhook_id] = data
|
||||
|
||||
try:
|
||||
await self._store.async_save(hass.data[DOMAIN])
|
||||
except HomeAssistantError:
|
||||
return self.json_message("Error saving device.",
|
||||
HTTP_INTERNAL_SERVER_ERROR)
|
||||
|
||||
register_device_webhook(hass, self._store, data)
|
||||
|
||||
return self.json(resp, status_code=HTTP_CREATED)
|
||||
|
|
104
homeassistant/components/mobile_app/const.py
Normal file
104
homeassistant/components/mobile_app/const.py
Normal file
|
@ -0,0 +1,104 @@
|
|||
"""Constants for mobile_app."""
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.device_tracker import SERVICE_SEE_PAYLOAD_SCHEMA
|
||||
from homeassistant.const import (ATTR_DOMAIN, ATTR_SERVICE, ATTR_SERVICE_DATA)
|
||||
from homeassistant.helpers import config_validation as cv
|
||||
|
||||
DOMAIN = 'mobile_app'
|
||||
|
||||
STORAGE_KEY = DOMAIN
|
||||
STORAGE_VERSION = 1
|
||||
|
||||
CONF_CLOUDHOOK_URL = 'cloudhook_url'
|
||||
CONF_SECRET = 'secret'
|
||||
CONF_USER_ID = 'user_id'
|
||||
|
||||
DATA_DELETED_IDS = 'deleted_ids'
|
||||
DATA_REGISTRATIONS = 'registrations'
|
||||
DATA_STORE = 'store'
|
||||
|
||||
ATTR_APP_COMPONENT = 'app_component'
|
||||
ATTR_APP_DATA = 'app_data'
|
||||
ATTR_APP_ID = 'app_id'
|
||||
ATTR_APP_NAME = 'app_name'
|
||||
ATTR_APP_VERSION = 'app_version'
|
||||
ATTR_DEVICE_NAME = 'device_name'
|
||||
ATTR_MANUFACTURER = 'manufacturer'
|
||||
ATTR_MODEL = 'model'
|
||||
ATTR_OS_VERSION = 'os_version'
|
||||
ATTR_SUPPORTS_ENCRYPTION = 'supports_encryption'
|
||||
|
||||
ATTR_EVENT_DATA = 'event_data'
|
||||
ATTR_EVENT_TYPE = 'event_type'
|
||||
|
||||
ATTR_TEMPLATE = 'template'
|
||||
ATTR_TEMPLATE_VARIABLES = 'variables'
|
||||
|
||||
ATTR_WEBHOOK_DATA = 'data'
|
||||
ATTR_WEBHOOK_ENCRYPTED = 'encrypted'
|
||||
ATTR_WEBHOOK_ENCRYPTED_DATA = 'encrypted_data'
|
||||
ATTR_WEBHOOK_TYPE = 'type'
|
||||
|
||||
WEBHOOK_TYPE_CALL_SERVICE = 'call_service'
|
||||
WEBHOOK_TYPE_FIRE_EVENT = 'fire_event'
|
||||
WEBHOOK_TYPE_RENDER_TEMPLATE = 'render_template'
|
||||
WEBHOOK_TYPE_UPDATE_LOCATION = 'update_location'
|
||||
WEBHOOK_TYPE_UPDATE_REGISTRATION = 'update_registration'
|
||||
|
||||
WEBHOOK_TYPES = [WEBHOOK_TYPE_CALL_SERVICE, WEBHOOK_TYPE_FIRE_EVENT,
|
||||
WEBHOOK_TYPE_RENDER_TEMPLATE, WEBHOOK_TYPE_UPDATE_LOCATION,
|
||||
WEBHOOK_TYPE_UPDATE_REGISTRATION]
|
||||
|
||||
REGISTRATION_SCHEMA = vol.Schema({
|
||||
vol.Optional(ATTR_APP_COMPONENT): cv.string,
|
||||
vol.Optional(ATTR_APP_DATA, default={}): dict,
|
||||
vol.Required(ATTR_APP_ID): cv.string,
|
||||
vol.Optional(ATTR_APP_NAME): cv.string,
|
||||
vol.Required(ATTR_APP_VERSION): cv.string,
|
||||
vol.Required(ATTR_DEVICE_NAME): cv.string,
|
||||
vol.Required(ATTR_MANUFACTURER): cv.string,
|
||||
vol.Required(ATTR_MODEL): cv.string,
|
||||
vol.Optional(ATTR_OS_VERSION): cv.string,
|
||||
vol.Required(ATTR_SUPPORTS_ENCRYPTION, default=False): cv.boolean,
|
||||
})
|
||||
|
||||
UPDATE_REGISTRATION_SCHEMA = vol.Schema({
|
||||
vol.Optional(ATTR_APP_DATA, default={}): dict,
|
||||
vol.Required(ATTR_APP_VERSION): cv.string,
|
||||
vol.Required(ATTR_DEVICE_NAME): cv.string,
|
||||
vol.Required(ATTR_MANUFACTURER): cv.string,
|
||||
vol.Required(ATTR_MODEL): cv.string,
|
||||
vol.Optional(ATTR_OS_VERSION): cv.string,
|
||||
})
|
||||
|
||||
WEBHOOK_PAYLOAD_SCHEMA = vol.Schema({
|
||||
vol.Required(ATTR_WEBHOOK_TYPE): cv.string, # vol.In(WEBHOOK_TYPES)
|
||||
vol.Required(ATTR_WEBHOOK_DATA, default={}): dict,
|
||||
vol.Optional(ATTR_WEBHOOK_ENCRYPTED, default=False): cv.boolean,
|
||||
vol.Optional(ATTR_WEBHOOK_ENCRYPTED_DATA): cv.string,
|
||||
})
|
||||
|
||||
CALL_SERVICE_SCHEMA = vol.Schema({
|
||||
vol.Required(ATTR_DOMAIN): cv.string,
|
||||
vol.Required(ATTR_SERVICE): cv.string,
|
||||
vol.Optional(ATTR_SERVICE_DATA, default={}): dict,
|
||||
})
|
||||
|
||||
FIRE_EVENT_SCHEMA = vol.Schema({
|
||||
vol.Required(ATTR_EVENT_TYPE): cv.string,
|
||||
vol.Optional(ATTR_EVENT_DATA, default={}): dict,
|
||||
})
|
||||
|
||||
RENDER_TEMPLATE_SCHEMA = vol.Schema({
|
||||
vol.Required(ATTR_TEMPLATE): cv.string,
|
||||
vol.Optional(ATTR_TEMPLATE_VARIABLES, default={}): dict,
|
||||
})
|
||||
|
||||
WEBHOOK_SCHEMAS = {
|
||||
WEBHOOK_TYPE_CALL_SERVICE: CALL_SERVICE_SCHEMA,
|
||||
WEBHOOK_TYPE_FIRE_EVENT: FIRE_EVENT_SCHEMA,
|
||||
WEBHOOK_TYPE_RENDER_TEMPLATE: RENDER_TEMPLATE_SCHEMA,
|
||||
WEBHOOK_TYPE_UPDATE_LOCATION: SERVICE_SEE_PAYLOAD_SCHEMA,
|
||||
WEBHOOK_TYPE_UPDATE_REGISTRATION: UPDATE_REGISTRATION_SCHEMA,
|
||||
}
|
103
homeassistant/components/mobile_app/helpers.py
Normal file
103
homeassistant/components/mobile_app/helpers.py
Normal file
|
@ -0,0 +1,103 @@
|
|||
"""Helpers for mobile_app."""
|
||||
import logging
|
||||
import json
|
||||
from typing import Callable, Dict, Tuple
|
||||
|
||||
from aiohttp.web import Response
|
||||
|
||||
from homeassistant.core import Context
|
||||
from homeassistant.helpers.typing import HomeAssistantType
|
||||
|
||||
from .const import (ATTR_APP_DATA, ATTR_APP_ID, ATTR_APP_NAME,
|
||||
ATTR_APP_VERSION, DATA_DELETED_IDS, ATTR_DEVICE_NAME,
|
||||
ATTR_MANUFACTURER, ATTR_MODEL, ATTR_OS_VERSION,
|
||||
DATA_REGISTRATIONS, ATTR_SUPPORTS_ENCRYPTION,
|
||||
CONF_USER_ID, DOMAIN)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_cipher() -> Tuple[int, Callable]:
|
||||
"""Return decryption function and length of key.
|
||||
|
||||
Async friendly.
|
||||
"""
|
||||
from nacl.secret import SecretBox
|
||||
from nacl.encoding import Base64Encoder
|
||||
|
||||
def decrypt(ciphertext, key):
|
||||
"""Decrypt ciphertext using key."""
|
||||
return SecretBox(key).decrypt(ciphertext, encoder=Base64Encoder)
|
||||
return (SecretBox.KEY_SIZE, decrypt)
|
||||
|
||||
|
||||
def _decrypt_payload(key: str, ciphertext: str) -> Dict[str, str]:
|
||||
"""Decrypt encrypted payload."""
|
||||
try:
|
||||
keylen, decrypt = get_cipher()
|
||||
except OSError:
|
||||
_LOGGER.warning(
|
||||
"Ignoring encrypted payload because libsodium not installed")
|
||||
return None
|
||||
|
||||
if key is None:
|
||||
_LOGGER.warning(
|
||||
"Ignoring encrypted payload because no decryption key known")
|
||||
return None
|
||||
|
||||
key = key.encode("utf-8")
|
||||
key = key[:keylen]
|
||||
key = key.ljust(keylen, b'\0')
|
||||
|
||||
try:
|
||||
message = decrypt(ciphertext, key)
|
||||
message = json.loads(message.decode("utf-8"))
|
||||
_LOGGER.debug("Successfully decrypted mobile_app payload")
|
||||
return message
|
||||
except ValueError:
|
||||
_LOGGER.warning("Ignoring encrypted payload because unable to decrypt")
|
||||
return None
|
||||
|
||||
|
||||
def registration_context(registration: Dict) -> Context:
|
||||
"""Generate a context from a request."""
|
||||
return Context(user_id=registration[CONF_USER_ID])
|
||||
|
||||
|
||||
def empty_okay_response(headers: Dict = None, status: int = 200) -> Response:
|
||||
"""Return a Response with empty JSON object and a 200."""
|
||||
return Response(body='{}', status=status, content_type='application/json',
|
||||
headers=headers)
|
||||
|
||||
|
||||
def supports_encryption() -> bool:
|
||||
"""Test if we support encryption."""
|
||||
try:
|
||||
import nacl # noqa pylint: disable=unused-import
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
|
||||
def safe_registration(registration: Dict) -> Dict:
|
||||
"""Return a registration without sensitive values."""
|
||||
# Sensitive values: webhook_id, secret, cloudhook_url
|
||||
return {
|
||||
ATTR_APP_DATA: registration[ATTR_APP_DATA],
|
||||
ATTR_APP_ID: registration[ATTR_APP_ID],
|
||||
ATTR_APP_NAME: registration[ATTR_APP_NAME],
|
||||
ATTR_APP_VERSION: registration[ATTR_APP_VERSION],
|
||||
ATTR_DEVICE_NAME: registration[ATTR_DEVICE_NAME],
|
||||
ATTR_MANUFACTURER: registration[ATTR_MANUFACTURER],
|
||||
ATTR_MODEL: registration[ATTR_MODEL],
|
||||
ATTR_OS_VERSION: registration[ATTR_OS_VERSION],
|
||||
ATTR_SUPPORTS_ENCRYPTION: registration[ATTR_SUPPORTS_ENCRYPTION],
|
||||
}
|
||||
|
||||
|
||||
def savable_state(hass: HomeAssistantType) -> Dict:
|
||||
"""Return a clean object containing things that should be saved."""
|
||||
return {
|
||||
DATA_DELETED_IDS: hass.data[DOMAIN][DATA_DELETED_IDS],
|
||||
DATA_REGISTRATIONS: hass.data[DOMAIN][DATA_REGISTRATIONS]
|
||||
}
|
78
homeassistant/components/mobile_app/http_api.py
Normal file
78
homeassistant/components/mobile_app/http_api.py
Normal file
|
@ -0,0 +1,78 @@
|
|||
"""Provides an HTTP API for mobile_app."""
|
||||
from typing import Dict
|
||||
|
||||
from aiohttp.web import Response, Request
|
||||
|
||||
from homeassistant.auth.util import generate_secret
|
||||
from homeassistant.components.cloud import async_create_cloudhook
|
||||
from homeassistant.components.http import HomeAssistantView
|
||||
from homeassistant.components.http.data_validator import RequestDataValidator
|
||||
from homeassistant.const import (HTTP_CREATED, HTTP_INTERNAL_SERVER_ERROR,
|
||||
CONF_WEBHOOK_ID)
|
||||
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
from homeassistant.helpers.storage import Store
|
||||
from homeassistant.helpers.typing import HomeAssistantType
|
||||
|
||||
from .const import (DATA_REGISTRATIONS, ATTR_SUPPORTS_ENCRYPTION,
|
||||
CONF_CLOUDHOOK_URL, CONF_SECRET, CONF_USER_ID,
|
||||
DOMAIN, REGISTRATION_SCHEMA)
|
||||
|
||||
from .helpers import supports_encryption, savable_state
|
||||
|
||||
from .webhook import setup_registration
|
||||
|
||||
|
||||
def register_http_handlers(hass: HomeAssistantType, store: Store) -> bool:
|
||||
"""Register the HTTP handlers/views."""
|
||||
hass.http.register_view(RegistrationsView(store))
|
||||
return True
|
||||
|
||||
|
||||
class RegistrationsView(HomeAssistantView):
|
||||
"""A view that accepts registration requests."""
|
||||
|
||||
url = '/api/mobile_app/registrations'
|
||||
name = 'api:mobile_app:register'
|
||||
|
||||
def __init__(self, store: Store) -> None:
|
||||
"""Initialize the view."""
|
||||
self._store = store
|
||||
|
||||
@RequestDataValidator(REGISTRATION_SCHEMA)
|
||||
async def post(self, request: Request, data: Dict) -> Response:
|
||||
"""Handle the POST request for registration."""
|
||||
hass = request.app['hass']
|
||||
|
||||
webhook_id = generate_secret()
|
||||
|
||||
if "cloud" in hass.config.components:
|
||||
cloudhook = await async_create_cloudhook(hass, webhook_id)
|
||||
|
||||
if cloudhook is not None:
|
||||
data[CONF_CLOUDHOOK_URL] = cloudhook[CONF_CLOUDHOOK_URL]
|
||||
|
||||
data[CONF_WEBHOOK_ID] = webhook_id
|
||||
|
||||
if data[ATTR_SUPPORTS_ENCRYPTION] and supports_encryption():
|
||||
secret = generate_secret(16)
|
||||
|
||||
data[CONF_SECRET] = secret
|
||||
|
||||
data[CONF_USER_ID] = request['hass_user'].id
|
||||
|
||||
hass.data[DOMAIN][DATA_REGISTRATIONS][webhook_id] = data
|
||||
|
||||
try:
|
||||
await self._store.async_save(savable_state(hass))
|
||||
except HomeAssistantError:
|
||||
return self.json_message("Error saving registration.",
|
||||
HTTP_INTERNAL_SERVER_ERROR)
|
||||
|
||||
setup_registration(hass, self._store, data)
|
||||
|
||||
return self.json({
|
||||
CONF_CLOUDHOOK_URL: data.get(CONF_CLOUDHOOK_URL),
|
||||
CONF_SECRET: data.get(CONF_SECRET),
|
||||
CONF_WEBHOOK_ID: data[CONF_WEBHOOK_ID],
|
||||
}, status_code=HTTP_CREATED)
|
162
homeassistant/components/mobile_app/webhook.py
Normal file
162
homeassistant/components/mobile_app/webhook.py
Normal file
|
@ -0,0 +1,162 @@
|
|||
"""Webhook handlers for mobile_app."""
|
||||
from functools import partial
|
||||
import logging
|
||||
from typing import Dict
|
||||
|
||||
from aiohttp.web import HTTPBadRequest, json_response, Response, Request
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.device_tracker import (DOMAIN as DT_DOMAIN,
|
||||
SERVICE_SEE as DT_SEE)
|
||||
from homeassistant.components.webhook import async_register as webhook_register
|
||||
|
||||
from homeassistant.const import (ATTR_DOMAIN, ATTR_SERVICE, ATTR_SERVICE_DATA,
|
||||
CONF_WEBHOOK_ID, HTTP_BAD_REQUEST)
|
||||
from homeassistant.core import EventOrigin
|
||||
from homeassistant.exceptions import (HomeAssistantError, ServiceNotFound,
|
||||
TemplateError)
|
||||
from homeassistant.helpers import template
|
||||
from homeassistant.helpers.discovery import load_platform
|
||||
from homeassistant.helpers.storage import Store
|
||||
from homeassistant.helpers.typing import HomeAssistantType
|
||||
|
||||
from .const import (ATTR_APP_COMPONENT, DATA_DELETED_IDS,
|
||||
ATTR_DEVICE_NAME, ATTR_EVENT_DATA, ATTR_EVENT_TYPE,
|
||||
DATA_REGISTRATIONS, ATTR_TEMPLATE, ATTR_TEMPLATE_VARIABLES,
|
||||
ATTR_WEBHOOK_DATA, ATTR_WEBHOOK_ENCRYPTED,
|
||||
ATTR_WEBHOOK_ENCRYPTED_DATA, ATTR_WEBHOOK_TYPE,
|
||||
CONF_SECRET, DOMAIN, WEBHOOK_PAYLOAD_SCHEMA,
|
||||
WEBHOOK_SCHEMAS, WEBHOOK_TYPE_CALL_SERVICE,
|
||||
WEBHOOK_TYPE_FIRE_EVENT, WEBHOOK_TYPE_RENDER_TEMPLATE,
|
||||
WEBHOOK_TYPE_UPDATE_LOCATION,
|
||||
WEBHOOK_TYPE_UPDATE_REGISTRATION)
|
||||
|
||||
from .helpers import (_decrypt_payload, empty_okay_response,
|
||||
registration_context, safe_registration, savable_state)
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def register_deleted_webhooks(hass: HomeAssistantType, store: Store):
|
||||
"""Register previously deleted webhook IDs so we can return 410."""
|
||||
for deleted_id in hass.data[DOMAIN][DATA_DELETED_IDS]:
|
||||
try:
|
||||
webhook_register(hass, DOMAIN, "Deleted Webhook", deleted_id,
|
||||
partial(handle_webhook, store))
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
|
||||
def setup_registration(hass: HomeAssistantType, store: Store,
|
||||
registration: Dict) -> None:
|
||||
"""Register the webhook for a registration and loads the app component."""
|
||||
registration_name = 'Mobile App: {}'.format(registration[ATTR_DEVICE_NAME])
|
||||
webhook_id = registration[CONF_WEBHOOK_ID]
|
||||
webhook_register(hass, DOMAIN, registration_name, webhook_id,
|
||||
partial(handle_webhook, store))
|
||||
|
||||
if ATTR_APP_COMPONENT in registration:
|
||||
load_platform(hass, registration[ATTR_APP_COMPONENT], DOMAIN, {},
|
||||
{DOMAIN: {}})
|
||||
|
||||
|
||||
async def handle_webhook(store: Store, hass: HomeAssistantType,
|
||||
webhook_id: str, request: Request) -> Response:
|
||||
"""Handle webhook callback."""
|
||||
if webhook_id in hass.data[DOMAIN][DATA_DELETED_IDS]:
|
||||
return Response(status=410)
|
||||
|
||||
headers = {}
|
||||
|
||||
registration = hass.data[DOMAIN][DATA_REGISTRATIONS][webhook_id]
|
||||
|
||||
try:
|
||||
req_data = await request.json()
|
||||
except ValueError:
|
||||
_LOGGER.warning('Received invalid JSON from mobile_app')
|
||||
return empty_okay_response(status=HTTP_BAD_REQUEST)
|
||||
|
||||
try:
|
||||
req_data = WEBHOOK_PAYLOAD_SCHEMA(req_data)
|
||||
except vol.Invalid as ex:
|
||||
err = vol.humanize.humanize_error(req_data, ex)
|
||||
_LOGGER.error('Received invalid webhook payload: %s', err)
|
||||
return empty_okay_response()
|
||||
|
||||
webhook_type = req_data[ATTR_WEBHOOK_TYPE]
|
||||
|
||||
webhook_payload = req_data.get(ATTR_WEBHOOK_DATA, {})
|
||||
|
||||
if req_data[ATTR_WEBHOOK_ENCRYPTED]:
|
||||
enc_data = req_data[ATTR_WEBHOOK_ENCRYPTED_DATA]
|
||||
webhook_payload = _decrypt_payload(registration[CONF_SECRET], enc_data)
|
||||
|
||||
try:
|
||||
data = WEBHOOK_SCHEMAS[webhook_type](webhook_payload)
|
||||
except vol.Invalid as ex:
|
||||
err = vol.humanize.humanize_error(webhook_payload, ex)
|
||||
_LOGGER.error('Received invalid webhook payload: %s', err)
|
||||
return empty_okay_response(headers=headers)
|
||||
|
||||
context = registration_context(registration)
|
||||
|
||||
if webhook_type == WEBHOOK_TYPE_CALL_SERVICE:
|
||||
try:
|
||||
await hass.services.async_call(data[ATTR_DOMAIN],
|
||||
data[ATTR_SERVICE],
|
||||
data[ATTR_SERVICE_DATA],
|
||||
blocking=True, context=context)
|
||||
# noqa: E722 pylint: disable=broad-except
|
||||
except (vol.Invalid, ServiceNotFound, Exception) as ex:
|
||||
_LOGGER.error("Error when calling service during mobile_app "
|
||||
"webhook (device name: %s): %s",
|
||||
registration[ATTR_DEVICE_NAME], ex)
|
||||
raise HTTPBadRequest()
|
||||
|
||||
return empty_okay_response(headers=headers)
|
||||
|
||||
if webhook_type == WEBHOOK_TYPE_FIRE_EVENT:
|
||||
event_type = data[ATTR_EVENT_TYPE]
|
||||
hass.bus.async_fire(event_type, data[ATTR_EVENT_DATA],
|
||||
EventOrigin.remote,
|
||||
context=context)
|
||||
return empty_okay_response(headers=headers)
|
||||
|
||||
if webhook_type == WEBHOOK_TYPE_RENDER_TEMPLATE:
|
||||
try:
|
||||
tpl = template.Template(data[ATTR_TEMPLATE], hass)
|
||||
rendered = tpl.async_render(data.get(ATTR_TEMPLATE_VARIABLES))
|
||||
return json_response({"rendered": rendered}, headers=headers)
|
||||
# noqa: E722 pylint: disable=broad-except
|
||||
except (ValueError, TemplateError, Exception) as ex:
|
||||
_LOGGER.error("Error when rendering template during mobile_app "
|
||||
"webhook (device name: %s): %s",
|
||||
registration[ATTR_DEVICE_NAME], ex)
|
||||
return json_response(({"error": str(ex)}), status=HTTP_BAD_REQUEST,
|
||||
headers=headers)
|
||||
|
||||
if webhook_type == WEBHOOK_TYPE_UPDATE_LOCATION:
|
||||
try:
|
||||
await hass.services.async_call(DT_DOMAIN,
|
||||
DT_SEE, data,
|
||||
blocking=True, context=context)
|
||||
# noqa: E722 pylint: disable=broad-except
|
||||
except (vol.Invalid, ServiceNotFound, Exception) as ex:
|
||||
_LOGGER.error("Error when updating location during mobile_app "
|
||||
"webhook (device name: %s): %s",
|
||||
registration[ATTR_DEVICE_NAME], ex)
|
||||
return empty_okay_response(headers=headers)
|
||||
|
||||
if webhook_type == WEBHOOK_TYPE_UPDATE_REGISTRATION:
|
||||
new_registration = {**registration, **data}
|
||||
|
||||
hass.data[DOMAIN][DATA_REGISTRATIONS][webhook_id] = new_registration
|
||||
|
||||
try:
|
||||
await store.async_save(savable_state(hass))
|
||||
except HomeAssistantError as ex:
|
||||
_LOGGER.error("Error updating mobile_app registration: %s", ex)
|
||||
return empty_okay_response()
|
||||
|
||||
return json_response(safe_registration(new_registration))
|
143
homeassistant/components/mobile_app/websocket_api.py
Normal file
143
homeassistant/components/mobile_app/websocket_api.py
Normal file
|
@ -0,0 +1,143 @@
|
|||
"""Websocket API for mobile_app."""
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.cloud import async_delete_cloudhook
|
||||
from homeassistant.components.websocket_api import (ActiveConnection,
|
||||
async_register_command,
|
||||
async_response,
|
||||
error_message,
|
||||
result_message,
|
||||
websocket_command,
|
||||
ws_require_user)
|
||||
from homeassistant.components.websocket_api.const import (ERR_INVALID_FORMAT,
|
||||
ERR_NOT_FOUND,
|
||||
ERR_UNAUTHORIZED)
|
||||
from homeassistant.const import CONF_WEBHOOK_ID
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
from homeassistant.helpers import config_validation as cv
|
||||
from homeassistant.helpers.typing import HomeAssistantType
|
||||
|
||||
from .const import (CONF_CLOUDHOOK_URL, CONF_USER_ID, DATA_DELETED_IDS,
|
||||
DATA_REGISTRATIONS, DATA_STORE, DOMAIN)
|
||||
|
||||
from .helpers import safe_registration, savable_state
|
||||
|
||||
|
||||
def register_websocket_handlers(hass: HomeAssistantType) -> bool:
|
||||
"""Register the websocket handlers."""
|
||||
async_register_command(hass, websocket_get_registration)
|
||||
|
||||
async_register_command(hass, websocket_get_user_registrations)
|
||||
|
||||
async_register_command(hass, websocket_delete_registration)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
@ws_require_user()
|
||||
@async_response
|
||||
@websocket_command({
|
||||
vol.Required('type'): 'mobile_app/get_registration',
|
||||
vol.Required(CONF_WEBHOOK_ID): cv.string,
|
||||
})
|
||||
async def websocket_get_registration(
|
||||
hass: HomeAssistantType, connection: ActiveConnection,
|
||||
msg: dict) -> None:
|
||||
"""Return the registration for the given webhook_id."""
|
||||
user = connection.user
|
||||
|
||||
webhook_id = msg.get(CONF_WEBHOOK_ID)
|
||||
if webhook_id is None:
|
||||
connection.send_error(msg['id'], ERR_INVALID_FORMAT,
|
||||
"Webhook ID not provided")
|
||||
return
|
||||
|
||||
registration = hass.data[DOMAIN][DATA_REGISTRATIONS].get(webhook_id)
|
||||
|
||||
if registration is None:
|
||||
connection.send_error(msg['id'], ERR_NOT_FOUND,
|
||||
"Webhook ID not found in storage")
|
||||
return
|
||||
|
||||
if registration[CONF_USER_ID] != user.id and not user.is_admin:
|
||||
return error_message(
|
||||
msg['id'], ERR_UNAUTHORIZED, 'User is not registration owner')
|
||||
|
||||
connection.send_message(
|
||||
result_message(msg['id'], safe_registration(registration)))
|
||||
|
||||
|
||||
@ws_require_user()
|
||||
@async_response
|
||||
@websocket_command({
|
||||
vol.Required('type'): 'mobile_app/get_user_registrations',
|
||||
vol.Optional(CONF_USER_ID): cv.string,
|
||||
})
|
||||
async def websocket_get_user_registrations(
|
||||
hass: HomeAssistantType, connection: ActiveConnection,
|
||||
msg: dict) -> None:
|
||||
"""Return all registrations or just registrations for given user ID."""
|
||||
user_id = msg.get(CONF_USER_ID, connection.user.id)
|
||||
|
||||
if user_id != connection.user.id and not connection.user.is_admin:
|
||||
# If user ID is provided and is not current user ID and current user
|
||||
# isn't an admin user
|
||||
connection.send_error(msg['id'], ERR_UNAUTHORIZED, "Unauthorized")
|
||||
return
|
||||
|
||||
user_registrations = []
|
||||
|
||||
for registration in hass.data[DOMAIN][DATA_REGISTRATIONS].values():
|
||||
if connection.user.is_admin or registration[CONF_USER_ID] is user_id:
|
||||
user_registrations.append(safe_registration(registration))
|
||||
|
||||
connection.send_message(
|
||||
result_message(msg['id'], user_registrations))
|
||||
|
||||
|
||||
@ws_require_user()
|
||||
@async_response
|
||||
@websocket_command({
|
||||
vol.Required('type'): 'mobile_app/delete_registration',
|
||||
vol.Required(CONF_WEBHOOK_ID): cv.string,
|
||||
})
|
||||
async def websocket_delete_registration(hass: HomeAssistantType,
|
||||
connection: ActiveConnection,
|
||||
msg: dict) -> None:
|
||||
"""Delete the registration for the given webhook_id."""
|
||||
user = connection.user
|
||||
|
||||
webhook_id = msg.get(CONF_WEBHOOK_ID)
|
||||
if webhook_id is None:
|
||||
connection.send_error(msg['id'], ERR_INVALID_FORMAT,
|
||||
"Webhook ID not provided")
|
||||
return
|
||||
|
||||
registration = hass.data[DOMAIN][DATA_REGISTRATIONS].get(webhook_id)
|
||||
|
||||
if registration is None:
|
||||
connection.send_error(msg['id'], ERR_NOT_FOUND,
|
||||
"Webhook ID not found in storage")
|
||||
return
|
||||
|
||||
if registration[CONF_USER_ID] != user.id and not user.is_admin:
|
||||
return error_message(
|
||||
msg['id'], ERR_UNAUTHORIZED, 'User is not registration owner')
|
||||
|
||||
del hass.data[DOMAIN][DATA_REGISTRATIONS][webhook_id]
|
||||
|
||||
hass.data[DOMAIN][DATA_DELETED_IDS].append(webhook_id)
|
||||
|
||||
store = hass.data[DOMAIN][DATA_STORE]
|
||||
|
||||
try:
|
||||
await store.async_save(savable_state(hass))
|
||||
except HomeAssistantError:
|
||||
return error_message(
|
||||
msg['id'], 'internal_error', 'Error deleting registration')
|
||||
|
||||
if (CONF_CLOUDHOOK_URL in registration and
|
||||
"cloud" in hass.config.components):
|
||||
await async_delete_cloudhook(hass, webhook_id)
|
||||
|
||||
connection.send_message(result_message(msg['id'], 'ok'))
|
|
@ -1 +1,53 @@
|
|||
"""Tests for mobile_app component."""
|
||||
# pylint: disable=redefined-outer-name,unused-import
|
||||
import pytest
|
||||
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from homeassistant.components.mobile_app.const import (DATA_DELETED_IDS,
|
||||
DATA_REGISTRATIONS,
|
||||
CONF_SECRET,
|
||||
CONF_USER_ID, DOMAIN,
|
||||
STORAGE_KEY,
|
||||
STORAGE_VERSION)
|
||||
from homeassistant.const import CONF_WEBHOOK_ID
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def webhook_client(hass, aiohttp_client, hass_storage, hass_admin_user):
|
||||
"""mobile_app mock client."""
|
||||
hass_storage[STORAGE_KEY] = {
|
||||
'version': STORAGE_VERSION,
|
||||
'data': {
|
||||
DATA_REGISTRATIONS: {
|
||||
'mobile_app_test': {
|
||||
CONF_SECRET: '58eb127991594dad934d1584bdee5f27',
|
||||
'supports_encryption': True,
|
||||
CONF_WEBHOOK_ID: 'mobile_app_test',
|
||||
'device_name': 'Test Device',
|
||||
CONF_USER_ID: hass_admin_user.id,
|
||||
}
|
||||
},
|
||||
DATA_DELETED_IDS: [],
|
||||
}
|
||||
}
|
||||
|
||||
assert hass.loop.run_until_complete(async_setup_component(
|
||||
hass, DOMAIN, {
|
||||
DOMAIN: {}
|
||||
}))
|
||||
|
||||
return hass.loop.run_until_complete(aiohttp_client(hass.http.app))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def authed_api_client(hass, hass_client):
|
||||
"""Provide an authenticated client for mobile_app to use."""
|
||||
await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
|
||||
return await hass_client()
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
async def setup_ws(hass):
|
||||
"""Configure the websocket_api component."""
|
||||
assert await async_setup_component(hass, 'websocket_api', {})
|
||||
|
|
49
tests/components/mobile_app/const.py
Normal file
49
tests/components/mobile_app/const.py
Normal file
|
@ -0,0 +1,49 @@
|
|||
"""Constants for mobile_app tests."""
|
||||
CALL_SERVICE = {
|
||||
'type': 'call_service',
|
||||
'data': {
|
||||
'domain': 'test',
|
||||
'service': 'mobile_app',
|
||||
'service_data': {
|
||||
'foo': 'bar'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
FIRE_EVENT = {
|
||||
'type': 'fire_event',
|
||||
'data': {
|
||||
'event_type': 'test_event',
|
||||
'event_data': {
|
||||
'hello': 'yo world'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
REGISTER = {
|
||||
'app_data': {'foo': 'bar'},
|
||||
'app_id': 'io.homeassistant.mobile_app_test',
|
||||
'app_name': 'Mobile App Tests',
|
||||
'app_version': '1.0.0',
|
||||
'device_name': 'Test 1',
|
||||
'manufacturer': 'mobile_app',
|
||||
'model': 'Test',
|
||||
'os_version': '1.0',
|
||||
'supports_encryption': True
|
||||
}
|
||||
|
||||
RENDER_TEMPLATE = {
|
||||
'type': 'render_template',
|
||||
'data': {
|
||||
'template': 'Hello world'
|
||||
}
|
||||
}
|
||||
|
||||
UPDATE = {
|
||||
'app_data': {'foo': 'bar'},
|
||||
'app_version': '2.0.0',
|
||||
'device_name': 'Test 1',
|
||||
'manufacturer': 'mobile_app',
|
||||
'model': 'Test',
|
||||
'os_version': '1.0'
|
||||
}
|
59
tests/components/mobile_app/test_http_api.py
Normal file
59
tests/components/mobile_app/test_http_api.py
Normal file
|
@ -0,0 +1,59 @@
|
|||
"""Tests for the mobile_app HTTP API."""
|
||||
# pylint: disable=redefined-outer-name,unused-import
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.mobile_app.const import CONF_SECRET
|
||||
from homeassistant.const import CONF_WEBHOOK_ID
|
||||
|
||||
from .const import REGISTER
|
||||
from . import authed_api_client # noqa: F401
|
||||
|
||||
|
||||
async def test_registration(hass_client, authed_api_client): # noqa: F811
|
||||
"""Test that registrations happen."""
|
||||
try:
|
||||
# pylint: disable=unused-import
|
||||
from nacl.secret import SecretBox # noqa: F401
|
||||
from nacl.encoding import Base64Encoder # noqa: F401
|
||||
except (ImportError, OSError):
|
||||
pytest.skip("libnacl/libsodium is not installed")
|
||||
return
|
||||
|
||||
import json
|
||||
|
||||
resp = await authed_api_client.post(
|
||||
'/api/mobile_app/registrations', json=REGISTER
|
||||
)
|
||||
|
||||
assert resp.status == 201
|
||||
register_json = await resp.json()
|
||||
assert CONF_WEBHOOK_ID in register_json
|
||||
assert CONF_SECRET in register_json
|
||||
|
||||
keylen = SecretBox.KEY_SIZE
|
||||
key = register_json[CONF_SECRET].encode("utf-8")
|
||||
key = key[:keylen]
|
||||
key = key.ljust(keylen, b'\0')
|
||||
|
||||
payload = json.dumps({'template': 'Hello world'}).encode("utf-8")
|
||||
|
||||
data = SecretBox(key).encrypt(payload,
|
||||
encoder=Base64Encoder).decode("utf-8")
|
||||
|
||||
container = {
|
||||
'type': 'render_template',
|
||||
'encrypted': True,
|
||||
'encrypted_data': data,
|
||||
}
|
||||
|
||||
webhook_client = await hass_client()
|
||||
|
||||
resp = await webhook_client.post(
|
||||
'/api/webhook/{}'.format(register_json[CONF_WEBHOOK_ID]),
|
||||
json=container
|
||||
)
|
||||
|
||||
assert resp.status == 200
|
||||
|
||||
webhook_json = await resp.json()
|
||||
assert webhook_json == {'rendered': 'Hello world'}
|
|
@ -1,275 +0,0 @@
|
|||
"""Test the mobile_app_http platform."""
|
||||
import pytest
|
||||
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from homeassistant.const import CONF_WEBHOOK_ID
|
||||
from homeassistant.components.mobile_app import (DOMAIN, STORAGE_KEY,
|
||||
STORAGE_VERSION,
|
||||
CONF_SECRET, CONF_USER_ID)
|
||||
from homeassistant.core import callback
|
||||
|
||||
from tests.common import async_mock_service
|
||||
|
||||
FIRE_EVENT = {
|
||||
'type': 'fire_event',
|
||||
'data': {
|
||||
'event_type': 'test_event',
|
||||
'event_data': {
|
||||
'hello': 'yo world'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
RENDER_TEMPLATE = {
|
||||
'type': 'render_template',
|
||||
'data': {
|
||||
'template': 'Hello world'
|
||||
}
|
||||
}
|
||||
|
||||
CALL_SERVICE = {
|
||||
'type': 'call_service',
|
||||
'data': {
|
||||
'domain': 'test',
|
||||
'service': 'mobile_app',
|
||||
'service_data': {
|
||||
'foo': 'bar'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
REGISTER = {
|
||||
'app_data': {'foo': 'bar'},
|
||||
'app_id': 'io.homeassistant.mobile_app_test',
|
||||
'app_name': 'Mobile App Tests',
|
||||
'app_version': '1.0.0',
|
||||
'device_name': 'Test 1',
|
||||
'manufacturer': 'mobile_app',
|
||||
'model': 'Test',
|
||||
'os_version': '1.0',
|
||||
'supports_encryption': True
|
||||
}
|
||||
|
||||
UPDATE = {
|
||||
'app_data': {'foo': 'bar'},
|
||||
'app_version': '2.0.0',
|
||||
'device_name': 'Test 1',
|
||||
'manufacturer': 'mobile_app',
|
||||
'model': 'Test',
|
||||
'os_version': '1.0'
|
||||
}
|
||||
|
||||
# pylint: disable=redefined-outer-name
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mobile_app_client(hass, aiohttp_client, hass_storage, hass_admin_user):
|
||||
"""mobile_app mock client."""
|
||||
hass_storage[STORAGE_KEY] = {
|
||||
'version': STORAGE_VERSION,
|
||||
'data': {
|
||||
'mobile_app_test': {
|
||||
CONF_SECRET: '58eb127991594dad934d1584bdee5f27',
|
||||
'supports_encryption': True,
|
||||
CONF_WEBHOOK_ID: 'mobile_app_test',
|
||||
'device_name': 'Test Device',
|
||||
CONF_USER_ID: hass_admin_user.id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert hass.loop.run_until_complete(async_setup_component(
|
||||
hass, DOMAIN, {
|
||||
DOMAIN: {}
|
||||
}))
|
||||
|
||||
return hass.loop.run_until_complete(aiohttp_client(hass.http.app))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def mock_api_client(hass, hass_client):
|
||||
"""Provide an authenticated client for mobile_app to use."""
|
||||
await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
|
||||
return await hass_client()
|
||||
|
||||
|
||||
async def test_handle_render_template(mobile_app_client):
|
||||
"""Test that we render templates properly."""
|
||||
resp = await mobile_app_client.post(
|
||||
'/api/webhook/mobile_app_test',
|
||||
json=RENDER_TEMPLATE
|
||||
)
|
||||
|
||||
assert resp.status == 200
|
||||
|
||||
json = await resp.json()
|
||||
assert json == {'rendered': 'Hello world'}
|
||||
|
||||
|
||||
async def test_handle_call_services(hass, mobile_app_client):
|
||||
"""Test that we call services properly."""
|
||||
calls = async_mock_service(hass, 'test', 'mobile_app')
|
||||
|
||||
resp = await mobile_app_client.post(
|
||||
'/api/webhook/mobile_app_test',
|
||||
json=CALL_SERVICE
|
||||
)
|
||||
|
||||
assert resp.status == 200
|
||||
|
||||
assert len(calls) == 1
|
||||
|
||||
|
||||
async def test_handle_fire_event(hass, mobile_app_client):
|
||||
"""Test that we can fire events."""
|
||||
events = []
|
||||
|
||||
@callback
|
||||
def store_event(event):
|
||||
"""Helepr to store events."""
|
||||
events.append(event)
|
||||
|
||||
hass.bus.async_listen('test_event', store_event)
|
||||
|
||||
resp = await mobile_app_client.post(
|
||||
'/api/webhook/mobile_app_test',
|
||||
json=FIRE_EVENT
|
||||
)
|
||||
|
||||
assert resp.status == 200
|
||||
text = await resp.text()
|
||||
assert text == ""
|
||||
|
||||
assert len(events) == 1
|
||||
assert events[0].data['hello'] == 'yo world'
|
||||
|
||||
|
||||
async def test_update_registration(mobile_app_client, hass_client):
|
||||
"""Test that a we can update an existing registration via webhook."""
|
||||
mock_api_client = await hass_client()
|
||||
register_resp = await mock_api_client.post(
|
||||
'/api/mobile_app/devices', json=REGISTER
|
||||
)
|
||||
|
||||
assert register_resp.status == 201
|
||||
register_json = await register_resp.json()
|
||||
|
||||
webhook_id = register_json[CONF_WEBHOOK_ID]
|
||||
|
||||
update_container = {
|
||||
'type': 'update_registration',
|
||||
'data': UPDATE
|
||||
}
|
||||
|
||||
update_resp = await mobile_app_client.post(
|
||||
'/api/webhook/{}'.format(webhook_id), json=update_container
|
||||
)
|
||||
|
||||
assert update_resp.status == 200
|
||||
update_json = await update_resp.json()
|
||||
assert update_json['app_version'] == '2.0.0'
|
||||
assert CONF_WEBHOOK_ID not in update_json
|
||||
assert CONF_SECRET not in update_json
|
||||
|
||||
|
||||
async def test_returns_error_incorrect_json(mobile_app_client, caplog):
|
||||
"""Test that an error is returned when JSON is invalid."""
|
||||
resp = await mobile_app_client.post(
|
||||
'/api/webhook/mobile_app_test',
|
||||
data='not json'
|
||||
)
|
||||
|
||||
assert resp.status == 400
|
||||
json = await resp.json()
|
||||
assert json == []
|
||||
assert 'invalid JSON' in caplog.text
|
||||
|
||||
|
||||
async def test_handle_decryption(mobile_app_client):
|
||||
"""Test that we can encrypt/decrypt properly."""
|
||||
try:
|
||||
# pylint: disable=unused-import
|
||||
from nacl.secret import SecretBox # noqa: F401
|
||||
from nacl.encoding import Base64Encoder # noqa: F401
|
||||
except (ImportError, OSError):
|
||||
pytest.skip("libnacl/libsodium is not installed")
|
||||
return
|
||||
|
||||
import json
|
||||
|
||||
keylen = SecretBox.KEY_SIZE
|
||||
key = "58eb127991594dad934d1584bdee5f27".encode("utf-8")
|
||||
key = key[:keylen]
|
||||
key = key.ljust(keylen, b'\0')
|
||||
|
||||
payload = json.dumps({'template': 'Hello world'}).encode("utf-8")
|
||||
|
||||
data = SecretBox(key).encrypt(payload,
|
||||
encoder=Base64Encoder).decode("utf-8")
|
||||
|
||||
container = {
|
||||
'type': 'render_template',
|
||||
'encrypted': True,
|
||||
'encrypted_data': data,
|
||||
}
|
||||
|
||||
resp = await mobile_app_client.post(
|
||||
'/api/webhook/mobile_app_test',
|
||||
json=container
|
||||
)
|
||||
|
||||
assert resp.status == 200
|
||||
|
||||
json = await resp.json()
|
||||
assert json == {'rendered': 'Hello world'}
|
||||
|
||||
|
||||
async def test_register_device(hass_client, mock_api_client):
|
||||
"""Test that a device can be registered."""
|
||||
try:
|
||||
# pylint: disable=unused-import
|
||||
from nacl.secret import SecretBox # noqa: F401
|
||||
from nacl.encoding import Base64Encoder # noqa: F401
|
||||
except (ImportError, OSError):
|
||||
pytest.skip("libnacl/libsodium is not installed")
|
||||
return
|
||||
|
||||
import json
|
||||
|
||||
resp = await mock_api_client.post(
|
||||
'/api/mobile_app/devices', json=REGISTER
|
||||
)
|
||||
|
||||
assert resp.status == 201
|
||||
register_json = await resp.json()
|
||||
assert CONF_WEBHOOK_ID in register_json
|
||||
assert CONF_SECRET in register_json
|
||||
|
||||
keylen = SecretBox.KEY_SIZE
|
||||
key = register_json[CONF_SECRET].encode("utf-8")
|
||||
key = key[:keylen]
|
||||
key = key.ljust(keylen, b'\0')
|
||||
|
||||
payload = json.dumps({'template': 'Hello world'}).encode("utf-8")
|
||||
|
||||
data = SecretBox(key).encrypt(payload,
|
||||
encoder=Base64Encoder).decode("utf-8")
|
||||
|
||||
container = {
|
||||
'type': 'render_template',
|
||||
'encrypted': True,
|
||||
'encrypted_data': data,
|
||||
}
|
||||
|
||||
mobile_app_client = await hass_client()
|
||||
|
||||
resp = await mobile_app_client.post(
|
||||
'/api/webhook/{}'.format(register_json[CONF_WEBHOOK_ID]),
|
||||
json=container
|
||||
)
|
||||
|
||||
assert resp.status == 200
|
||||
|
||||
webhook_json = await resp.json()
|
||||
assert webhook_json == {'rendered': 'Hello world'}
|
145
tests/components/mobile_app/test_webhook.py
Normal file
145
tests/components/mobile_app/test_webhook.py
Normal file
|
@ -0,0 +1,145 @@
|
|||
"""Webhook tests for mobile_app."""
|
||||
# pylint: disable=redefined-outer-name,unused-import
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.mobile_app.const import CONF_SECRET
|
||||
from homeassistant.const import CONF_WEBHOOK_ID
|
||||
from homeassistant.core import callback
|
||||
|
||||
from tests.common import async_mock_service
|
||||
|
||||
from . import authed_api_client, webhook_client # noqa: F401
|
||||
|
||||
from .const import (CALL_SERVICE, FIRE_EVENT, REGISTER, RENDER_TEMPLATE,
|
||||
UPDATE)
|
||||
|
||||
|
||||
async def test_webhook_handle_render_template(webhook_client): # noqa: F811
|
||||
"""Test that we render templates properly."""
|
||||
resp = await webhook_client.post(
|
||||
'/api/webhook/mobile_app_test',
|
||||
json=RENDER_TEMPLATE
|
||||
)
|
||||
|
||||
assert resp.status == 200
|
||||
|
||||
json = await resp.json()
|
||||
assert json == {'rendered': 'Hello world'}
|
||||
|
||||
|
||||
async def test_webhook_handle_call_services(hass, webhook_client): # noqa: E501 F811
|
||||
"""Test that we call services properly."""
|
||||
calls = async_mock_service(hass, 'test', 'mobile_app')
|
||||
|
||||
resp = await webhook_client.post(
|
||||
'/api/webhook/mobile_app_test',
|
||||
json=CALL_SERVICE
|
||||
)
|
||||
|
||||
assert resp.status == 200
|
||||
|
||||
assert len(calls) == 1
|
||||
|
||||
|
||||
async def test_webhook_handle_fire_event(hass, webhook_client): # noqa: F811
|
||||
"""Test that we can fire events."""
|
||||
events = []
|
||||
|
||||
@callback
|
||||
def store_event(event):
|
||||
"""Helepr to store events."""
|
||||
events.append(event)
|
||||
|
||||
hass.bus.async_listen('test_event', store_event)
|
||||
|
||||
resp = await webhook_client.post(
|
||||
'/api/webhook/mobile_app_test',
|
||||
json=FIRE_EVENT
|
||||
)
|
||||
|
||||
assert resp.status == 200
|
||||
json = await resp.json()
|
||||
assert json == {}
|
||||
|
||||
assert len(events) == 1
|
||||
assert events[0].data['hello'] == 'yo world'
|
||||
|
||||
|
||||
async def test_webhook_update_registration(webhook_client, hass_client): # noqa: E501 F811
|
||||
"""Test that a we can update an existing registration via webhook."""
|
||||
authed_api_client = await hass_client() # noqa: F811
|
||||
register_resp = await authed_api_client.post(
|
||||
'/api/mobile_app/registrations', json=REGISTER
|
||||
)
|
||||
|
||||
assert register_resp.status == 201
|
||||
register_json = await register_resp.json()
|
||||
|
||||
webhook_id = register_json[CONF_WEBHOOK_ID]
|
||||
|
||||
update_container = {
|
||||
'type': 'update_registration',
|
||||
'data': UPDATE
|
||||
}
|
||||
|
||||
update_resp = await webhook_client.post(
|
||||
'/api/webhook/{}'.format(webhook_id), json=update_container
|
||||
)
|
||||
|
||||
assert update_resp.status == 200
|
||||
update_json = await update_resp.json()
|
||||
assert update_json['app_version'] == '2.0.0'
|
||||
assert CONF_WEBHOOK_ID not in update_json
|
||||
assert CONF_SECRET not in update_json
|
||||
|
||||
|
||||
async def test_webhook_returns_error_incorrect_json(webhook_client, caplog): # noqa: E501 F811
|
||||
"""Test that an error is returned when JSON is invalid."""
|
||||
resp = await webhook_client.post(
|
||||
'/api/webhook/mobile_app_test',
|
||||
data='not json'
|
||||
)
|
||||
|
||||
assert resp.status == 400
|
||||
json = await resp.json()
|
||||
assert json == {}
|
||||
assert 'invalid JSON' in caplog.text
|
||||
|
||||
|
||||
async def test_webhook_handle_decryption(webhook_client): # noqa: F811
|
||||
"""Test that we can encrypt/decrypt properly."""
|
||||
try:
|
||||
# pylint: disable=unused-import
|
||||
from nacl.secret import SecretBox # noqa: F401
|
||||
from nacl.encoding import Base64Encoder # noqa: F401
|
||||
except (ImportError, OSError):
|
||||
pytest.skip("libnacl/libsodium is not installed")
|
||||
return
|
||||
|
||||
import json
|
||||
|
||||
keylen = SecretBox.KEY_SIZE
|
||||
key = "58eb127991594dad934d1584bdee5f27".encode("utf-8")
|
||||
key = key[:keylen]
|
||||
key = key.ljust(keylen, b'\0')
|
||||
|
||||
payload = json.dumps({'template': 'Hello world'}).encode("utf-8")
|
||||
|
||||
data = SecretBox(key).encrypt(payload,
|
||||
encoder=Base64Encoder).decode("utf-8")
|
||||
|
||||
container = {
|
||||
'type': 'render_template',
|
||||
'encrypted': True,
|
||||
'encrypted_data': data,
|
||||
}
|
||||
|
||||
resp = await webhook_client.post(
|
||||
'/api/webhook/mobile_app_test',
|
||||
json=container
|
||||
)
|
||||
|
||||
assert resp.status == 200
|
||||
|
||||
json = await resp.json()
|
||||
assert json == {'rendered': 'Hello world'}
|
107
tests/components/mobile_app/test_websocket_api.py
Normal file
107
tests/components/mobile_app/test_websocket_api.py
Normal file
|
@ -0,0 +1,107 @@
|
|||
"""Test the mobile_app websocket API."""
|
||||
# pylint: disable=redefined-outer-name,unused-import
|
||||
from homeassistant.components.mobile_app.const import (CONF_SECRET, DOMAIN)
|
||||
from homeassistant.components.websocket_api.const import TYPE_RESULT
|
||||
from homeassistant.const import CONF_WEBHOOK_ID
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from . import authed_api_client, setup_ws, webhook_client # noqa: F401
|
||||
from .const import (CALL_SERVICE, REGISTER)
|
||||
|
||||
|
||||
async def test_webocket_get_registration(hass, setup_ws, authed_api_client, # noqa: E501 F811
|
||||
hass_ws_client):
|
||||
"""Test get_registration websocket command."""
|
||||
register_resp = await authed_api_client.post(
|
||||
'/api/mobile_app/registrations', json=REGISTER
|
||||
)
|
||||
|
||||
assert register_resp.status == 201
|
||||
register_json = await register_resp.json()
|
||||
assert CONF_WEBHOOK_ID in register_json
|
||||
assert CONF_SECRET in register_json
|
||||
|
||||
client = await hass_ws_client(hass)
|
||||
await client.send_json({
|
||||
'id': 5,
|
||||
'type': 'mobile_app/get_registration',
|
||||
CONF_WEBHOOK_ID: register_json[CONF_WEBHOOK_ID],
|
||||
})
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert msg['id'] == 5
|
||||
assert msg['type'] == TYPE_RESULT
|
||||
assert msg['success']
|
||||
assert msg['result']['app_id'] == 'io.homeassistant.mobile_app_test'
|
||||
|
||||
|
||||
async def test_webocket_get_user_registrations(hass, aiohttp_client,
|
||||
hass_ws_client,
|
||||
hass_read_only_access_token):
|
||||
"""Test get_user_registrations websocket command from admin perspective."""
|
||||
await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
|
||||
|
||||
user_api_client = await aiohttp_client(hass.http.app, headers={
|
||||
'Authorization': "Bearer {}".format(hass_read_only_access_token)
|
||||
})
|
||||
|
||||
# First a read only user registers.
|
||||
register_resp = await user_api_client.post(
|
||||
'/api/mobile_app/registrations', json=REGISTER
|
||||
)
|
||||
|
||||
assert register_resp.status == 201
|
||||
register_json = await register_resp.json()
|
||||
assert CONF_WEBHOOK_ID in register_json
|
||||
assert CONF_SECRET in register_json
|
||||
|
||||
# Then the admin user attempts to access it.
|
||||
client = await hass_ws_client(hass)
|
||||
await client.send_json({
|
||||
'id': 5,
|
||||
'type': 'mobile_app/get_user_registrations',
|
||||
})
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert msg['id'] == 5
|
||||
assert msg['type'] == TYPE_RESULT
|
||||
assert msg['success']
|
||||
assert len(msg['result']) == 1
|
||||
|
||||
|
||||
async def test_webocket_delete_registration(hass, hass_client,
|
||||
hass_ws_client, webhook_client): # noqa: E501 F811
|
||||
"""Test delete_registration websocket command."""
|
||||
authed_api_client = await hass_client() # noqa: F811
|
||||
register_resp = await authed_api_client.post(
|
||||
'/api/mobile_app/registrations', json=REGISTER
|
||||
)
|
||||
|
||||
assert register_resp.status == 201
|
||||
register_json = await register_resp.json()
|
||||
assert CONF_WEBHOOK_ID in register_json
|
||||
assert CONF_SECRET in register_json
|
||||
|
||||
webhook_id = register_json[CONF_WEBHOOK_ID]
|
||||
|
||||
client = await hass_ws_client(hass)
|
||||
await client.send_json({
|
||||
'id': 5,
|
||||
'type': 'mobile_app/delete_registration',
|
||||
CONF_WEBHOOK_ID: webhook_id,
|
||||
})
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert msg['id'] == 5
|
||||
assert msg['type'] == TYPE_RESULT
|
||||
assert msg['success']
|
||||
assert msg['result'] == 'ok'
|
||||
|
||||
ensure_four_ten_gone = await webhook_client.post(
|
||||
'/api/webhook/{}'.format(webhook_id), json=CALL_SERVICE
|
||||
)
|
||||
|
||||
assert ensure_four_ten_gone.status == 410
|
Loading…
Add table
Add a link
Reference in a new issue