MQTT: Start embedded server if no config given
This commit is contained in:
parent
13d7f742a7
commit
8386bda4e4
5 changed files with 213 additions and 15 deletions
|
@ -10,11 +10,11 @@ import socket
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
from homeassistant.bootstrap import prepare_setup_platform
|
||||||
from homeassistant.config import load_yaml_config_file
|
from homeassistant.config import load_yaml_config_file
|
||||||
from homeassistant.exceptions import HomeAssistantError
|
from homeassistant.exceptions import HomeAssistantError
|
||||||
import homeassistant.util as util
|
import homeassistant.util as util
|
||||||
from homeassistant.helpers import template
|
from homeassistant.helpers import template
|
||||||
from homeassistant.helpers import validate_config
|
|
||||||
from homeassistant.const import (
|
from homeassistant.const import (
|
||||||
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP)
|
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP)
|
||||||
|
|
||||||
|
@ -29,6 +29,7 @@ EVENT_MQTT_MESSAGE_RECEIVED = 'mqtt_message_received'
|
||||||
|
|
||||||
REQUIREMENTS = ['paho-mqtt==1.1']
|
REQUIREMENTS = ['paho-mqtt==1.1']
|
||||||
|
|
||||||
|
CONF_EMBEDDED = 'embedded'
|
||||||
CONF_BROKER = 'broker'
|
CONF_BROKER = 'broker'
|
||||||
CONF_PORT = 'port'
|
CONF_PORT = 'port'
|
||||||
CONF_CLIENT_ID = 'client_id'
|
CONF_CLIENT_ID = 'client_id'
|
||||||
|
@ -92,21 +93,50 @@ def subscribe(hass, topic, callback, qos=DEFAULT_QOS):
|
||||||
MQTT_CLIENT.subscribe(topic, qos)
|
MQTT_CLIENT.subscribe(topic, qos)
|
||||||
|
|
||||||
|
|
||||||
|
def _setup_server(hass, config):
|
||||||
|
"""Try to start embedded MQTT broker."""
|
||||||
|
conf = config.get(DOMAIN, {})
|
||||||
|
|
||||||
|
# Only setup if embedded config passed in or no broker specified
|
||||||
|
if CONF_EMBEDDED not in conf and CONF_BROKER in conf:
|
||||||
|
return None
|
||||||
|
|
||||||
|
server = prepare_setup_platform(hass, config, DOMAIN, 'server')
|
||||||
|
|
||||||
|
if server is None:
|
||||||
|
_LOGGER.error('Unable to load embedded server.')
|
||||||
|
return None
|
||||||
|
|
||||||
|
success, broker_config = server.start(hass, conf.get(CONF_EMBEDDED))
|
||||||
|
|
||||||
|
return success and broker_config
|
||||||
|
|
||||||
|
|
||||||
def setup(hass, config):
|
def setup(hass, config):
|
||||||
"""Start the MQTT protocol service."""
|
"""Start the MQTT protocol service."""
|
||||||
if not validate_config(config, {DOMAIN: ['broker']}, _LOGGER):
|
# pylint: disable=too-many-locals
|
||||||
return False
|
conf = config.get(DOMAIN, {})
|
||||||
|
|
||||||
conf = config[DOMAIN]
|
|
||||||
|
|
||||||
broker = conf[CONF_BROKER]
|
|
||||||
port = util.convert(conf.get(CONF_PORT), int, DEFAULT_PORT)
|
|
||||||
client_id = util.convert(conf.get(CONF_CLIENT_ID), str)
|
client_id = util.convert(conf.get(CONF_CLIENT_ID), str)
|
||||||
keepalive = util.convert(conf.get(CONF_KEEPALIVE), int, DEFAULT_KEEPALIVE)
|
keepalive = util.convert(conf.get(CONF_KEEPALIVE), int, DEFAULT_KEEPALIVE)
|
||||||
username = util.convert(conf.get(CONF_USERNAME), str)
|
|
||||||
password = util.convert(conf.get(CONF_PASSWORD), str)
|
broker_config = _setup_server(hass, config)
|
||||||
certificate = util.convert(conf.get(CONF_CERTIFICATE), str)
|
|
||||||
protocol = util.convert(conf.get(CONF_PROTOCOL), str, DEFAULT_PROTOCOL)
|
# Only auto config if no server config was passed in
|
||||||
|
if broker_config and CONF_EMBEDDED not in conf:
|
||||||
|
broker, port, username, password, certificate, protocol = broker_config
|
||||||
|
elif not broker_config and (CONF_EMBEDDED in conf or
|
||||||
|
CONF_BROKER not in conf):
|
||||||
|
_LOGGER.error('Unable to start broker and auto-configure MQTT.')
|
||||||
|
return False
|
||||||
|
|
||||||
|
if CONF_BROKER in conf:
|
||||||
|
broker = conf[CONF_BROKER]
|
||||||
|
port = util.convert(conf.get(CONF_PORT), int, DEFAULT_PORT)
|
||||||
|
username = util.convert(conf.get(CONF_USERNAME), str)
|
||||||
|
password = util.convert(conf.get(CONF_PASSWORD), str)
|
||||||
|
certificate = util.convert(conf.get(CONF_CERTIFICATE), str)
|
||||||
|
protocol = util.convert(conf.get(CONF_PROTOCOL), str, DEFAULT_PROTOCOL)
|
||||||
|
|
||||||
if protocol not in (PROTOCOL_31, PROTOCOL_311):
|
if protocol not in (PROTOCOL_31, PROTOCOL_311):
|
||||||
_LOGGER.error('Invalid protocol specified: %s. Allowed values: %s, %s',
|
_LOGGER.error('Invalid protocol specified: %s. Allowed values: %s, %s',
|
||||||
|
|
114
homeassistant/components/mqtt/server.py
Normal file
114
homeassistant/components/mqtt/server.py
Normal file
|
@ -0,0 +1,114 @@
|
||||||
|
"""MQTT server."""
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import tempfile
|
||||||
|
import threading
|
||||||
|
|
||||||
|
from homeassistant.components.mqtt import PROTOCOL_311
|
||||||
|
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
||||||
|
|
||||||
|
REQUIREMENTS = ['hbmqtt==0.6.3']
|
||||||
|
DEPENDENCIES = ['http']
|
||||||
|
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def broker_coro(loop, config):
|
||||||
|
"""Start broker coroutine."""
|
||||||
|
from hbmqtt.broker import Broker
|
||||||
|
broker = Broker(config, loop)
|
||||||
|
yield from broker.start()
|
||||||
|
return broker
|
||||||
|
|
||||||
|
|
||||||
|
def loop_run(loop, broker, shutdown_complete):
|
||||||
|
"""Run broker and clean up when done."""
|
||||||
|
loop.run_forever()
|
||||||
|
# run_forever ends when stop is called because we're shutting down
|
||||||
|
loop.run_until_complete(broker.shutdown())
|
||||||
|
loop.close()
|
||||||
|
shutdown_complete.set()
|
||||||
|
|
||||||
|
|
||||||
|
def start(hass, server_config):
|
||||||
|
"""Initialize MQTT Server."""
|
||||||
|
from hbmqtt.broker import BrokerException
|
||||||
|
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
|
||||||
|
try:
|
||||||
|
passwd = tempfile.NamedTemporaryFile()
|
||||||
|
|
||||||
|
if server_config is None:
|
||||||
|
server_config, client_config = generate_config(hass, passwd)
|
||||||
|
else:
|
||||||
|
client_config = None
|
||||||
|
|
||||||
|
start_server = asyncio.gather(broker_coro(loop, server_config),
|
||||||
|
loop=loop)
|
||||||
|
loop.run_until_complete(start_server)
|
||||||
|
# Result raises exception if one was raised during startup
|
||||||
|
broker = start_server.result()[0]
|
||||||
|
except BrokerException:
|
||||||
|
logging.getLogger(__name__).exception('Error initializing MQTT server')
|
||||||
|
loop.close()
|
||||||
|
return False, None
|
||||||
|
finally:
|
||||||
|
passwd.close()
|
||||||
|
|
||||||
|
shutdown_complete = threading.Event()
|
||||||
|
|
||||||
|
def shutdown(event):
|
||||||
|
"""Gracefully shutdown MQTT broker."""
|
||||||
|
loop.call_soon_threadsafe(loop.stop)
|
||||||
|
shutdown_complete.wait()
|
||||||
|
|
||||||
|
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)
|
||||||
|
|
||||||
|
threading.Thread(target=loop_run, args=(loop, broker, shutdown_complete),
|
||||||
|
name="MQTT-server").start()
|
||||||
|
|
||||||
|
return True, client_config
|
||||||
|
|
||||||
|
|
||||||
|
def generate_config(hass, passwd):
|
||||||
|
"""Generate a configuration based on current Home Assistant instance."""
|
||||||
|
config = {
|
||||||
|
'listeners': {
|
||||||
|
'default': {
|
||||||
|
'max-connections': 50000,
|
||||||
|
'bind': '0.0.0.0:1883',
|
||||||
|
'type': 'tcp',
|
||||||
|
},
|
||||||
|
'ws-1': {
|
||||||
|
'bind': '0.0.0.0:8080',
|
||||||
|
'type': 'ws',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
'auth': {
|
||||||
|
'allow-anonymous': hass.config.api.api_password is None
|
||||||
|
},
|
||||||
|
'plugins': ['auth_anonymous'],
|
||||||
|
}
|
||||||
|
|
||||||
|
if hass.config.api.api_password:
|
||||||
|
username = 'homeassistant'
|
||||||
|
password = hass.config.api.api_password
|
||||||
|
|
||||||
|
# Encrypt with what hbmqtt uses to verify
|
||||||
|
from passlib.apps import custom_app_context
|
||||||
|
|
||||||
|
passwd.write(
|
||||||
|
'homeassistant:{}\n'.format(
|
||||||
|
custom_app_context.encrypt(
|
||||||
|
hass.config.api.api_password)).encode('utf-8'))
|
||||||
|
passwd.flush()
|
||||||
|
|
||||||
|
config['auth']['password-file'] = passwd.name
|
||||||
|
config['plugins'].append('auth_file')
|
||||||
|
else:
|
||||||
|
username = None
|
||||||
|
password = None
|
||||||
|
|
||||||
|
client_config = ('localhost', 1883, username, password, None, PROTOCOL_311)
|
||||||
|
|
||||||
|
return config, client_config
|
|
@ -54,6 +54,9 @@ freesms==0.1.0
|
||||||
# homeassistant.components.conversation
|
# homeassistant.components.conversation
|
||||||
fuzzywuzzy==0.8.0
|
fuzzywuzzy==0.8.0
|
||||||
|
|
||||||
|
# homeassistant.components.mqtt.server
|
||||||
|
hbmqtt==0.6.3
|
||||||
|
|
||||||
# homeassistant.components.thermostat.heatmiser
|
# homeassistant.components.thermostat.heatmiser
|
||||||
heatmiserV3==0.9.1
|
heatmiserV3==0.9.1
|
||||||
|
|
||||||
|
|
|
@ -44,10 +44,6 @@ class TestMQTT(unittest.TestCase):
|
||||||
self.hass.pool.block_till_done()
|
self.hass.pool.block_till_done()
|
||||||
self.assertTrue(mqtt.MQTT_CLIENT.stop.called)
|
self.assertTrue(mqtt.MQTT_CLIENT.stop.called)
|
||||||
|
|
||||||
def test_setup_fails_if_no_broker_config(self):
|
|
||||||
"""Test for setup failure if broker configuration is missing."""
|
|
||||||
self.assertFalse(mqtt.setup(self.hass, {mqtt.DOMAIN: {}}))
|
|
||||||
|
|
||||||
def test_setup_fails_if_no_connect_broker(self):
|
def test_setup_fails_if_no_connect_broker(self):
|
||||||
"""Test for setup failure if connection to broker is missing."""
|
"""Test for setup failure if connection to broker is missing."""
|
||||||
with mock.patch('homeassistant.components.mqtt.MQTT',
|
with mock.patch('homeassistant.components.mqtt.MQTT',
|
55
tests/components/mqtt/test_server.py
Normal file
55
tests/components/mqtt/test_server.py
Normal file
|
@ -0,0 +1,55 @@
|
||||||
|
"""The tests for the MQTT component embedded server."""
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import homeassistant.components.mqtt as mqtt
|
||||||
|
|
||||||
|
from tests.common import get_test_home_assistant
|
||||||
|
|
||||||
|
|
||||||
|
class TestMQTT:
|
||||||
|
"""Test the MQTT component."""
|
||||||
|
|
||||||
|
def setup_method(self, method):
|
||||||
|
"""Setup things to be run when tests are started."""
|
||||||
|
self.hass = get_test_home_assistant()
|
||||||
|
|
||||||
|
def teardown_method(self, method):
|
||||||
|
"""Stop everything that was started."""
|
||||||
|
self.hass.stop()
|
||||||
|
|
||||||
|
@patch('homeassistant.components.mqtt.MQTT')
|
||||||
|
@patch('asyncio.gather')
|
||||||
|
@patch('asyncio.new_event_loop')
|
||||||
|
def test_creating_config_with_http_pass(self, mock_new_loop, mock_gather,
|
||||||
|
mock_mqtt):
|
||||||
|
"""Test if the MQTT server gets started and subscribe/publish msg."""
|
||||||
|
self.hass.config.components.append('http')
|
||||||
|
password = 'super_secret'
|
||||||
|
|
||||||
|
self.hass.config.api = MagicMock(api_password=password)
|
||||||
|
assert mqtt.setup(self.hass, {})
|
||||||
|
assert mock_mqtt.called
|
||||||
|
assert mock_mqtt.mock_calls[0][1][5] == 'homeassistant'
|
||||||
|
assert mock_mqtt.mock_calls[0][1][6] == password
|
||||||
|
|
||||||
|
mock_mqtt.reset_mock()
|
||||||
|
|
||||||
|
self.hass.config.api = MagicMock(api_password=None)
|
||||||
|
assert mqtt.setup(self.hass, {})
|
||||||
|
assert mock_mqtt.called
|
||||||
|
assert mock_mqtt.mock_calls[0][1][5] is None
|
||||||
|
assert mock_mqtt.mock_calls[0][1][6] is None
|
||||||
|
|
||||||
|
@patch('asyncio.gather')
|
||||||
|
@patch('asyncio.new_event_loop')
|
||||||
|
def test_broker_config_fails(self, mock_new_loop, mock_gather):
|
||||||
|
"""Test if the MQTT component fails if server fails."""
|
||||||
|
self.hass.config.components.append('http')
|
||||||
|
from hbmqtt.broker import BrokerException
|
||||||
|
|
||||||
|
mock_gather.side_effect = BrokerException
|
||||||
|
|
||||||
|
self.hass.config.api = MagicMock(api_password=None)
|
||||||
|
assert not mqtt.setup(self.hass, {
|
||||||
|
'mqtt': {'embedded': {}}
|
||||||
|
})
|
Loading…
Add table
Reference in a new issue