Add ping to streaming events API
This commit is contained in:
parent
a7c6413d07
commit
f46e0408b3
4 changed files with 39 additions and 19 deletions
|
@ -23,6 +23,9 @@ from homeassistant.const import (
|
||||||
DOMAIN = 'api'
|
DOMAIN = 'api'
|
||||||
DEPENDENCIES = ['http']
|
DEPENDENCIES = ['http']
|
||||||
|
|
||||||
|
STREAM_PING_PAYLOAD = "ping"
|
||||||
|
STREAM_PING_INTERVAL = 50 # seconds
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -86,38 +89,55 @@ def _handle_get_api(handler, path_match, data):
|
||||||
|
|
||||||
def _handle_get_api_stream(handler, path_match, data):
|
def _handle_get_api_stream(handler, path_match, data):
|
||||||
""" Provide a streaming interface for the event bus. """
|
""" Provide a streaming interface for the event bus. """
|
||||||
|
gracefully_closed = False
|
||||||
hass = handler.server.hass
|
hass = handler.server.hass
|
||||||
wfile = handler.wfile
|
wfile = handler.wfile
|
||||||
|
write_lock = threading.Lock()
|
||||||
block = threading.Event()
|
block = threading.Event()
|
||||||
|
|
||||||
def event_sourcer(event):
|
def write_message(payload):
|
||||||
|
""" Writes a message to the output. """
|
||||||
|
with write_lock:
|
||||||
|
msg = "data: {}\n\n".format(payload)
|
||||||
|
|
||||||
|
try:
|
||||||
|
wfile.write(msg.encode("UTF-8"))
|
||||||
|
wfile.flush()
|
||||||
|
except IOError:
|
||||||
|
block.set()
|
||||||
|
|
||||||
|
def forward_events(event):
|
||||||
""" Forwards events to the open request. """
|
""" Forwards events to the open request. """
|
||||||
|
nonlocal gracefully_closed
|
||||||
|
|
||||||
if block.is_set() or event.event_type == EVENT_TIME_CHANGED:
|
if block.is_set() or event.event_type == EVENT_TIME_CHANGED:
|
||||||
return
|
return
|
||||||
elif event.event_type == EVENT_HOMEASSISTANT_STOP:
|
elif event.event_type == EVENT_HOMEASSISTANT_STOP:
|
||||||
|
gracefully_closed = True
|
||||||
block.set()
|
block.set()
|
||||||
return
|
return
|
||||||
|
|
||||||
msg = "data: {}\n\n".format(
|
write_message(json.dumps(event, cls=rem.JSONEncoder))
|
||||||
json.dumps(event.as_dict(), cls=rem.JSONEncoder))
|
|
||||||
|
|
||||||
try:
|
|
||||||
wfile.write(msg.encode("UTF-8"))
|
|
||||||
wfile.flush()
|
|
||||||
except IOError:
|
|
||||||
block.set()
|
|
||||||
|
|
||||||
handler.send_response(HTTP_OK)
|
handler.send_response(HTTP_OK)
|
||||||
handler.send_header('Content-type', 'text/event-stream')
|
handler.send_header('Content-type', 'text/event-stream')
|
||||||
handler.end_headers()
|
handler.end_headers()
|
||||||
|
|
||||||
hass.bus.listen(MATCH_ALL, event_sourcer)
|
hass.bus.listen(MATCH_ALL, forward_events)
|
||||||
|
|
||||||
block.wait()
|
while True:
|
||||||
|
block.wait(STREAM_PING_INTERVAL)
|
||||||
|
|
||||||
_LOGGER.info("Found broken event stream to %s, cleaning up",
|
if block.is_set():
|
||||||
handler.client_address[0])
|
break
|
||||||
hass.bus.remove_listener(MATCH_ALL, event_sourcer)
|
|
||||||
|
write_message(STREAM_PING_PAYLOAD)
|
||||||
|
|
||||||
|
if not gracefully_closed:
|
||||||
|
_LOGGER.info("Found broken event stream to %s, cleaning up",
|
||||||
|
handler.client_address[0])
|
||||||
|
|
||||||
|
hass.bus.remove_listener(MATCH_ALL, forward_events)
|
||||||
|
|
||||||
|
|
||||||
def _handle_get_api_states(handler, path_match, data):
|
def _handle_get_api_states(handler, path_match, data):
|
||||||
|
|
|
@ -1,2 +1,2 @@
|
||||||
""" DO NOT MODIFY. Auto-generated by build_frontend script """
|
""" DO NOT MODIFY. Auto-generated by build_frontend script """
|
||||||
VERSION = "84205c070e3e5992ace85220c4cd3da6"
|
VERSION = "516c0ee306fbd56fb3867e23dafdd7a6"
|
||||||
|
|
File diff suppressed because one or more lines are too long
|
@ -1 +1 @@
|
||||||
Subproject commit 002f04e6cce669b3d144c38007884cddc3478d20
|
Subproject commit 83d86a9328a79bfb73da16787d8b59862a83fa98
|
Loading…
Add table
Reference in a new issue