Fix filtering of EVENT_CALL_SERVICE and EVENT_SERVICE_EXECUTED events
This commit is contained in:
parent
8ace656657
commit
6f398f59df
1 changed files with 40 additions and 21 deletions
|
@ -18,6 +18,8 @@ mqtt_eventstream:
|
||||||
"""
|
"""
|
||||||
import json
|
import json
|
||||||
from homeassistant.core import EventOrigin, State
|
from homeassistant.core import EventOrigin, State
|
||||||
|
from homeassistant.components.mqtt import DOMAIN as MQTT_DOMAIN
|
||||||
|
from homeassistant.components.mqtt import SERVICE_PUBLISH as MQTT_SVC_PUBLISH
|
||||||
from homeassistant.const import (
|
from homeassistant.const import (
|
||||||
MATCH_ALL,
|
MATCH_ALL,
|
||||||
EVENT_TIME_CHANGED,
|
EVENT_TIME_CHANGED,
|
||||||
|
@ -25,7 +27,6 @@ from homeassistant.const import (
|
||||||
EVENT_SERVICE_EXECUTED,
|
EVENT_SERVICE_EXECUTED,
|
||||||
EVENT_STATE_CHANGED,
|
EVENT_STATE_CHANGED,
|
||||||
)
|
)
|
||||||
|
|
||||||
import homeassistant.loader as loader
|
import homeassistant.loader as loader
|
||||||
from homeassistant.remote import JSONEncoder
|
from homeassistant.remote import JSONEncoder
|
||||||
|
|
||||||
|
@ -38,41 +39,59 @@ DEPENDENCIES = ['mqtt']
|
||||||
|
|
||||||
def setup(hass, config):
|
def setup(hass, config):
|
||||||
""" Setup our mqtt_eventstream component. """
|
""" Setup our mqtt_eventstream component. """
|
||||||
def _event_handler(event):
|
|
||||||
""" Handle events by publishing them on the mqtt queue. """
|
|
||||||
if event.origin != EventOrigin.local:
|
|
||||||
return
|
|
||||||
if event.event_type in (
|
|
||||||
EVENT_TIME_CHANGED,
|
|
||||||
EVENT_CALL_SERVICE,
|
|
||||||
EVENT_SERVICE_EXECUTED
|
|
||||||
):
|
|
||||||
return
|
|
||||||
event = {'event_type': event.event_type, 'event_data': event.data}
|
|
||||||
msg = json.dumps(event, cls=JSONEncoder)
|
|
||||||
mqtt.publish(hass, pub_topic, msg)
|
|
||||||
|
|
||||||
mqtt = loader.get_component('mqtt')
|
mqtt = loader.get_component('mqtt')
|
||||||
pub_topic = config[DOMAIN].get('publish_topic', None)
|
pub_topic = config[DOMAIN].get('publish_topic', None)
|
||||||
sub_topic = config[DOMAIN].get('subscribe_topic', None)
|
sub_topic = config[DOMAIN].get('subscribe_topic', None)
|
||||||
|
|
||||||
|
def _event_publisher(event):
|
||||||
|
""" Handle events by publishing them on the mqtt queue. """
|
||||||
|
if event.origin != EventOrigin.local:
|
||||||
|
return
|
||||||
|
if event.event_type == EVENT_TIME_CHANGED:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Filter out the events that were triggered by publishing
|
||||||
|
# to the MQTT topic, or you will end up in an infinite loop.
|
||||||
|
if event.event_type == EVENT_CALL_SERVICE:
|
||||||
|
if (
|
||||||
|
event.data.get('domain') == MQTT_DOMAIN and
|
||||||
|
event.data.get('service') == MQTT_SVC_PUBLISH and
|
||||||
|
event.data.get('topic') == pub_topic
|
||||||
|
):
|
||||||
|
return
|
||||||
|
|
||||||
|
# Filter out all the "event service executed" events because they
|
||||||
|
# are only used internally by core as callbacks for blocking
|
||||||
|
# during the interval while a service is being executed.
|
||||||
|
# They will serve no purpose to the external system,
|
||||||
|
# and thus are unnecessary traffic.
|
||||||
|
# And at any rate it would cause an infinite loop to publish them
|
||||||
|
# because publishing to an MQTT topic itself triggers one.
|
||||||
|
if event.event_type == EVENT_SERVICE_EXECUTED:
|
||||||
|
return
|
||||||
|
|
||||||
|
event_info = {'event_type': event.event_type, 'event_data': event.data}
|
||||||
|
msg = json.dumps(event_info, cls=JSONEncoder)
|
||||||
|
mqtt.publish(hass, pub_topic, msg)
|
||||||
|
|
||||||
# Only listen for local events if you are going to publish them
|
# Only listen for local events if you are going to publish them
|
||||||
if (pub_topic):
|
if pub_topic:
|
||||||
hass.bus.listen(MATCH_ALL, _event_handler)
|
hass.bus.listen(MATCH_ALL, _event_publisher)
|
||||||
|
|
||||||
# Process events from a remote server that are received on a queue
|
# Process events from a remote server that are received on a queue
|
||||||
def _event_receiver(topic, payload, qos):
|
def _event_receiver(topic, payload, qos):
|
||||||
"""
|
"""
|
||||||
A new MQTT message, published by the other HA instance,
|
Receive events published by the other HA instance and fire
|
||||||
has been received.
|
them on this hass instance.
|
||||||
"""
|
"""
|
||||||
# TODO error handling
|
|
||||||
event = json.loads(payload)
|
event = json.loads(payload)
|
||||||
event_type = event.get('event_type')
|
event_type = event.get('event_type')
|
||||||
event_data = event.get('event_data')
|
event_data = event.get('event_data')
|
||||||
|
|
||||||
# Special case handling for event STATE_CHANGED
|
# Special case handling for event STATE_CHANGED
|
||||||
# We will try to convert state dicts back to State objects
|
# We will try to convert state dicts back to State objects
|
||||||
|
# Copied over from the _handle_api_post_events_event method
|
||||||
|
# of the api component.
|
||||||
if event_type == EVENT_STATE_CHANGED and event_data:
|
if event_type == EVENT_STATE_CHANGED and event_data:
|
||||||
for key in ('old_state', 'new_state'):
|
for key in ('old_state', 'new_state'):
|
||||||
state = State.from_dict(event_data.get(key))
|
state = State.from_dict(event_data.get(key))
|
||||||
|
@ -87,7 +106,7 @@ def setup(hass, config):
|
||||||
)
|
)
|
||||||
|
|
||||||
# Only subscribe if you specified a topic
|
# Only subscribe if you specified a topic
|
||||||
if (sub_topic):
|
if sub_topic:
|
||||||
mqtt.subscribe(hass, sub_topic, _event_receiver)
|
mqtt.subscribe(hass, sub_topic, _event_receiver)
|
||||||
|
|
||||||
hass.states.set('{domain}.initialized'.format(domain=DOMAIN), True)
|
hass.states.set('{domain}.initialized'.format(domain=DOMAIN), True)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue