Fix websocket async (#4752)

* Ensure we write to websocket from inside event loop

* Inline service call helper
This commit is contained in:
Paulus Schoutsen 2016-12-05 18:03:06 -08:00
parent 5e492db9a3
commit a458ce8069

View file

@ -204,7 +204,6 @@ class ActiveConnection:
self.hass = hass
self.request = request
self.wsock = None
self.socket_task = None
self.event_listeners = {}
def debug(self, message1, message2=''):
@ -220,34 +219,6 @@ class ActiveConnection:
self.debug('Sending', message)
self.wsock.send_json(message, dumps=JSON_DUMP)
@callback
def _cancel_connection(self, event):
"""Cancel this connection."""
self.socket_task.cancel()
@asyncio.coroutine
def _call_service_helper(self, msg):
"""Helper to call a service and fire complete message."""
yield from self.hass.services.async_call(msg['domain'], msg['service'],
msg['service_data'], True)
try:
self.send_message(result_message(msg['id']))
except RuntimeError:
# Socket has been closed.
pass
@callback
def _forward_event(self, iden, event):
"""Helper to forward events to websocket."""
if event.event_type == EVENT_TIME_CHANGED:
return
try:
self.send_message(event_message(iden, event))
except RuntimeError:
# Socket has been closed.
pass
@asyncio.coroutine
def handle(self):
"""Handle the websocket connection."""
@ -255,9 +226,15 @@ class ActiveConnection:
yield from wsock.prepare(self.request)
# Set up to cancel this connection when Home Assistant shuts down
self.socket_task = asyncio.Task.current_task(loop=self.hass.loop)
self.hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP,
self._cancel_connection)
socket_task = asyncio.Task.current_task(loop=self.hass.loop)
@callback
def cancel_connection(event):
"""Cancel this connection."""
socket_task.cancel()
unsub_stop = self.hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP,
cancel_connection)
self.debug('Connected')
@ -351,6 +328,8 @@ class ActiveConnection:
_LOGGER.exception(error)
finally:
unsub_stop()
for unsub in self.event_listeners.values():
unsub()
@ -363,8 +342,20 @@ class ActiveConnection:
"""Handle subscribe events command."""
msg = SUBSCRIBE_EVENTS_MESSAGE_SCHEMA(msg)
@callback
def forward_events(event):
"""Helper to forward events to websocket."""
if event.event_type == EVENT_TIME_CHANGED:
return
try:
self.send_message(event_message(msg['id'], event))
except RuntimeError:
# Socket has been closed.
pass
self.event_listeners[msg['id']] = self.hass.bus.async_listen(
msg['event_type'], partial(self._forward_event, msg['id']))
msg['event_type'], forward_events)
self.send_message(result_message(msg['id']))
@ -386,7 +377,18 @@ class ActiveConnection:
"""Handle call service command."""
msg = CALL_SERVICE_MESSAGE_SCHEMA(msg)
self.hass.async_add_job(self._call_service_helper(msg))
@asyncio.coroutine
def call_service_helper(msg):
"""Helper to call a service and fire complete message."""
yield from self.hass.services.async_call(
msg['domain'], msg['service'], msg['service_data'], True)
try:
self.send_message(result_message(msg['id']))
except RuntimeError:
# Socket has been closed.
pass
self.hass.async_add_job(call_service_helper(msg))
def handle_get_states(self, msg):
"""Handle get states command."""