hass-core/homeassistant/components/mqtt/__init__.py
Otto Winter b1c0cabe6c Fix MQTT retained message not being re-dispatched (#12004)
* Fix MQTT retained message not being re-dispatched

* Fix tests

* Use paho-mqtt for retained messages

* Improve code style

* Store list of subscribers

* Fix lint error

* Adhere to Home Assistant's logging standard

"Try to avoid brackets and additional quotes around the output to make it easier for users to parse the log."
 - https://home-assistant.io/developers/development_guidelines/

* Add reconnect tests

* Fix lint error

* Introduce Subscription

Tests still need to be updated

* Use namedtuple for MQTT messages

... And fix issues

Accessing the config manually at runtime isn't ideal

* Fix MQTT __init__.py tests

* Updated usage of Mocks
* Moved tests that were testing subscriptions out of the MQTTComponent test, because of how mock.patch was used
* Adjusted the remaining tests for the MQTT client's new behavior - e.g. self.progress was removed
* Updated the async_fire_mqtt_message helper

*  Update MQTT tests

* Re-introduce the MQTT subscriptions through the dispatcher for tests - quite ugly though...  🚧
* Update fixtures to use our new MQTT mock 🎨

* 📝 Update base code according to comments

* 🔨 Adjust MQTT test base

* 🔨 Update other MQTT tests

* 🍎 Fix carriage return in source files

Apparently test_mqtt_json.py and test_mqtt_template.py were written on Windows. In order to not mess up the diff, I'll just redo the carriage return.

* 🎨 Remove unused import

* 📝 Remove fire_mqtt_client_message

* 🐛 Fix using python 3.6 method

What's very interesting is that 3.4 didn't fail on travis...

* 🐛 Fix using assert directly
2018-02-11 09:17:58 -08:00

705 lines
24 KiB
Python

"""
Support for MQTT message handling.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/mqtt/
"""
import asyncio
from collections import namedtuple
from itertools import groupby
from typing import Optional
from operator import attrgetter
import logging
import os
import socket
import time
import ssl
import re
import requests.certs
import voluptuous as vol
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.core import callback
from homeassistant.setup import async_prepare_setup_platform
from homeassistant.exceptions import HomeAssistantError
from homeassistant.loader import bind_hass
from homeassistant.helpers import template, ConfigType, config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.util.async import (
run_coroutine_threadsafe, run_callback_threadsafe)
from homeassistant.const import (
EVENT_HOMEASSISTANT_STOP, CONF_VALUE_TEMPLATE, CONF_USERNAME,
CONF_PASSWORD, CONF_PORT, CONF_PROTOCOL, CONF_PAYLOAD)
from homeassistant.components.mqtt.server import HBMQTT_CONFIG_SCHEMA
# Pinned paho-mqtt release; the library is imported lazily as
# ``paho.mqtt.client`` inside the functions that need it.
REQUIREMENTS = ['paho-mqtt==1.3.1']

_LOGGER = logging.getLogger(__name__)

# Component domain; also the top-level configuration key and service domain.
DOMAIN = 'mqtt'

# Key in ``hass.data`` under which the MQTT client object is stored.
DATA_MQTT = 'mqtt'

# Name of the publish service registered by ``async_setup``.
SERVICE_PUBLISH = 'publish'

# Keys accepted in the component configuration (see CONFIG_SCHEMA).
CONF_EMBEDDED = 'embedded'
CONF_BROKER = 'broker'
CONF_CLIENT_ID = 'client_id'
CONF_DISCOVERY = 'discovery'
CONF_DISCOVERY_PREFIX = 'discovery_prefix'
CONF_KEEPALIVE = 'keepalive'
CONF_CERTIFICATE = 'certificate'
CONF_CLIENT_KEY = 'client_key'
CONF_CLIENT_CERT = 'client_cert'
CONF_TLS_INSECURE = 'tls_insecure'
CONF_TLS_VERSION = 'tls_version'
CONF_BIRTH_MESSAGE = 'birth_message'
CONF_WILL_MESSAGE = 'will_message'

# Keys used by the platform schemas and the availability mixin.
CONF_STATE_TOPIC = 'state_topic'
CONF_COMMAND_TOPIC = 'command_topic'
CONF_AVAILABILITY_TOPIC = 'availability_topic'
CONF_PAYLOAD_AVAILABLE = 'payload_available'
CONF_PAYLOAD_NOT_AVAILABLE = 'payload_not_available'
CONF_QOS = 'qos'
CONF_RETAIN = 'retain'

# Accepted values for the ``protocol`` option.
PROTOCOL_31 = '3.1'
PROTOCOL_311 = '3.1.1'

# Defaults applied by the schemas when an option is omitted.
DEFAULT_PORT = 1883
DEFAULT_KEEPALIVE = 60
DEFAULT_QOS = 0
DEFAULT_RETAIN = False
DEFAULT_PROTOCOL = PROTOCOL_311
DEFAULT_DISCOVERY = False
DEFAULT_DISCOVERY_PREFIX = 'homeassistant'
DEFAULT_TLS_PROTOCOL = 'auto'
DEFAULT_PAYLOAD_AVAILABLE = 'online'
DEFAULT_PAYLOAD_NOT_AVAILABLE = 'offline'

# Keys of the publish service call data and of will/birth messages.
ATTR_TOPIC = 'topic'
ATTR_PAYLOAD = 'payload'
ATTR_PAYLOAD_TEMPLATE = 'payload_template'
ATTR_QOS = CONF_QOS
ATTR_RETAIN = CONF_RETAIN

# Upper bound for the exponential reconnect back-off in the disconnect
# callback.
MAX_RECONNECT_WAIT = 300  # seconds
def valid_subscribe_topic(value, invalid_chars='\0'):
    """Check that ``value`` can be used as an MQTT subscription topic.

    The value is first validated with ``cv.string``. It is rejected with
    ``vol.Invalid`` as soon as it contains any character listed in
    ``invalid_chars``; otherwise the result of a 1..65535 length check on the
    string is returned.
    """
    topic = cv.string(value)

    for forbidden in invalid_chars:
        if forbidden in topic:
            raise vol.Invalid('Invalid MQTT topic name')

    length_check = vol.Length(min=1, max=65535)
    return length_check(topic)
def valid_publish_topic(value):
    """Check that ``value`` can be used as an MQTT topic to publish to.

    Same checks as ``valid_subscribe_topic``, but the wildcard characters
    ``#`` and ``+`` are rejected in addition to the NUL character.
    """
    forbidden = '#+\0'
    return valid_subscribe_topic(value, invalid_chars=forbidden)
def valid_discovery_topic(value):
    """Check that ``value`` can be used as a discovery topic prefix.

    Same checks as ``valid_subscribe_topic``, but ``#``, ``+``, NUL and the
    level separator ``/`` are all rejected.
    """
    forbidden = '#+\0/'
    return valid_subscribe_topic(value, invalid_chars=forbidden)
# A QoS value is coerced to int and must be one of 0, 1 or 2.
_VALID_QOS_SCHEMA = vol.All(vol.Coerce(int), vol.In([0, 1, 2]))

# Message attached to both members of the 'client_key_auth' inclusive group
# in CONFIG_SCHEMA below.
CLIENT_KEY_AUTH_MSG = 'client_key and client_cert must both be present in ' \
                      'the MQTT broker configuration'

# Shape of the will_message / birth_message configuration entries. The keys
# match the field names of the Message namedtuple, which is built from these
# dicts with ``Message(**conf)`` in async_setup.
# NOTE(review): CONF_PAYLOAD is passed as the second positional argument of
# vol.Required, which is presumably its ``msg`` parameter - confirm intent.
MQTT_WILL_BIRTH_SCHEMA = vol.Schema({
    vol.Required(ATTR_TOPIC): valid_publish_topic,
    vol.Required(ATTR_PAYLOAD, CONF_PAYLOAD): cv.string,
    vol.Optional(ATTR_QOS, default=DEFAULT_QOS): _VALID_QOS_SCHEMA,
    vol.Optional(ATTR_RETAIN, default=DEFAULT_RETAIN): cv.boolean,
}, required=True)

# Schema of the component configuration found under the ``mqtt`` key.
# Other top-level keys of the configuration are allowed through untouched.
CONFIG_SCHEMA = vol.Schema({
    DOMAIN: vol.Schema({
        vol.Optional(CONF_CLIENT_ID): cv.string,
        # Keepalive is an integer of at least 15.
        vol.Optional(CONF_KEEPALIVE, default=DEFAULT_KEEPALIVE):
            vol.All(vol.Coerce(int), vol.Range(min=15)),
        vol.Optional(CONF_BROKER): cv.string,
        vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
        vol.Optional(CONF_USERNAME): cv.string,
        vol.Optional(CONF_PASSWORD): cv.string,
        # Either the literal 'auto' or a value accepted by cv.isfile.
        vol.Optional(CONF_CERTIFICATE): vol.Any('auto', cv.isfile),
        # client_key and client_cert share one inclusive group, so they have
        # to be given together.
        vol.Inclusive(CONF_CLIENT_KEY, 'client_key_auth',
                      msg=CLIENT_KEY_AUTH_MSG): cv.isfile,
        vol.Inclusive(CONF_CLIENT_CERT, 'client_key_auth',
                      msg=CLIENT_KEY_AUTH_MSG): cv.isfile,
        vol.Optional(CONF_TLS_INSECURE): cv.boolean,
        vol.Optional(CONF_TLS_VERSION, default=DEFAULT_TLS_PROTOCOL):
            vol.Any('auto', '1.0', '1.1', '1.2'),
        vol.Optional(CONF_PROTOCOL, default=DEFAULT_PROTOCOL):
            vol.All(cv.string, vol.In([PROTOCOL_31, PROTOCOL_311])),
        # Configuration handed to the embedded broker.
        vol.Optional(CONF_EMBEDDED): HBMQTT_CONFIG_SCHEMA,
        vol.Optional(CONF_WILL_MESSAGE): MQTT_WILL_BIRTH_SCHEMA,
        vol.Optional(CONF_BIRTH_MESSAGE): MQTT_WILL_BIRTH_SCHEMA,
        vol.Optional(CONF_DISCOVERY, default=DEFAULT_DISCOVERY): cv.boolean,
        vol.Optional(CONF_DISCOVERY_PREFIX,
                     default=DEFAULT_DISCOVERY_PREFIX): valid_discovery_topic,
    }),
}, extra=vol.ALLOW_EXTRA)

# Options shared by every MQTT platform schema.
SCHEMA_BASE = {
    vol.Optional(CONF_QOS, default=DEFAULT_QOS): _VALID_QOS_SCHEMA,
}

# Options consumed by the MqttAvailability mixin.
MQTT_AVAILABILITY_SCHEMA = vol.Schema({
    vol.Optional(CONF_AVAILABILITY_TOPIC): valid_subscribe_topic,
    vol.Optional(CONF_PAYLOAD_AVAILABLE,
                 default=DEFAULT_PAYLOAD_AVAILABLE): cv.string,
    vol.Optional(CONF_PAYLOAD_NOT_AVAILABLE,
                 default=DEFAULT_PAYLOAD_NOT_AVAILABLE): cv.string,
})

MQTT_BASE_PLATFORM_SCHEMA = cv.PLATFORM_SCHEMA.extend(SCHEMA_BASE)

# Sensor type platforms subscribe to MQTT events
MQTT_RO_PLATFORM_SCHEMA = MQTT_BASE_PLATFORM_SCHEMA.extend({
    vol.Required(CONF_STATE_TOPIC): valid_subscribe_topic,
    vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
})

# Switch type platforms publish to MQTT and may subscribe
MQTT_RW_PLATFORM_SCHEMA = MQTT_BASE_PLATFORM_SCHEMA.extend({
    vol.Required(CONF_COMMAND_TOPIC): valid_publish_topic,
    vol.Optional(CONF_RETAIN, default=DEFAULT_RETAIN): cv.boolean,
    vol.Optional(CONF_STATE_TOPIC): valid_subscribe_topic,
    vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
})

# Service call validation schema. ``payload`` and ``payload_template`` are in
# the same exclusive group, so at most one of them may be supplied.
MQTT_PUBLISH_SCHEMA = vol.Schema({
    vol.Required(ATTR_TOPIC): valid_publish_topic,
    vol.Exclusive(ATTR_PAYLOAD, CONF_PAYLOAD): object,
    vol.Exclusive(ATTR_PAYLOAD_TEMPLATE, CONF_PAYLOAD): cv.string,
    vol.Optional(ATTR_QOS, default=DEFAULT_QOS): _VALID_QOS_SCHEMA,
    vol.Optional(ATTR_RETAIN, default=DEFAULT_RETAIN): cv.boolean,
}, required=True)
def _build_publish_data(topic, qos, retain):
    """Assemble publish service data for everything except the payload.

    The topic is always included; ``qos`` and ``retain`` are only added when
    they are not ``None``.
    """
    service_data = {ATTR_TOPIC: topic}
    optional_fields = ((ATTR_QOS, qos), (ATTR_RETAIN, retain))
    service_data.update(
        (key, setting) for key, setting in optional_fields
        if setting is not None)
    return service_data
@bind_hass
def publish(hass, topic, payload, qos=None, retain=None):
    """Publish a message to an MQTT topic.

    Hands the work to ``async_publish`` through ``hass.add_job``.
    """
    job_args = (hass, topic, payload, qos, retain)
    hass.add_job(async_publish, *job_args)
@callback
@bind_hass
def async_publish(hass, topic, payload, qos=None, retain=None):
    """Publish a message to an MQTT topic.

    Builds the service data and schedules a call to the publish service.
    """
    service_data = _build_publish_data(topic, qos, retain)
    service_data[ATTR_PAYLOAD] = payload
    pending_call = hass.services.async_call(
        DOMAIN, SERVICE_PUBLISH, service_data)
    hass.async_add_job(pending_call)
@bind_hass
def publish_template(hass, topic, payload_template, qos=None, retain=None):
    """Publish a message whose payload is given as a template.

    Calls the publish service with ``payload_template`` instead of a literal
    payload.
    """
    service_data = _build_publish_data(topic, qos, retain)
    service_data[ATTR_PAYLOAD_TEMPLATE] = payload_template
    hass.services.call(DOMAIN, SERVICE_PUBLISH, service_data)
@asyncio.coroutine
@bind_hass
def async_subscribe(hass, topic, msg_callback, qos=DEFAULT_QOS,
                    encoding='utf-8'):
    """Subscribe ``msg_callback`` to an MQTT topic.

    Delegates to the client stored in ``hass.data`` and returns what it
    returns: a callable that removes the subscription again.
    """
    client = hass.data[DATA_MQTT]
    unsubscribe = yield from client.async_subscribe(
        topic, msg_callback, qos, encoding)
    return unsubscribe
@bind_hass
def subscribe(hass, topic, msg_callback, qos=DEFAULT_QOS,
              encoding='utf-8'):
    """Subscribe ``msg_callback`` to an MQTT topic from outside the loop.

    Runs ``async_subscribe`` on ``hass.loop``, waits for its result, and
    returns a blocking function that removes the subscription.
    """
    pending = run_coroutine_threadsafe(
        async_subscribe(hass, topic, msg_callback, qos, encoding), hass.loop)
    async_remove = pending.result()

    def remove():
        """Run the unsubscribe callback on the loop and wait for it."""
        removal = run_callback_threadsafe(hass.loop, async_remove)
        removal.result()

    return remove
@asyncio.coroutine
def _async_setup_server(hass, config):
    """Try to start the embedded MQTT broker.

    Returns ``None`` when the ``server`` platform cannot be loaded. Otherwise
    returns the broker configuration reported by the server if it started
    successfully, or the falsy success value if it did not.

    This method is a coroutine.
    """
    mqtt_conf = config.get(DOMAIN, {})

    server = yield from async_prepare_setup_platform(
        hass, config, DOMAIN, 'server')

    if server is None:
        _LOGGER.error("Unable to load embedded server")
        return None

    embedded_conf = mqtt_conf.get(CONF_EMBEDDED)
    started, broker_config = yield from server.async_start(
        hass, embedded_conf)

    return started and broker_config
@asyncio.coroutine
def _async_setup_discovery(hass, config):
    """Try to start discovery of MQTT devices.

    Returns ``None`` when the ``discovery`` platform cannot be loaded,
    otherwise the result of starting it with the configured prefix.

    This method is a coroutine.
    """
    mqtt_conf = config.get(DOMAIN, {})

    discovery = yield from async_prepare_setup_platform(
        hass, config, DOMAIN, 'discovery')

    if discovery is None:
        _LOGGER.error("Unable to load MQTT discovery")
        return None

    prefix = mqtt_conf[CONF_DISCOVERY_PREFIX]
    started = yield from discovery.async_start(hass, prefix, config)
    return started
@asyncio.coroutine
def async_setup(hass: HomeAssistantType, config: ConfigType):
    """Start the MQTT protocol service.

    Steps, in order:

    1. Read the ``mqtt`` section of ``config`` (schema defaults are used when
       the section is missing).
    2. Start the embedded broker unless an external broker is configured
       without an ``embedded`` section.
    3. Work out the connection parameters from the explicit broker settings
       or, failing that, from the embedded broker.
    4. Create the MQTT client, store it in ``hass.data[DATA_MQTT]`` and
       connect it.
    5. Register the publish service and, if enabled, start discovery.

    Returns ``False`` when no broker can be determined, when creating the
    client raises ``socket.error`` or when connecting fails; ``True``
    otherwise.

    This method is a coroutine.
    """
    conf = config.get(DOMAIN)

    if conf is None:
        # No mqtt section at all: run an empty one through the schema so
        # every option gets its default value.
        conf = CONFIG_SCHEMA({DOMAIN: {}})[DOMAIN]

    client_id = conf.get(CONF_CLIENT_ID)
    keepalive = conf.get(CONF_KEEPALIVE)

    # Only setup if embedded config passed in or no broker specified
    if CONF_EMBEDDED not in conf and CONF_BROKER in conf:
        broker_config = None
    else:
        broker_config = yield from _async_setup_server(hass, config)

    if CONF_BROKER in conf:
        # An explicitly configured broker always wins over the embedded one.
        broker = conf[CONF_BROKER]
        port = conf[CONF_PORT]
        username = conf.get(CONF_USERNAME)
        password = conf.get(CONF_PASSWORD)
        certificate = conf.get(CONF_CERTIFICATE)
        client_key = conf.get(CONF_CLIENT_KEY)
        client_cert = conf.get(CONF_CLIENT_CERT)
        tls_insecure = conf.get(CONF_TLS_INSECURE)
        protocol = conf[CONF_PROTOCOL]
    elif broker_config:
        # If no broker passed in, auto config to internal server
        broker, port, username, password, certificate, protocol = broker_config
        # Embedded broker doesn't have some ssl variables
        client_key, client_cert, tls_insecure = None, None, None
        # hbmqtt requires a client id to be set.
        if client_id is None:
            client_id = 'home-assistant'
    else:
        # Neither an external broker nor a running embedded broker.
        err = "Unable to start MQTT broker."
        if conf.get(CONF_EMBEDDED) is not None:
            # Explicit embedded config, requires explicit broker config
            err += " (Broker configuration required.)"
        _LOGGER.error(err)
        return False

    # For cloudmqtt.com, secured connection, auto fill in certificate
    if (certificate is None and 19999 < port < 30000 and
            broker.endswith('.cloudmqtt.com')):
        # The certificate file is looked up next to this module.
        certificate = os.path.join(os.path.dirname(__file__),
                                   'addtrustexternalcaroot.crt')

    # When the certificate is set to auto, use bundled certs from requests
    if certificate == 'auto':
        certificate = requests.certs.where()

    # The validated config dicts use the Message field names as keys, so they
    # can be unpacked straight into the namedtuple.
    will_message = None
    if conf.get(CONF_WILL_MESSAGE) is not None:
        will_message = Message(**conf.get(CONF_WILL_MESSAGE))
    birth_message = None
    if conf.get(CONF_BIRTH_MESSAGE) is not None:
        birth_message = Message(**conf.get(CONF_BIRTH_MESSAGE))

    # Be able to override versions other than TLSv1.0 under Python3.6
    conf_tls_version = conf.get(CONF_TLS_VERSION)
    if conf_tls_version == '1.2':
        tls_version = ssl.PROTOCOL_TLSv1_2
    elif conf_tls_version == '1.1':
        tls_version = ssl.PROTOCOL_TLSv1_1
    elif conf_tls_version == '1.0':
        tls_version = ssl.PROTOCOL_TLSv1
    else:
        # Any other value (including the default 'auto') lands here.
        import sys
        # Python3.6 supports automatic negotiation of highest TLS version
        if sys.hexversion >= 0x03060000:
            tls_version = ssl.PROTOCOL_TLS  # pylint: disable=no-member
        else:
            tls_version = ssl.PROTOCOL_TLSv1

    try:
        # Only constructs and configures the client; the connection itself
        # is made further down through async_connect().
        hass.data[DATA_MQTT] = MQTT(
            hass, broker, port, client_id, keepalive, username, password,
            certificate, client_key, client_cert, tls_insecure, protocol,
            will_message, birth_message, tls_version)
    except socket.error:
        _LOGGER.exception("Can't connect to the broker. "
                          "Please check your settings and the broker itself")
        return False

    @asyncio.coroutine
    def async_stop_mqtt(event):
        """Stop MQTT component."""
        yield from hass.data[DATA_MQTT].async_disconnect()

    # Disconnect the client when Home Assistant stops.
    hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, async_stop_mqtt)

    success = yield from hass.data[DATA_MQTT].async_connect()

    if not success:
        return False

    @asyncio.coroutine
    def async_publish_service(call):
        """Handle MQTT publish service calls."""
        msg_topic = call.data[ATTR_TOPIC]
        payload = call.data.get(ATTR_PAYLOAD)
        payload_template = call.data.get(ATTR_PAYLOAD_TEMPLATE)
        qos = call.data[ATTR_QOS]
        retain = call.data[ATTR_RETAIN]
        if payload_template is not None:
            # A template replaces the payload with its rendered output;
            # nothing is published if rendering fails.
            try:
                payload = \
                    template.Template(payload_template, hass).async_render()
            except template.jinja2.TemplateError as exc:
                _LOGGER.error(
                    "Unable to publish to %s: rendering payload template of "
                    "%s failed because %s",
                    msg_topic, payload_template, exc)
                return

        yield from hass.data[DATA_MQTT].async_publish(
            msg_topic, payload, qos, retain)

    hass.services.async_register(
        DOMAIN, SERVICE_PUBLISH, async_publish_service,
        schema=MQTT_PUBLISH_SCHEMA)

    if conf.get(CONF_DISCOVERY):
        # The result of starting discovery does not affect the return value.
        yield from _async_setup_discovery(hass, config)

    return True
# One registered listener: the topic filter it asked for, the callback to
# invoke, the requested QoS and the encoding used to decode payloads.
# The two trailing fields are optional and default to 0 and 'utf-8'.
Subscription = namedtuple('Subscription', 'topic callback qos encoding')
Subscription.__new__.__defaults__ = (0, 'utf-8')

# One MQTT message as used for the will and birth messages.
# The two trailing fields are optional and default to 0 and False.
Message = namedtuple('Message', 'topic payload qos retain')
Message.__new__.__defaults__ = (0, False)
class MQTT(object):
    """Home Assistant MQTT client.

    Wraps a paho-mqtt client. Calls into paho that are made from the event
    loop go through ``hass.async_add_job`` and are serialized with an asyncio
    lock. The ``_mqtt_on_*`` methods are installed as paho callbacks.
    """

    def __init__(self, hass, broker, port, client_id, keepalive, username,
                 password, certificate, client_key, client_cert,
                 tls_insecure, protocol, will_message: Optional[Message],
                 birth_message: Optional[Message], tls_version):
        """Initialize Home Assistant MQTT client.

        Only creates and configures the paho client; no connection is made
        until ``async_connect`` is called.
        """
        import paho.mqtt.client as mqtt

        self.hass = hass
        self.broker = broker
        self.port = port
        self.keepalive = keepalive
        # List of Subscription tuples, one per registered listener.
        self.subscriptions = []
        self.birth_message = birth_message
        self._mqttc = None
        # Serializes publish / subscribe / unsubscribe calls into paho.
        self._paho_lock = asyncio.Lock(loop=hass.loop)

        if protocol == PROTOCOL_31:
            proto = mqtt.MQTTv31
        else:
            proto = mqtt.MQTTv311

        if client_id is None:
            self._mqttc = mqtt.Client(protocol=proto)
        else:
            self._mqttc = mqtt.Client(client_id, protocol=proto)

        if username is not None:
            self._mqttc.username_pw_set(username, password)

        if certificate is not None:
            self._mqttc.tls_set(
                certificate, certfile=client_cert,
                keyfile=client_key, tls_version=tls_version)

            # Only meaningful once TLS has been configured above; paho
            # rejects tls_insecure_set() when no TLS context exists.
            if tls_insecure is not None:
                self._mqttc.tls_insecure_set(tls_insecure)

        self._mqttc.on_connect = self._mqtt_on_connect
        self._mqttc.on_disconnect = self._mqtt_on_disconnect
        self._mqttc.on_message = self._mqtt_on_message

        if will_message:
            self._mqttc.will_set(*will_message)

    @asyncio.coroutine
    def async_publish(self, topic, payload, qos, retain):
        """Publish a MQTT message.

        This method must be run in the event loop and returns a coroutine.
        """
        with (yield from self._paho_lock):
            yield from self.hass.async_add_job(
                self._mqttc.publish, topic, payload, qos, retain)

    @asyncio.coroutine
    def async_connect(self):
        """Connect to the broker and start paho's network loop.

        Returns True on success. Returns False, after logging the reason,
        when the connection attempt raises ``OSError`` (for example an
        unreachable or refusing host) or paho reports a non-zero result.

        This method is a coroutine.
        """
        try:
            result = yield from self.hass.async_add_job(
                self._mqttc.connect, self.broker, self.port, self.keepalive)
        except OSError as err:
            # Socket level failures surface as exceptions rather than as a
            # result code; report them the same way as any failed connect so
            # that setup fails cleanly instead of raising.
            _LOGGER.error('Failed to connect due to exception: %s', err)
            return False

        if result != 0:
            import paho.mqtt.client as mqtt
            _LOGGER.error('Failed to connect: %s', mqtt.error_string(result))
        else:
            self._mqttc.loop_start()

        return not result

    def async_disconnect(self):
        """Stop the MQTT client.

        This method must be run in the event loop. It returns the result of
        scheduling the blocking stop job with ``hass.async_add_job``.
        """
        def stop():
            """Stop the MQTT client."""
            self._mqttc.disconnect()
            self._mqttc.loop_stop()

        return self.hass.async_add_job(stop)

    @asyncio.coroutine
    def async_subscribe(self, topic, msg_callback, qos, encoding):
        """Set up a subscription to a topic with the provided qos.

        Returns a callback that removes this subscription again. Raises
        HomeAssistantError if ``topic`` is not a string.

        This method is a coroutine.
        """
        if not isinstance(topic, str):
            raise HomeAssistantError("topic needs to be a string!")

        subscription = Subscription(topic, msg_callback, qos, encoding)
        self.subscriptions.append(subscription)

        yield from self._async_perform_subscription(topic, qos)

        @callback
        def async_remove():
            """Remove subscription."""
            if subscription not in self.subscriptions:
                raise HomeAssistantError("Can't remove subscription twice")
            self.subscriptions.remove(subscription)

            if any(other.topic == topic for other in self.subscriptions):
                # Other subscriptions on topic remaining - don't unsubscribe.
                return
            self.hass.async_add_job(self._async_unsubscribe(topic))

        return async_remove

    @asyncio.coroutine
    def _async_unsubscribe(self, topic):
        """Unsubscribe from a topic.

        Raises HomeAssistantError if paho reports an error.

        This method is a coroutine.
        """
        with (yield from self._paho_lock):
            result, _ = yield from self.hass.async_add_job(
                self._mqttc.unsubscribe, topic)
            _raise_on_error(result)

    @asyncio.coroutine
    def _async_perform_subscription(self, topic, qos):
        """Perform a paho-mqtt subscription.

        Raises HomeAssistantError if paho reports an error.
        """
        _LOGGER.debug("Subscribing to %s", topic)

        with (yield from self._paho_lock):
            result, _ = yield from self.hass.async_add_job(
                self._mqttc.subscribe, topic, qos)
            _raise_on_error(result)

    def _mqtt_on_connect(self, _mqttc, _userdata, _flags, result_code):
        """On connect callback.

        Resubscribe to all topics we were subscribed to and publish birth
        message.
        """
        import paho.mqtt.client as mqtt

        if result_code != mqtt.CONNACK_ACCEPTED:
            _LOGGER.error('Unable to connect to the MQTT broker: %s',
                          mqtt.connack_string(result_code))
            self._mqttc.disconnect()
            return

        # Group subscriptions to only re-subscribe once for each topic.
        # groupby() only merges adjacent items, hence the sort by topic.
        keyfunc = attrgetter('topic')
        for topic, subs in groupby(sorted(self.subscriptions, key=keyfunc),
                                   keyfunc):
            # Re-subscribe with the highest requested qos
            max_qos = max(subscription.qos for subscription in subs)
            self.hass.add_job(self._async_perform_subscription, topic, max_qos)

        if self.birth_message:
            self.hass.add_job(self.async_publish(*self.birth_message))

    def _mqtt_on_message(self, _mqttc, _userdata, msg):
        """Message received callback.

        Like the on-connect callback, this hands the work over with
        ``hass.add_job`` rather than ``hass.async_add_job``, because the
        latter must only be used from within the event loop.
        """
        self.hass.add_job(self._mqtt_handle_message, msg)

    @callback
    def _mqtt_handle_message(self, msg):
        """Dispatch a received message to every matching subscription.

        The payload is decoded separately for each subscription using that
        subscription's encoding; an encoding of None passes the raw payload
        through. A payload that cannot be decoded is skipped for that
        subscription only, so the other subscribers still get the message.
        """
        _LOGGER.debug("Received message on %s: %s", msg.topic, msg.payload)

        for subscription in self.subscriptions:
            if not _match_topic(subscription.topic, msg.topic):
                continue

            payload = msg.payload
            if subscription.encoding is not None:
                try:
                    payload = msg.payload.decode(subscription.encoding)
                except (AttributeError, UnicodeDecodeError):
                    _LOGGER.warning("Can't decode payload %s on %s "
                                    "with encoding %s",
                                    msg.payload, msg.topic,
                                    subscription.encoding)
                    # Another subscription may use a different encoding (or
                    # none), so keep going instead of dropping the message.
                    continue

            self.hass.async_run_job(subscription.callback,
                                    msg.topic, payload, msg.qos)

    def _mqtt_on_disconnect(self, _mqttc, _userdata, result_code):
        """Disconnected callback.

        For an unexpected disconnect, keep trying to reconnect, waiting
        1, 2, 4, ... seconds between attempts up to MAX_RECONNECT_WAIT.
        """
        # When disconnected because of calling disconnect()
        if result_code == 0:
            return

        tries = 0

        while True:
            try:
                if self._mqttc.reconnect() == 0:
                    _LOGGER.info("Successfully reconnected to the MQTT server")
                    break
            except socket.error:
                # Broker still unreachable; fall through to the back-off.
                pass

            wait_time = min(2**tries, MAX_RECONNECT_WAIT)
            _LOGGER.warning(
                "Disconnected from MQTT (%s). Trying to reconnect in %s s",
                result_code, wait_time)
            # It is ok to sleep here as we are in the MQTT thread.
            time.sleep(wait_time)
            tries += 1
def _raise_on_error(result):
"""Raise error if error result."""
if result != 0:
import paho.mqtt.client as mqtt
raise HomeAssistantError(
'Error talking to MQTT: {}'.format(mqtt.error_string(result)))
def _match_topic(subscription, topic):
"""Test if topic matches subscription."""
reg_ex_parts = []
suffix = ""
if subscription.endswith('#'):
subscription = subscription[:-2]
suffix = "(.*)"
sub_parts = subscription.split('/')
for sub_part in sub_parts:
if sub_part == "+":
reg_ex_parts.append(r"([^\/]+)")
else:
reg_ex_parts.append(re.escape(sub_part))
reg_ex = "^" + (r'\/'.join(reg_ex_parts)) + suffix + "$"
reg = re.compile(reg_ex)
return reg.match(topic) is not None
class MqttAvailability(Entity):
    """Mixin used for platforms that report availability."""

    def __init__(self, availability_topic, qos, payload_available,
                 payload_not_available):
        """Initialize the availability mixin.

        Without an availability topic the entity counts as available from
        the start; with one it stays unavailable until the available payload
        is received.
        """
        self._availability_topic = availability_topic
        self._availability_qos = qos
        self._available = availability_topic is None
        self._payload_available = payload_available
        self._payload_not_available = payload_not_available

    @asyncio.coroutine
    def async_added_to_hass(self):
        """Subscribe mqtt events.

        Subscribes to the availability topic if one is configured; does
        nothing otherwise.

        This method is a coroutine.
        """
        @callback
        def availability_message_received(topic, payload, qos):
            """Handle a new received MQTT availability message."""
            if payload == self._payload_available:
                self._available = True
            elif payload == self._payload_not_available:
                self._available = False
            # Any other payload leaves the availability unchanged, but a
            # state update is still scheduled.

            self.async_schedule_update_ha_state()

        if self._availability_topic is not None:
            yield from async_subscribe(
                self.hass, self._availability_topic,
                availability_message_received, self._availability_qos)

    @property
    def available(self) -> bool:
        """Return if the device is available."""
        return self._available