Allow easy extension of websocket API (#14186)
* Allow easy extension of websocket API * Lint * Move panel test to frontend * Register websocket commands * Simplify test * Lint
This commit is contained in:
parent
cdd45e7878
commit
d82693b460
5 changed files with 203 additions and 154 deletions
|
@ -18,8 +18,8 @@ from voluptuous.humanize import humanize_error
|
|||
from homeassistant.const import (
|
||||
MATCH_ALL, EVENT_TIME_CHANGED, EVENT_HOMEASSISTANT_STOP,
|
||||
__version__)
|
||||
from homeassistant.components import frontend
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.loader import bind_hass
|
||||
from homeassistant.remote import JSONEncoder
|
||||
from homeassistant.helpers import config_validation as cv
|
||||
from homeassistant.helpers.service import async_get_all_descriptions
|
||||
|
@ -46,7 +46,6 @@ TYPE_AUTH_REQUIRED = 'auth_required'
|
|||
TYPE_CALL_SERVICE = 'call_service'
|
||||
TYPE_EVENT = 'event'
|
||||
TYPE_GET_CONFIG = 'get_config'
|
||||
TYPE_GET_PANELS = 'get_panels'
|
||||
TYPE_GET_SERVICES = 'get_services'
|
||||
TYPE_GET_STATES = 'get_states'
|
||||
TYPE_PING = 'ping'
|
||||
|
@ -64,62 +63,56 @@ AUTH_MESSAGE_SCHEMA = vol.Schema({
|
|||
vol.Required('api_password'): str,
|
||||
})
|
||||
|
||||
SUBSCRIBE_EVENTS_MESSAGE_SCHEMA = vol.Schema({
|
||||
# Minimal requirements of a message
|
||||
MINIMAL_MESSAGE_SCHEMA = vol.Schema({
|
||||
vol.Required('id'): cv.positive_int,
|
||||
vol.Required('type'): cv.string,
|
||||
}, extra=vol.ALLOW_EXTRA)
|
||||
# Base schema to extend by message handlers
|
||||
BASE_COMMAND_MESSAGE_SCHEMA = vol.Schema({
|
||||
vol.Required('id'): cv.positive_int,
|
||||
})
|
||||
|
||||
|
||||
SCHEMA_SUBSCRIBE_EVENTS = BASE_COMMAND_MESSAGE_SCHEMA.extend({
|
||||
vol.Required('type'): TYPE_SUBSCRIBE_EVENTS,
|
||||
vol.Optional('event_type', default=MATCH_ALL): str,
|
||||
})
|
||||
|
||||
UNSUBSCRIBE_EVENTS_MESSAGE_SCHEMA = vol.Schema({
|
||||
vol.Required('id'): cv.positive_int,
|
||||
|
||||
SCHEMA_UNSUBSCRIBE_EVENTS = BASE_COMMAND_MESSAGE_SCHEMA.extend({
|
||||
vol.Required('type'): TYPE_UNSUBSCRIBE_EVENTS,
|
||||
vol.Required('subscription'): cv.positive_int,
|
||||
})
|
||||
|
||||
CALL_SERVICE_MESSAGE_SCHEMA = vol.Schema({
|
||||
vol.Required('id'): cv.positive_int,
|
||||
|
||||
SCHEMA_CALL_SERVICE = BASE_COMMAND_MESSAGE_SCHEMA.extend({
|
||||
vol.Required('type'): TYPE_CALL_SERVICE,
|
||||
vol.Required('domain'): str,
|
||||
vol.Required('service'): str,
|
||||
vol.Optional('service_data'): dict
|
||||
})
|
||||
|
||||
GET_STATES_MESSAGE_SCHEMA = vol.Schema({
|
||||
vol.Required('id'): cv.positive_int,
|
||||
|
||||
SCHEMA_GET_STATES = BASE_COMMAND_MESSAGE_SCHEMA.extend({
|
||||
vol.Required('type'): TYPE_GET_STATES,
|
||||
})
|
||||
|
||||
GET_SERVICES_MESSAGE_SCHEMA = vol.Schema({
|
||||
vol.Required('id'): cv.positive_int,
|
||||
|
||||
SCHEMA_GET_SERVICES = BASE_COMMAND_MESSAGE_SCHEMA.extend({
|
||||
vol.Required('type'): TYPE_GET_SERVICES,
|
||||
})
|
||||
|
||||
GET_CONFIG_MESSAGE_SCHEMA = vol.Schema({
|
||||
vol.Required('id'): cv.positive_int,
|
||||
|
||||
SCHEMA_GET_CONFIG = BASE_COMMAND_MESSAGE_SCHEMA.extend({
|
||||
vol.Required('type'): TYPE_GET_CONFIG,
|
||||
})
|
||||
|
||||
GET_PANELS_MESSAGE_SCHEMA = vol.Schema({
|
||||
vol.Required('id'): cv.positive_int,
|
||||
vol.Required('type'): TYPE_GET_PANELS,
|
||||
})
|
||||
|
||||
PING_MESSAGE_SCHEMA = vol.Schema({
|
||||
vol.Required('id'): cv.positive_int,
|
||||
SCHEMA_PING = BASE_COMMAND_MESSAGE_SCHEMA.extend({
|
||||
vol.Required('type'): TYPE_PING,
|
||||
})
|
||||
|
||||
BASE_COMMAND_MESSAGE_SCHEMA = vol.Schema({
|
||||
vol.Required('id'): cv.positive_int,
|
||||
vol.Required('type'): vol.Any(TYPE_CALL_SERVICE,
|
||||
TYPE_SUBSCRIBE_EVENTS,
|
||||
TYPE_UNSUBSCRIBE_EVENTS,
|
||||
TYPE_GET_STATES,
|
||||
TYPE_GET_SERVICES,
|
||||
TYPE_GET_CONFIG,
|
||||
TYPE_GET_PANELS,
|
||||
TYPE_PING)
|
||||
}, extra=vol.ALLOW_EXTRA)
|
||||
|
||||
# Define the possible errors that occur when connections are cancelled.
|
||||
# Originally, this was just asyncio.CancelledError, but issue #9546 showed
|
||||
|
@ -191,9 +184,36 @@ def result_message(iden, result=None):
|
|||
}
|
||||
|
||||
|
||||
@bind_hass
|
||||
@callback
|
||||
def async_register_command(hass, command, handler, schema):
|
||||
"""Register a websocket command."""
|
||||
handlers = hass.data.get(DOMAIN)
|
||||
if handlers is None:
|
||||
handlers = hass.data[DOMAIN] = {}
|
||||
handlers[command] = (handler, schema)
|
||||
|
||||
|
||||
async def async_setup(hass, config):
|
||||
"""Initialize the websocket API."""
|
||||
hass.http.register_view(WebsocketAPIView)
|
||||
|
||||
async_register_command(hass, TYPE_SUBSCRIBE_EVENTS,
|
||||
handle_subscribe_events, SCHEMA_SUBSCRIBE_EVENTS)
|
||||
async_register_command(hass, TYPE_UNSUBSCRIBE_EVENTS,
|
||||
handle_unsubscribe_events,
|
||||
SCHEMA_UNSUBSCRIBE_EVENTS)
|
||||
async_register_command(hass, TYPE_CALL_SERVICE,
|
||||
handle_call_service, SCHEMA_CALL_SERVICE)
|
||||
async_register_command(hass, TYPE_GET_STATES,
|
||||
handle_get_states, SCHEMA_GET_STATES)
|
||||
async_register_command(hass, TYPE_GET_SERVICES,
|
||||
handle_get_services, SCHEMA_GET_SERVICES)
|
||||
async_register_command(hass, TYPE_GET_CONFIG,
|
||||
handle_get_config, SCHEMA_GET_CONFIG)
|
||||
async_register_command(hass, TYPE_PING,
|
||||
handle_ping, SCHEMA_PING)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
|
@ -316,10 +336,11 @@ class ActiveConnection:
|
|||
|
||||
msg = await wsock.receive_json()
|
||||
last_id = 0
|
||||
handlers = self.hass.data[DOMAIN]
|
||||
|
||||
while msg:
|
||||
self.debug("Received", msg)
|
||||
msg = BASE_COMMAND_MESSAGE_SCHEMA(msg)
|
||||
msg = MINIMAL_MESSAGE_SCHEMA(msg)
|
||||
cur_id = msg['id']
|
||||
|
||||
if cur_id <= last_id:
|
||||
|
@ -327,9 +348,13 @@ class ActiveConnection:
|
|||
cur_id, ERR_ID_REUSE,
|
||||
'Identifier values have to increase.'))
|
||||
|
||||
elif msg['type'] not in handlers:
|
||||
# Unknown command
|
||||
break
|
||||
|
||||
else:
|
||||
handler_name = 'handle_{}'.format(msg['type'])
|
||||
getattr(self, handler_name)(msg)
|
||||
handler, schema = handlers[msg['type']]
|
||||
handler(self.hass, self, schema(msg))
|
||||
|
||||
last_id = cur_id
|
||||
msg = await wsock.receive_json()
|
||||
|
@ -403,109 +428,89 @@ class ActiveConnection:
|
|||
|
||||
return wsock
|
||||
|
||||
def handle_subscribe_events(self, msg):
|
||||
"""Handle subscribe events command.
|
||||
|
||||
Async friendly.
|
||||
"""
|
||||
msg = SUBSCRIBE_EVENTS_MESSAGE_SCHEMA(msg)
|
||||
def handle_subscribe_events(hass, connection, msg):
|
||||
"""Handle subscribe events command.
|
||||
|
||||
async def forward_events(event):
|
||||
"""Forward events to websocket."""
|
||||
if event.event_type == EVENT_TIME_CHANGED:
|
||||
return
|
||||
Async friendly.
|
||||
"""
|
||||
async def forward_events(event):
|
||||
"""Forward events to websocket."""
|
||||
if event.event_type == EVENT_TIME_CHANGED:
|
||||
return
|
||||
|
||||
self.send_message_outside(event_message(msg['id'], event))
|
||||
connection.send_message_outside(event_message(msg['id'], event))
|
||||
|
||||
self.event_listeners[msg['id']] = self.hass.bus.async_listen(
|
||||
msg['event_type'], forward_events)
|
||||
connection.event_listeners[msg['id']] = hass.bus.async_listen(
|
||||
msg['event_type'], forward_events)
|
||||
|
||||
self.to_write.put_nowait(result_message(msg['id']))
|
||||
connection.to_write.put_nowait(result_message(msg['id']))
|
||||
|
||||
def handle_unsubscribe_events(self, msg):
|
||||
"""Handle unsubscribe events command.
|
||||
|
||||
Async friendly.
|
||||
"""
|
||||
msg = UNSUBSCRIBE_EVENTS_MESSAGE_SCHEMA(msg)
|
||||
def handle_unsubscribe_events(hass, connection, msg):
|
||||
"""Handle unsubscribe events command.
|
||||
|
||||
subscription = msg['subscription']
|
||||
Async friendly.
|
||||
"""
|
||||
subscription = msg['subscription']
|
||||
|
||||
if subscription in self.event_listeners:
|
||||
self.event_listeners.pop(subscription)()
|
||||
self.to_write.put_nowait(result_message(msg['id']))
|
||||
else:
|
||||
self.to_write.put_nowait(error_message(
|
||||
msg['id'], ERR_NOT_FOUND,
|
||||
'Subscription not found.'))
|
||||
if subscription in connection.event_listeners:
|
||||
connection.event_listeners.pop(subscription)()
|
||||
connection.to_write.put_nowait(result_message(msg['id']))
|
||||
else:
|
||||
connection.to_write.put_nowait(error_message(
|
||||
msg['id'], ERR_NOT_FOUND, 'Subscription not found.'))
|
||||
|
||||
def handle_call_service(self, msg):
|
||||
"""Handle call service command.
|
||||
|
||||
Async friendly.
|
||||
"""
|
||||
msg = CALL_SERVICE_MESSAGE_SCHEMA(msg)
|
||||
def handle_call_service(hass, connection, msg):
|
||||
"""Handle call service command.
|
||||
|
||||
async def call_service_helper(msg):
|
||||
"""Call a service and fire complete message."""
|
||||
await self.hass.services.async_call(
|
||||
msg['domain'], msg['service'], msg.get('service_data'), True)
|
||||
self.send_message_outside(result_message(msg['id']))
|
||||
Async friendly.
|
||||
"""
|
||||
async def call_service_helper(msg):
|
||||
"""Call a service and fire complete message."""
|
||||
await hass.services.async_call(
|
||||
msg['domain'], msg['service'], msg.get('service_data'), True)
|
||||
connection.send_message_outside(result_message(msg['id']))
|
||||
|
||||
self.hass.async_add_job(call_service_helper(msg))
|
||||
hass.async_add_job(call_service_helper(msg))
|
||||
|
||||
def handle_get_states(self, msg):
|
||||
"""Handle get states command.
|
||||
|
||||
Async friendly.
|
||||
"""
|
||||
msg = GET_STATES_MESSAGE_SCHEMA(msg)
|
||||
def handle_get_states(hass, connection, msg):
|
||||
"""Handle get states command.
|
||||
|
||||
self.to_write.put_nowait(result_message(
|
||||
msg['id'], self.hass.states.async_all()))
|
||||
Async friendly.
|
||||
"""
|
||||
connection.to_write.put_nowait(result_message(
|
||||
msg['id'], hass.states.async_all()))
|
||||
|
||||
def handle_get_services(self, msg):
|
||||
"""Handle get services command.
|
||||
|
||||
Async friendly.
|
||||
"""
|
||||
msg = GET_SERVICES_MESSAGE_SCHEMA(msg)
|
||||
def handle_get_services(hass, connection, msg):
|
||||
"""Handle get services command.
|
||||
|
||||
async def get_services_helper(msg):
|
||||
"""Get available services and fire complete message."""
|
||||
descriptions = await async_get_all_descriptions(self.hass)
|
||||
self.send_message_outside(result_message(msg['id'], descriptions))
|
||||
Async friendly.
|
||||
"""
|
||||
async def get_services_helper(msg):
|
||||
"""Get available services and fire complete message."""
|
||||
descriptions = await async_get_all_descriptions(hass)
|
||||
connection.send_message_outside(
|
||||
result_message(msg['id'], descriptions))
|
||||
|
||||
self.hass.async_add_job(get_services_helper(msg))
|
||||
hass.async_add_job(get_services_helper(msg))
|
||||
|
||||
def handle_get_config(self, msg):
|
||||
"""Handle get config command.
|
||||
|
||||
Async friendly.
|
||||
"""
|
||||
msg = GET_CONFIG_MESSAGE_SCHEMA(msg)
|
||||
def handle_get_config(hass, connection, msg):
|
||||
"""Handle get config command.
|
||||
|
||||
self.to_write.put_nowait(result_message(
|
||||
msg['id'], self.hass.config.as_dict()))
|
||||
Async friendly.
|
||||
"""
|
||||
connection.to_write.put_nowait(result_message(
|
||||
msg['id'], hass.config.as_dict()))
|
||||
|
||||
def handle_get_panels(self, msg):
|
||||
"""Handle get panels command.
|
||||
|
||||
Async friendly.
|
||||
"""
|
||||
msg = GET_PANELS_MESSAGE_SCHEMA(msg)
|
||||
panels = {
|
||||
panel:
|
||||
self.hass.data[frontend.DATA_PANELS][panel].to_response(
|
||||
self.hass, self.request)
|
||||
for panel in self.hass.data[frontend.DATA_PANELS]}
|
||||
def handle_ping(hass, connection, msg):
|
||||
"""Handle ping command.
|
||||
|
||||
self.to_write.put_nowait(result_message(
|
||||
msg['id'], panels))
|
||||
|
||||
def handle_ping(self, msg):
|
||||
"""Handle ping command.
|
||||
|
||||
Async friendly.
|
||||
"""
|
||||
self.to_write.put_nowait(pong_message(msg['id']))
|
||||
Async friendly.
|
||||
"""
|
||||
connection.to_write.put_nowait(pong_message(msg['id']))
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue