hass-core/homeassistant/components/mqtt/__init__.py
Pascal Bach 69e9d39690 Allow setting the retain flag for mqtt switch.
Some devices can read the initial value on startup.
If the retain flag is set they always receive the value as last set by
home assistant.
2015-11-26 21:03:21 +01:00

319 lines
9.5 KiB
Python

"""
homeassistant.components.mqtt
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
MQTT component, using paho-mqtt.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/mqtt/
"""
import json
import logging
import os
import socket
import time
from homeassistant.exceptions import HomeAssistantError
import homeassistant.util as util
from homeassistant.helpers import validate_config
from homeassistant.const import (
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP)
_LOGGER = logging.getLogger(__name__)
DOMAIN = "mqtt"
MQTT_CLIENT = None
DEFAULT_PORT = 1883
DEFAULT_KEEPALIVE = 60
DEFAULT_QOS = 0
DEFAULT_RETAIN = False
SERVICE_PUBLISH = 'publish'
EVENT_MQTT_MESSAGE_RECEIVED = 'MQTT_MESSAGE_RECEIVED'
DEPENDENCIES = []
REQUIREMENTS = ['paho-mqtt==1.1', 'jsonpath-rw==1.4.0']
CONF_BROKER = 'broker'
CONF_PORT = 'port'
CONF_CLIENT_ID = 'client_id'
CONF_KEEPALIVE = 'keepalive'
CONF_USERNAME = 'username'
CONF_PASSWORD = 'password'
CONF_CERTIFICATE = 'certificate'
ATTR_TOPIC = 'topic'
ATTR_PAYLOAD = 'payload'
ATTR_QOS = 'qos'
ATTR_RETAIN = 'retain'
MAX_RECONNECT_WAIT = 300 # seconds
def publish(hass, topic, payload, qos=None, retain=None):
""" Send an MQTT message. """
data = {
ATTR_TOPIC: topic,
ATTR_PAYLOAD: payload,
}
if qos is not None:
data[ATTR_QOS] = qos
if retain is not None:
data[ATTR_RETAIN] = retain
hass.services.call(DOMAIN, SERVICE_PUBLISH, data)
def subscribe(hass, topic, callback, qos=DEFAULT_QOS):
""" Subscribe to a topic. """
def mqtt_topic_subscriber(event):
""" Match subscribed MQTT topic. """
if _match_topic(topic, event.data[ATTR_TOPIC]):
callback(event.data[ATTR_TOPIC], event.data[ATTR_PAYLOAD],
event.data[ATTR_QOS])
hass.bus.listen(EVENT_MQTT_MESSAGE_RECEIVED, mqtt_topic_subscriber)
MQTT_CLIENT.subscribe(topic, qos)
def setup(hass, config):
""" Get the MQTT protocol service. """
if not validate_config(config, {DOMAIN: ['broker']}, _LOGGER):
return False
conf = config[DOMAIN]
broker = conf[CONF_BROKER]
port = util.convert(conf.get(CONF_PORT), int, DEFAULT_PORT)
client_id = util.convert(conf.get(CONF_CLIENT_ID), str)
keepalive = util.convert(conf.get(CONF_KEEPALIVE), int, DEFAULT_KEEPALIVE)
username = util.convert(conf.get(CONF_USERNAME), str)
password = util.convert(conf.get(CONF_PASSWORD), str)
certificate = util.convert(conf.get(CONF_CERTIFICATE), str)
# For cloudmqtt.com, secured connection, auto fill in certificate
if certificate is None and 19999 < port < 30000 and \
broker.endswith('.cloudmqtt.com'):
certificate = os.path.join(os.path.dirname(__file__),
'addtrustexternalcaroot.crt')
global MQTT_CLIENT
try:
MQTT_CLIENT = MQTT(hass, broker, port, client_id, keepalive, username,
password, certificate)
except socket.error:
_LOGGER.exception("Can't connect to the broker. "
"Please check your settings and the broker "
"itself.")
return False
def stop_mqtt(event):
""" Stop MQTT component. """
MQTT_CLIENT.stop()
def start_mqtt(event):
""" Launch MQTT component when Home Assistant starts up. """
MQTT_CLIENT.start()
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, stop_mqtt)
def publish_service(call):
""" Handle MQTT publish service calls. """
msg_topic = call.data.get(ATTR_TOPIC)
payload = call.data.get(ATTR_PAYLOAD)
qos = call.data.get(ATTR_QOS, DEFAULT_QOS)
retain = call.data.get(ATTR_RETAIN, DEFAULT_RETAIN)
if msg_topic is None or payload is None:
return
MQTT_CLIENT.publish(msg_topic, payload, qos, retain)
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_mqtt)
hass.services.register(DOMAIN, SERVICE_PUBLISH, publish_service)
return True
# pylint: disable=too-few-public-methods
class _JsonFmtParser(object):
""" Implements a json parser on xpath. """
def __init__(self, jsonpath):
import jsonpath_rw
self._expr = jsonpath_rw.parse(jsonpath)
def __call__(self, payload):
match = self._expr.find(json.loads(payload))
return match[0].value if len(match) > 0 else payload
# pylint: disable=too-few-public-methods
class FmtParser(object):
""" Wrapper for all supported formats. """
def __init__(self, fmt):
self._parse = lambda x: x
if fmt:
if fmt.startswith('json:'):
self._parse = _JsonFmtParser(fmt[5:])
def __call__(self, payload):
return self._parse(payload)
# This is based on one of the paho-mqtt examples:
# http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.python.git/tree/examples/sub-class.py
# pylint: disable=too-many-arguments
class MQTT(object):
""" Implements messaging service for MQTT. """
def __init__(self, hass, broker, port, client_id, keepalive, username,
password, certificate):
import paho.mqtt.client as mqtt
self.userdata = {
'hass': hass,
'topics': {},
'progress': {},
}
if client_id is None:
self._mqttc = mqtt.Client()
else:
self._mqttc = mqtt.Client(client_id)
self._mqttc.user_data_set(self.userdata)
if username is not None:
self._mqttc.username_pw_set(username, password)
if certificate is not None:
self._mqttc.tls_set(certificate)
self._mqttc.on_subscribe = _mqtt_on_subscribe
self._mqttc.on_unsubscribe = _mqtt_on_unsubscribe
self._mqttc.on_connect = _mqtt_on_connect
self._mqttc.on_disconnect = _mqtt_on_disconnect
self._mqttc.on_message = _mqtt_on_message
self._mqttc.connect(broker, port, keepalive)
def publish(self, topic, payload, qos, retain):
""" Publish a MQTT message. """
self._mqttc.publish(topic, payload, qos, retain)
def start(self):
""" Run the MQTT client. """
self._mqttc.loop_start()
def stop(self):
""" Stop the MQTT client. """
self._mqttc.loop_stop()
def subscribe(self, topic, qos):
""" Subscribe to a topic. """
if topic in self.userdata['topics']:
return
result, mid = self._mqttc.subscribe(topic, qos)
_raise_on_error(result)
self.userdata['progress'][mid] = topic
self.userdata['topics'][topic] = None
def unsubscribe(self, topic):
""" Unsubscribe from topic. """
result, mid = self._mqttc.unsubscribe(topic)
_raise_on_error(result)
self.userdata['progress'][mid] = topic
def _mqtt_on_message(mqttc, userdata, msg):
""" Message callback """
userdata['hass'].bus.fire(EVENT_MQTT_MESSAGE_RECEIVED, {
ATTR_TOPIC: msg.topic,
ATTR_QOS: msg.qos,
ATTR_PAYLOAD: msg.payload.decode('utf-8'),
})
def _mqtt_on_connect(mqttc, userdata, flags, result_code):
""" On connect, resubscribe to all topics we were subscribed to. """
if result_code != 0:
_LOGGER.error('Unable to connect to the MQTT broker: %s', {
1: 'Incorrect protocol version',
2: 'Invalid client identifier',
3: 'Server unavailable',
4: 'Bad username or password',
5: 'Not authorised'
}.get(result_code, 'Unknown reason'))
mqttc.disconnect()
return
old_topics = userdata['topics']
userdata['topics'] = {}
userdata['progress'] = {}
for topic, qos in old_topics.items():
# qos is None if we were in process of subscribing
if qos is not None:
mqttc.subscribe(topic, qos)
def _mqtt_on_subscribe(mqttc, userdata, mid, granted_qos):
""" Called when subscribe successful. """
topic = userdata['progress'].pop(mid, None)
if topic is None:
return
userdata['topics'][topic] = granted_qos
def _mqtt_on_unsubscribe(mqttc, userdata, mid, granted_qos):
""" Called when subscribe successful. """
topic = userdata['progress'].pop(mid, None)
if topic is None:
return
userdata['topics'].pop(topic, None)
def _mqtt_on_disconnect(mqttc, userdata, result_code):
""" Called when being disconnected. """
# When disconnected because of calling disconnect()
if result_code == 0:
return
tries = 0
wait_time = 0
while True:
try:
if mqttc.reconnect() == 0:
_LOGGER.info('Successfully reconnected to the MQTT server')
break
except socket.error:
pass
wait_time = min(2**tries, MAX_RECONNECT_WAIT)
_LOGGER.warning(
'Disconnected from MQTT (%s). Trying to reconnect in %ss',
result_code, wait_time)
# It is ok to sleep here as we are in the MQTT thread.
time.sleep(wait_time)
tries += 1
def _raise_on_error(result):
""" Raise error if error result. """
if result != 0:
raise HomeAssistantError('Error talking to MQTT: {}'.format(result))
def _match_topic(subscription, topic):
""" Returns if topic matches subscription. """
if subscription.endswith('#'):
return (subscription[:-2] == topic or
topic.startswith(subscription[:-1]))
sub_parts = subscription.split('/')
topic_parts = topic.split('/')
return (len(sub_parts) == len(topic_parts) and
all(a == b for a, b in zip(sub_parts, topic_parts) if a != '+'))