Mqtt fixes (#6116)
* Fix MQTT stop * MQTT: Make sure to have connection setup at end of MQTT setup. * Fix MQTT connect
This commit is contained in:
parent
b969fea900
commit
25408941de
5 changed files with 100 additions and 63 deletions
|
@ -226,10 +226,6 @@ def _async_setup_server(hass, config):
|
||||||
"""
|
"""
|
||||||
conf = config.get(DOMAIN, {})
|
conf = config.get(DOMAIN, {})
|
||||||
|
|
||||||
# Only setup if embedded config passed in or no broker specified
|
|
||||||
if CONF_EMBEDDED not in conf and CONF_BROKER in conf:
|
|
||||||
return None
|
|
||||||
|
|
||||||
server = yield from async_prepare_setup_platform(
|
server = yield from async_prepare_setup_platform(
|
||||||
hass, config, DOMAIN, 'server')
|
hass, config, DOMAIN, 'server')
|
||||||
|
|
||||||
|
@ -272,7 +268,11 @@ def async_setup(hass, config):
|
||||||
client_id = conf.get(CONF_CLIENT_ID)
|
client_id = conf.get(CONF_CLIENT_ID)
|
||||||
keepalive = conf.get(CONF_KEEPALIVE)
|
keepalive = conf.get(CONF_KEEPALIVE)
|
||||||
|
|
||||||
broker_config = yield from _async_setup_server(hass, config)
|
# Only setup if embedded config passed in or no broker specified
|
||||||
|
if CONF_EMBEDDED not in conf and CONF_BROKER in conf:
|
||||||
|
broker_config = None
|
||||||
|
else:
|
||||||
|
broker_config = yield from _async_setup_server(hass, config)
|
||||||
|
|
||||||
if CONF_BROKER in conf:
|
if CONF_BROKER in conf:
|
||||||
broker = conf[CONF_BROKER]
|
broker = conf[CONF_BROKER]
|
||||||
|
@ -329,6 +329,11 @@ def async_setup(hass, config):
|
||||||
|
|
||||||
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, async_start_mqtt)
|
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, async_start_mqtt)
|
||||||
|
|
||||||
|
success = yield from hass.data[DATA_MQTT].async_connect()
|
||||||
|
|
||||||
|
if not success:
|
||||||
|
return False
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def async_publish_service(call):
|
def async_publish_service(call):
|
||||||
"""Handle MQTT publish service calls."""
|
"""Handle MQTT publish service calls."""
|
||||||
|
@ -375,6 +380,9 @@ class MQTT(object):
|
||||||
import paho.mqtt.client as mqtt
|
import paho.mqtt.client as mqtt
|
||||||
|
|
||||||
self.hass = hass
|
self.hass = hass
|
||||||
|
self.broker = broker
|
||||||
|
self.port = port
|
||||||
|
self.keepalive = keepalive
|
||||||
self.topics = {}
|
self.topics = {}
|
||||||
self.progress = {}
|
self.progress = {}
|
||||||
self.birth_message = birth_message
|
self.birth_message = birth_message
|
||||||
|
@ -412,8 +420,6 @@ class MQTT(object):
|
||||||
will_message.get(ATTR_QOS),
|
will_message.get(ATTR_QOS),
|
||||||
will_message.get(ATTR_RETAIN))
|
will_message.get(ATTR_RETAIN))
|
||||||
|
|
||||||
self._mqttc.connect_async(broker, port, keepalive)
|
|
||||||
|
|
||||||
def async_publish(self, topic, payload, qos, retain):
|
def async_publish(self, topic, payload, qos, retain):
|
||||||
"""Publish a MQTT message.
|
"""Publish a MQTT message.
|
||||||
|
|
||||||
|
@ -422,6 +428,21 @@ class MQTT(object):
|
||||||
return self.hass.loop.run_in_executor(
|
return self.hass.loop.run_in_executor(
|
||||||
None, self._mqttc.publish, topic, payload, qos, retain)
|
None, self._mqttc.publish, topic, payload, qos, retain)
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def async_connect(self):
|
||||||
|
"""Connect to the host. Does not process messages yet.
|
||||||
|
|
||||||
|
This method must be run in the event loop and returns a coroutine.
|
||||||
|
"""
|
||||||
|
result = yield from self.hass.loop.run_in_executor(
|
||||||
|
None, self._mqttc.connect, self.broker, self.port, self.keepalive)
|
||||||
|
|
||||||
|
if result != 0:
|
||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
_LOGGER.error('Failed to connect: %s', mqtt.error_string(result))
|
||||||
|
|
||||||
|
return not result
|
||||||
|
|
||||||
def async_start(self):
|
def async_start(self):
|
||||||
"""Run the MQTT client.
|
"""Run the MQTT client.
|
||||||
|
|
||||||
|
@ -434,7 +455,7 @@ class MQTT(object):
|
||||||
|
|
||||||
This method must be run in the event loop and returns a coroutine.
|
This method must be run in the event loop and returns a coroutine.
|
||||||
"""
|
"""
|
||||||
def stop(self):
|
def stop():
|
||||||
"""Stop the MQTT client."""
|
"""Stop the MQTT client."""
|
||||||
self._mqttc.disconnect()
|
self._mqttc.disconnect()
|
||||||
self._mqttc.loop_stop()
|
self._mqttc.loop_stop()
|
||||||
|
@ -477,14 +498,11 @@ class MQTT(object):
|
||||||
Resubscribe to all topics we were subscribed to and publish birth
|
Resubscribe to all topics we were subscribed to and publish birth
|
||||||
message.
|
message.
|
||||||
"""
|
"""
|
||||||
if result_code != 0:
|
import paho.mqtt.client as mqtt
|
||||||
_LOGGER.error('Unable to connect to the MQTT broker: %s', {
|
|
||||||
1: 'Incorrect protocol version',
|
if result_code != mqtt.CONNACK_ACCEPTED:
|
||||||
2: 'Invalid client identifier',
|
_LOGGER.error('Unable to connect to the MQTT broker: %s',
|
||||||
3: 'Server unavailable',
|
mqtt.connack_string(result_code))
|
||||||
4: 'Bad username or password',
|
|
||||||
5: 'Not authorised'
|
|
||||||
}.get(result_code, 'Unknown reason'))
|
|
||||||
self._mqttc.disconnect()
|
self._mqttc.disconnect()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -574,7 +592,10 @@ class MQTT(object):
|
||||||
def _raise_on_error(result):
|
def _raise_on_error(result):
|
||||||
"""Raise error if error result."""
|
"""Raise error if error result."""
|
||||||
if result != 0:
|
if result != 0:
|
||||||
raise HomeAssistantError('Error talking to MQTT: {}'.format(result))
|
import paho.mqtt.client as mqtt
|
||||||
|
|
||||||
|
raise HomeAssistantError(
|
||||||
|
'Error talking to MQTT: {}'.format(mqtt.error_string(result)))
|
||||||
|
|
||||||
|
|
||||||
def _match_topic(subscription, topic):
|
def _match_topic(subscription, topic):
|
||||||
|
|
|
@ -248,6 +248,7 @@ def mock_http_component_app(hass, api_password=None):
|
||||||
def mock_mqtt_component(hass):
|
def mock_mqtt_component(hass):
|
||||||
"""Mock the MQTT component."""
|
"""Mock the MQTT component."""
|
||||||
with patch('homeassistant.components.mqtt.MQTT') as mock_mqtt:
|
with patch('homeassistant.components.mqtt.MQTT') as mock_mqtt:
|
||||||
|
mock_mqtt().async_connect.return_value = mock_coro(True)
|
||||||
setup_component(hass, mqtt.DOMAIN, {
|
setup_component(hass, mqtt.DOMAIN, {
|
||||||
mqtt.DOMAIN: {
|
mqtt.DOMAIN: {
|
||||||
mqtt.CONF_BROKER: 'mock-broker',
|
mqtt.CONF_BROKER: 'mock-broker',
|
||||||
|
|
|
@ -27,9 +27,11 @@ def mock_mqtt_client(hass, config=None):
|
||||||
}
|
}
|
||||||
|
|
||||||
with mock.patch('paho.mqtt.client.Client') as mock_client:
|
with mock.patch('paho.mqtt.client.Client') as mock_client:
|
||||||
yield from async_setup_component(hass, mqtt.DOMAIN, {
|
mock_client().connect = lambda *args: 0
|
||||||
mqtt.DOMAIN: config
|
result = yield from async_setup_component(hass, mqtt.DOMAIN, {
|
||||||
})
|
mqtt.DOMAIN: config
|
||||||
|
})
|
||||||
|
assert result
|
||||||
return mock_client()
|
return mock_client()
|
||||||
|
|
||||||
|
|
||||||
|
@ -66,41 +68,6 @@ class TestMQTT(unittest.TestCase):
|
||||||
self.hass.block_till_done()
|
self.hass.block_till_done()
|
||||||
self.assertTrue(self.hass.data['mqtt'].async_stop.called)
|
self.assertTrue(self.hass.data['mqtt'].async_stop.called)
|
||||||
|
|
||||||
@mock.patch('paho.mqtt.client.Client')
|
|
||||||
def test_setup_fails_if_no_connect_broker(self, _):
|
|
||||||
"""Test for setup failure if connection to broker is missing."""
|
|
||||||
test_broker_cfg = {mqtt.DOMAIN: {mqtt.CONF_BROKER: 'test-broker'}}
|
|
||||||
|
|
||||||
with mock.patch('homeassistant.components.mqtt.MQTT',
|
|
||||||
side_effect=socket.error()):
|
|
||||||
self.hass.config.components = set()
|
|
||||||
assert not setup_component(self.hass, mqtt.DOMAIN, test_broker_cfg)
|
|
||||||
|
|
||||||
# Ensure if we dont raise it sets up correctly
|
|
||||||
self.hass.config.components = set()
|
|
||||||
assert setup_component(self.hass, mqtt.DOMAIN, test_broker_cfg)
|
|
||||||
|
|
||||||
@mock.patch('paho.mqtt.client.Client')
|
|
||||||
def test_setup_embedded(self, _):
|
|
||||||
"""Test setting up embedded server with no config."""
|
|
||||||
client_config = ('localhost', 1883, 'user', 'pass', None, '3.1.1')
|
|
||||||
|
|
||||||
with mock.patch('homeassistant.components.mqtt.server.async_start',
|
|
||||||
return_value=mock_coro(
|
|
||||||
return_value=(True, client_config))
|
|
||||||
) as _start:
|
|
||||||
self.hass.config.components = set()
|
|
||||||
assert setup_component(self.hass, mqtt.DOMAIN,
|
|
||||||
{mqtt.DOMAIN: {}})
|
|
||||||
assert _start.call_count == 1
|
|
||||||
|
|
||||||
# Test with `embedded: None`
|
|
||||||
_start.return_value = mock_coro(return_value=(True, client_config))
|
|
||||||
self.hass.config.components = set()
|
|
||||||
assert setup_component(self.hass, mqtt.DOMAIN,
|
|
||||||
{mqtt.DOMAIN: {'embedded': None}})
|
|
||||||
assert _start.call_count == 2 # Another call
|
|
||||||
|
|
||||||
def test_publish_calls_service(self):
|
def test_publish_calls_service(self):
|
||||||
"""Test the publishing of call to services."""
|
"""Test the publishing of call to services."""
|
||||||
self.hass.bus.listen_once(EVENT_CALL_SERVICE, self.record_calls)
|
self.hass.bus.listen_once(EVENT_CALL_SERVICE, self.record_calls)
|
||||||
|
@ -253,8 +220,8 @@ class TestMQTTCallbacks(unittest.TestCase):
|
||||||
"""Setup things to be run when tests are started."""
|
"""Setup things to be run when tests are started."""
|
||||||
self.hass = get_test_home_assistant()
|
self.hass = get_test_home_assistant()
|
||||||
|
|
||||||
with mock.patch('paho.mqtt.client.Client'):
|
with mock.patch('paho.mqtt.client.Client') as client:
|
||||||
self.hass.config.components = set()
|
client().connect = lambda *args: 0
|
||||||
assert setup_component(self.hass, mqtt.DOMAIN, {
|
assert setup_component(self.hass, mqtt.DOMAIN, {
|
||||||
mqtt.DOMAIN: {
|
mqtt.DOMAIN: {
|
||||||
mqtt.CONF_BROKER: 'mock-broker',
|
mqtt.CONF_BROKER: 'mock-broker',
|
||||||
|
@ -354,6 +321,51 @@ class TestMQTTCallbacks(unittest.TestCase):
|
||||||
test_handle.output[0])
|
test_handle.output[0])
|
||||||
|
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def test_setup_embedded_starts_with_no_config(hass):
|
||||||
|
"""Test setting up embedded server with no config."""
|
||||||
|
client_config = ('localhost', 1883, 'user', 'pass', None, '3.1.1')
|
||||||
|
|
||||||
|
with mock.patch('homeassistant.components.mqtt.server.async_start',
|
||||||
|
return_value=mock_coro(
|
||||||
|
return_value=(True, client_config))
|
||||||
|
) as _start:
|
||||||
|
yield from mock_mqtt_client(hass, {})
|
||||||
|
assert _start.call_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def test_setup_embedded_with_embedded(hass):
|
||||||
|
"""Test setting up embedded server with no config."""
|
||||||
|
client_config = ('localhost', 1883, 'user', 'pass', None, '3.1.1')
|
||||||
|
|
||||||
|
with mock.patch('homeassistant.components.mqtt.server.async_start',
|
||||||
|
return_value=mock_coro(
|
||||||
|
return_value=(True, client_config))
|
||||||
|
) as _start:
|
||||||
|
_start.return_value = mock_coro(return_value=(True, client_config))
|
||||||
|
yield from mock_mqtt_client(hass, {'embedded': None})
|
||||||
|
assert _start.call_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def test_setup_fails_if_no_connect_broker(hass):
|
||||||
|
"""Test for setup failure if connection to broker is missing."""
|
||||||
|
test_broker_cfg = {mqtt.DOMAIN: {mqtt.CONF_BROKER: 'test-broker'}}
|
||||||
|
|
||||||
|
with mock.patch('homeassistant.components.mqtt.MQTT',
|
||||||
|
side_effect=socket.error()):
|
||||||
|
result = yield from async_setup_component(hass, mqtt.DOMAIN,
|
||||||
|
test_broker_cfg)
|
||||||
|
assert not result
|
||||||
|
|
||||||
|
with mock.patch('paho.mqtt.client.Client') as mock_client:
|
||||||
|
mock_client().connect = lambda *args: 1
|
||||||
|
result = yield from async_setup_component(hass, mqtt.DOMAIN,
|
||||||
|
test_broker_cfg)
|
||||||
|
assert not result
|
||||||
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def test_birth_message(hass):
|
def test_birth_message(hass):
|
||||||
"""Test sending birth message."""
|
"""Test sending birth message."""
|
||||||
|
|
|
@ -26,23 +26,25 @@ class TestMQTT:
|
||||||
@patch('homeassistant.components.mqtt.MQTT')
|
@patch('homeassistant.components.mqtt.MQTT')
|
||||||
def test_creating_config_with_http_pass(self, mock_mqtt):
|
def test_creating_config_with_http_pass(self, mock_mqtt):
|
||||||
"""Test if the MQTT server gets started and subscribe/publish msg."""
|
"""Test if the MQTT server gets started and subscribe/publish msg."""
|
||||||
|
mock_mqtt().async_connect.return_value = mock_coro(True)
|
||||||
self.hass.bus.listen_once = MagicMock()
|
self.hass.bus.listen_once = MagicMock()
|
||||||
password = 'super_secret'
|
password = 'super_secret'
|
||||||
|
|
||||||
self.hass.config.api = MagicMock(api_password=password)
|
self.hass.config.api = MagicMock(api_password=password)
|
||||||
assert setup_component(self.hass, mqtt.DOMAIN, {})
|
assert setup_component(self.hass, mqtt.DOMAIN, {})
|
||||||
assert mock_mqtt.called
|
assert mock_mqtt.called
|
||||||
assert mock_mqtt.mock_calls[0][1][5] == 'homeassistant'
|
assert mock_mqtt.mock_calls[1][1][5] == 'homeassistant'
|
||||||
assert mock_mqtt.mock_calls[0][1][6] == password
|
assert mock_mqtt.mock_calls[1][1][6] == password
|
||||||
|
|
||||||
mock_mqtt.reset_mock()
|
mock_mqtt.reset_mock()
|
||||||
|
mock_mqtt().async_connect.return_value = mock_coro(True)
|
||||||
|
|
||||||
self.hass.config.components = set(['http'])
|
self.hass.config.components = set(['http'])
|
||||||
self.hass.config.api = MagicMock(api_password=None)
|
self.hass.config.api = MagicMock(api_password=None)
|
||||||
assert setup_component(self.hass, mqtt.DOMAIN, {})
|
assert setup_component(self.hass, mqtt.DOMAIN, {})
|
||||||
assert mock_mqtt.called
|
assert mock_mqtt.called
|
||||||
assert mock_mqtt.mock_calls[0][1][5] is None
|
assert mock_mqtt.mock_calls[1][1][5] is None
|
||||||
assert mock_mqtt.mock_calls[0][1][6] is None
|
assert mock_mqtt.mock_calls[1][1][6] is None
|
||||||
|
|
||||||
@patch('tempfile.NamedTemporaryFile', Mock(return_value=MagicMock()))
|
@patch('tempfile.NamedTemporaryFile', Mock(return_value=MagicMock()))
|
||||||
@patch('hbmqtt.broker.Broker.start', return_value=mock_coro())
|
@patch('hbmqtt.broker.Broker.start', return_value=mock_coro())
|
||||||
|
|
|
@ -10,7 +10,7 @@ from homeassistant import util, bootstrap
|
||||||
from homeassistant.util import location
|
from homeassistant.util import location
|
||||||
from homeassistant.components import mqtt
|
from homeassistant.components import mqtt
|
||||||
|
|
||||||
from .common import async_test_home_assistant
|
from .common import async_test_home_assistant, mock_coro
|
||||||
from .test_util.aiohttp import mock_aiohttp_client
|
from .test_util.aiohttp import mock_aiohttp_client
|
||||||
|
|
||||||
logging.basicConfig()
|
logging.basicConfig()
|
||||||
|
@ -66,7 +66,8 @@ def aioclient_mock():
|
||||||
def mqtt_mock(loop, hass):
|
def mqtt_mock(loop, hass):
|
||||||
"""Fixture to mock MQTT."""
|
"""Fixture to mock MQTT."""
|
||||||
with patch('homeassistant.components.mqtt.MQTT') as mock_mqtt:
|
with patch('homeassistant.components.mqtt.MQTT') as mock_mqtt:
|
||||||
loop.run_until_complete(bootstrap.async_setup_component(
|
mock_mqtt().async_connect.return_value = mock_coro(True)
|
||||||
|
assert loop.run_until_complete(bootstrap.async_setup_component(
|
||||||
hass, mqtt.DOMAIN, {
|
hass, mqtt.DOMAIN, {
|
||||||
mqtt.DOMAIN: {
|
mqtt.DOMAIN: {
|
||||||
mqtt.CONF_BROKER: 'mock-broker',
|
mqtt.CONF_BROKER: 'mock-broker',
|
||||||
|
|
Loading…
Add table
Reference in a new issue