From b0d3bbed79f68d064f1f3e5995def0c78e2eb5d9 Mon Sep 17 00:00:00 2001 From: Pascal Vizeli Date: Wed, 22 Feb 2017 09:43:22 +0100 Subject: [PATCH] Convert mqtt platforms to async (#6145) * Convert mqtt platforms to async * fix lint * add more platforms * convert mqtt_eventstream * fix lint / add mqtt_room * fix lint * fix test part 1 * fix test part 2 * fix out of memory bug * address comments --- .../components/alarm_control_panel/mqtt.py | 60 ++++--- homeassistant/components/alert.py | 6 +- .../components/binary_sensor/mqtt.py | 29 ++-- homeassistant/components/cover/mqtt.py | 78 ++++++--- homeassistant/components/fan/mqtt.py | 163 +++++++++++------- homeassistant/components/light/mqtt.py | 109 +++++++----- homeassistant/components/light/mqtt_json.py | 58 ++++--- .../components/light/mqtt_template.py | 75 +++++--- homeassistant/components/lock/mqtt.py | 63 ++++--- homeassistant/components/mqtt/__init__.py | 21 ++- homeassistant/components/mqtt_eventstream.py | 18 +- homeassistant/components/sensor/mqtt.py | 27 +-- homeassistant/components/sensor/mqtt_room.py | 24 ++- homeassistant/components/switch/mqtt.py | 61 ++++--- tests/components/test_mqtt_eventstream.py | 8 +- 15 files changed, 509 insertions(+), 291 deletions(-) diff --git a/homeassistant/components/alarm_control_panel/mqtt.py b/homeassistant/components/alarm_control_panel/mqtt.py index 26e2a2f1f77..455f60319c6 100644 --- a/homeassistant/components/alarm_control_panel/mqtt.py +++ b/homeassistant/components/alarm_control_panel/mqtt.py @@ -4,10 +4,12 @@ This platform enables the possibility to control a MQTT alarm. For more details about this platform, please refer to the documentation at https://home-assistant.io/components/alarm_control_panel.mqtt/ """ +import asyncio import logging import voluptuous as vol +from homeassistant.core import callback import homeassistant.components.alarm_control_panel as alarm import homeassistant.components.mqtt as mqtt from homeassistant.const import ( @@ -41,10 +43,10 @@ PLATFORM_SCHEMA = mqtt.MQTT_BASE_PLATFORM_SCHEMA.extend({ }) -def setup_platform(hass, config, add_devices, discovery_info=None): +@asyncio.coroutine +def async_setup_platform(hass, config, async_add_devices, discovery_info=None): """Setup the MQTT platform.""" - add_devices([MqttAlarm( - hass, + yield from async_add_devices([MqttAlarm( config.get(CONF_NAME), config.get(CONF_STATE_TOPIC), config.get(CONF_COMMAND_TOPIC), @@ -58,11 +60,10 @@ def setup_platform(hass, config, add_devices, discovery_info=None): class MqttAlarm(alarm.AlarmControlPanel): """Representation of a MQTT alarm status.""" - def __init__(self, hass, name, state_topic, command_topic, qos, - payload_disarm, payload_arm_home, payload_arm_away, code): + def __init__(self, name, state_topic, command_topic, qos, payload_disarm, + payload_arm_home, payload_arm_away, code): """Initalize the MQTT alarm panel.""" self._state = STATE_UNKNOWN - self._hass = hass self._name = name self._state_topic = state_topic self._command_topic = command_topic @@ -72,6 +73,12 @@ class MqttAlarm(alarm.AlarmControlPanel): self._payload_arm_away = payload_arm_away self._code = code + def async_added_to_hass(self): + """Subscribe mqtt events. + + This method must be run in the event loop and returns a coroutine. + """ + @callback def message_received(topic, payload, qos): """A new MQTT message has been received.""" if payload not in (STATE_ALARM_DISARMED, STATE_ALARM_ARMED_HOME, @@ -80,9 +87,10 @@ class MqttAlarm(alarm.AlarmControlPanel): _LOGGER.warning('Received unexpected payload: %s', payload) return self._state = payload - self.update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) - mqtt.subscribe(hass, self._state_topic, message_received, self._qos) + return mqtt.async_subscribe( + self.hass, self._state_topic, message_received, self._qos) @property def should_poll(self): @@ -104,26 +112,38 @@ class MqttAlarm(alarm.AlarmControlPanel): """One or more characters if code is defined.""" return None if self._code is None else '.+' - def alarm_disarm(self, code=None): - """Send disarm command.""" + @asyncio.coroutine + def async_alarm_disarm(self, code=None): + """Send disarm command. + + This method is a coroutine. + """ if not self._validate_code(code, 'disarming'): return - mqtt.publish(self.hass, self._command_topic, - self._payload_disarm, self._qos) + mqtt.async_publish( + self.hass, self._command_topic, self._payload_disarm, self._qos) - def alarm_arm_home(self, code=None): - """Send arm home command.""" + @asyncio.coroutine + def async_alarm_arm_home(self, code=None): + """Send arm home command. + + This method is a coroutine. + """ if not self._validate_code(code, 'arming home'): return - mqtt.publish(self.hass, self._command_topic, - self._payload_arm_home, self._qos) + mqtt.async_publish( + self.hass, self._command_topic, self._payload_arm_home, self._qos) - def alarm_arm_away(self, code=None): - """Send arm away command.""" + @asyncio.coroutine + def async_alarm_arm_away(self, code=None): + """Send arm away command. + + This method is a coroutine. + """ if not self._validate_code(code, 'arming away'): return - mqtt.publish(self.hass, self._command_topic, - self._payload_arm_away, self._qos) + mqtt.async_publish( + self.hass, self._command_topic, self._payload_arm_away, self._qos) def _validate_code(self, code, state): """Validate given code.""" diff --git a/homeassistant/components/alert.py b/homeassistant/components/alert.py index 8cee05f29cc..40c91784a42 100644 --- a/homeassistant/components/alert.py +++ b/homeassistant/components/alert.py @@ -62,7 +62,8 @@ def is_on(hass, entity_id): def turn_on(hass, entity_id): """Reset the alert.""" - run_callback_threadsafe(hass.loop, async_turn_on, hass, entity_id) + run_callback_threadsafe( + hass.loop, async_turn_on, hass, entity_id).result() @callback @@ -75,7 +76,8 @@ def async_turn_on(hass, entity_id): def turn_off(hass, entity_id): """Acknowledge alert.""" - run_callback_threadsafe(hass.loop, async_turn_off, hass, entity_id) + run_callback_threadsafe( + hass.loop, async_turn_off, hass, entity_id).result() @callback diff --git a/homeassistant/components/binary_sensor/mqtt.py b/homeassistant/components/binary_sensor/mqtt.py index 3dbfffa75b6..06814d85f88 100644 --- a/homeassistant/components/binary_sensor/mqtt.py +++ b/homeassistant/components/binary_sensor/mqtt.py @@ -4,6 +4,7 @@ Support for MQTT binary sensors. For more details about this platform, please refer to the documentation at https://home-assistant.io/components/binary_sensor.mqtt/ """ +import asyncio import logging import voluptuous as vol @@ -35,8 +36,8 @@ PLATFORM_SCHEMA = mqtt.MQTT_RO_PLATFORM_SCHEMA.extend({ }) -# pylint: disable=unused-argument -def setup_platform(hass, config, add_devices, discovery_info=None): +@asyncio.coroutine +def async_setup_platform(hass, config, async_add_devices, discovery_info=None): """Set up the MQTT binary sensor.""" if discovery_info is not None: config = PLATFORM_SCHEMA(discovery_info) @@ -44,8 +45,8 @@ def setup_platform(hass, config, add_devices, discovery_info=None): value_template = config.get(CONF_VALUE_TEMPLATE) if value_template is not None: value_template.hass = hass - add_devices([MqttBinarySensor( - hass, + + yield from async_add_devices([MqttBinarySensor( config.get(CONF_NAME), config.get(CONF_STATE_TOPIC), get_deprecated(config, CONF_DEVICE_CLASS, CONF_SENSOR_CLASS), @@ -59,10 +60,9 @@ def setup_platform(hass, config, add_devices, discovery_info=None): class MqttBinarySensor(BinarySensorDevice): """Representation a binary sensor that is updated by MQTT.""" - def __init__(self, hass, name, state_topic, device_class, qos, payload_on, + def __init__(self, name, state_topic, device_class, qos, payload_on, payload_off, value_template): """Initialize the MQTT binary sensor.""" - self._hass = hass self._name = name self._state = False self._state_topic = state_topic @@ -70,21 +70,28 @@ class MqttBinarySensor(BinarySensorDevice): self._payload_on = payload_on self._payload_off = payload_off self._qos = qos + self._template = value_template + def async_added_to_hass(self): + """Subscribe mqtt events. + + This method must be run in the event loop and returns a coroutine. + """ @callback def message_received(topic, payload, qos): """A new MQTT message has been received.""" - if value_template is not None: - payload = value_template.async_render_with_possible_json_value( + if self._template is not None: + payload = self._template.async_render_with_possible_json_value( payload) if payload == self._payload_on: self._state = True - hass.async_add_job(self.async_update_ha_state()) elif payload == self._payload_off: self._state = False - hass.async_add_job(self.async_update_ha_state()) - mqtt.subscribe(hass, self._state_topic, message_received, self._qos) + self.hass.async_add_job(self.async_update_ha_state()) + + return mqtt.async_subscribe( + self.hass, self._state_topic, message_received, self._qos) @property def should_poll(self): diff --git a/homeassistant/components/cover/mqtt.py b/homeassistant/components/cover/mqtt.py index aa549986533..97ddad74d79 100644 --- a/homeassistant/components/cover/mqtt.py +++ b/homeassistant/components/cover/mqtt.py @@ -4,6 +4,7 @@ Support for MQTT cover devices. For more details about this platform, please refer to the documentation at https://home-assistant.io/components/cover.mqtt/ """ +import asyncio import logging import voluptuous as vol @@ -46,13 +47,14 @@ PLATFORM_SCHEMA = mqtt.MQTT_RW_PLATFORM_SCHEMA.extend({ }) -def setup_platform(hass, config, add_devices, discovery_info=None): +@asyncio.coroutine +def async_setup_platform(hass, config, async_add_devices, discovery_info=None): """Setup the MQTT Cover.""" value_template = config.get(CONF_VALUE_TEMPLATE) if value_template is not None: value_template.hass = hass - add_devices([MqttCover( - hass, + + yield from async_add_devices([MqttCover( config.get(CONF_NAME), config.get(CONF_STATE_TOPIC), config.get(CONF_COMMAND_TOPIC), @@ -71,13 +73,12 @@ def setup_platform(hass, config, add_devices, discovery_info=None): class MqttCover(CoverDevice): """Representation of a cover that can be controlled using MQTT.""" - def __init__(self, hass, name, state_topic, command_topic, qos, - retain, state_open, state_closed, payload_open, payload_close, + def __init__(self, name, state_topic, command_topic, qos, retain, + state_open, state_closed, payload_open, payload_close, payload_stop, optimistic, value_template): """Initialize the cover.""" self._position = None self._state = None - self._hass = hass self._name = name self._state_topic = state_topic self._command_topic = command_topic @@ -89,37 +90,45 @@ class MqttCover(CoverDevice): self._state_closed = state_closed self._retain = retain self._optimistic = optimistic or state_topic is None + self._template = value_template + @asyncio.coroutine + def async_added_to_hass(self): + """Subscribe mqtt events. + + This method is a coroutine. + """ @callback def message_received(topic, payload, qos): """A new MQTT message has been received.""" - if value_template is not None: - payload = value_template.async_render_with_possible_json_value( + if self._template is not None: + payload = self._template.async_render_with_possible_json_value( payload) + if payload == self._state_open: self._state = False - hass.async_add_job(self.async_update_ha_state()) elif payload == self._state_closed: self._state = True - hass.async_add_job(self.async_update_ha_state()) elif payload.isnumeric() and 0 <= int(payload) <= 100: if int(payload) > 0: self._state = False else: self._state = True self._position = int(payload) - hass.async_add_job(self.async_update_ha_state()) else: _LOGGER.warning( "Payload is not True, False, or integer (0-100): %s", payload) + return + + self.hass.async_add_job(self.async_update_ha_state()) if self._state_topic is None: # Force into optimistic mode. self._optimistic = True else: - mqtt.subscribe(hass, self._state_topic, message_received, - self._qos) + yield from mqtt.async_subscribe( + self.hass, self._state_topic, message_received, self._qos) @property def should_poll(self): @@ -144,25 +153,40 @@ class MqttCover(CoverDevice): """ return self._position - def open_cover(self, **kwargs): - """Move the cover up.""" - mqtt.publish(self.hass, self._command_topic, self._payload_open, - self._qos, self._retain) + @asyncio.coroutine + def async_open_cover(self, **kwargs): + """Move the cover up. + + This method is a coroutine. + """ + mqtt.async_publish( + self.hass, self._command_topic, self._payload_open, self._qos, + self._retain) if self._optimistic: # Optimistically assume that cover has changed state. self._state = False - self.schedule_update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) - def close_cover(self, **kwargs): - """Move the cover down.""" - mqtt.publish(self.hass, self._command_topic, self._payload_close, - self._qos, self._retain) + @asyncio.coroutine + def async_close_cover(self, **kwargs): + """Move the cover down. + + This method is a coroutine. + """ + mqtt.async_publish( + self.hass, self._command_topic, self._payload_close, self._qos, + self._retain) if self._optimistic: # Optimistically assume that cover has changed state. self._state = True - self.schedule_update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) - def stop_cover(self, **kwargs): - """Stop the device.""" - mqtt.publish(self.hass, self._command_topic, self._payload_stop, - self._qos, self._retain) + @asyncio.coroutine + def async_stop_cover(self, **kwargs): + """Stop the device. + + This method is a coroutine. + """ + mqtt.async_publish( + self.hass, self._command_topic, self._payload_stop, self._qos, + self._retain) diff --git a/homeassistant/components/fan/mqtt.py b/homeassistant/components/fan/mqtt.py index 1d5b7609897..3463cc01bbc 100644 --- a/homeassistant/components/fan/mqtt.py +++ b/homeassistant/components/fan/mqtt.py @@ -4,10 +4,12 @@ Support for MQTT fans. For more details about this platform, please refer to the documentation https://home-assistant.io/components/fan.mqtt/ """ +import asyncio import logging import voluptuous as vol +from homeassistant.core import callback import homeassistant.components.mqtt as mqtt from homeassistant.const import ( CONF_NAME, CONF_OPTIMISTIC, CONF_STATE, STATE_ON, STATE_OFF, @@ -73,11 +75,10 @@ PLATFORM_SCHEMA = mqtt.MQTT_RW_PLATFORM_SCHEMA.extend({ }) -# pylint: disable=unused-argument -def setup_platform(hass, config, add_devices, discovery_info=None): +@asyncio.coroutine +def async_setup_platform(hass, config, async_add_devices, discovery_info=None): """Setup MQTT fan platform.""" - add_devices([MqttFan( - hass, + yield from async_add_devices([MqttFan( config.get(CONF_NAME), { key: config.get(key) for key in ( @@ -113,15 +114,15 @@ def setup_platform(hass, config, add_devices, discovery_info=None): class MqttFan(FanEntity): """A MQTT fan component.""" - def __init__(self, hass, name, topic, templates, qos, retain, payload, + def __init__(self, name, topic, templates, qos, retain, payload, speed_list, optimistic): """Initialize the MQTT fan.""" - self._hass = hass self._name = name self._topic = topic self._qos = qos self._retain = retain self._payload = payload + self._templates = templates self._speed_list = speed_list self._optimistic = optimistic or topic[CONF_STATE_TOPIC] is None self._optimistic_oscillation = ( @@ -129,19 +130,29 @@ class MqttFan(FanEntity): self._optimistic_speed = ( optimistic or topic[CONF_SPEED_STATE_TOPIC] is None) self._state = False + self._speed = None + self._oscillation = None self._supported_features = 0 self._supported_features |= (topic[CONF_OSCILLATION_STATE_TOPIC] is not None and SUPPORT_OSCILLATE) self._supported_features |= (topic[CONF_SPEED_STATE_TOPIC] is not None and SUPPORT_SET_SPEED) - for key, tpl in list(templates.items()): + @asyncio.coroutine + def async_added_to_hass(self): + """Subscribe mqtt events. + + This method is a coroutine. + """ + templates = {} + for key, tpl in list(self._templates.items()): if tpl is None: templates[key] = lambda value: value else: - tpl.hass = hass - templates[key] = tpl.render_with_possible_json_value + tpl.hass = self.hass + templates[key] = tpl.async_render_with_possible_json_value + @callback def state_received(topic, payload, qos): """A new MQTT message has been received.""" payload = templates[CONF_STATE](payload) @@ -149,13 +160,14 @@ class MqttFan(FanEntity): self._state = True elif payload == self._payload[STATE_OFF]: self._state = False - - self.schedule_update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) if self._topic[CONF_STATE_TOPIC] is not None: - mqtt.subscribe(self._hass, self._topic[CONF_STATE_TOPIC], - state_received, self._qos) + yield from mqtt.async_subscribe( + self.hass, self._topic[CONF_STATE_TOPIC], state_received, + self._qos) + @callback def speed_received(topic, payload, qos): """A new MQTT message for the speed has been received.""" payload = templates[ATTR_SPEED](payload) @@ -165,17 +177,15 @@ class MqttFan(FanEntity): self._speed = SPEED_MEDIUM elif payload == self._payload[SPEED_HIGH]: self._speed = SPEED_HIGH - self.schedule_update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) if self._topic[CONF_SPEED_STATE_TOPIC] is not None: - mqtt.subscribe(self._hass, self._topic[CONF_SPEED_STATE_TOPIC], - speed_received, self._qos) - self._speed = SPEED_OFF - elif self._topic[CONF_SPEED_COMMAND_TOPIC] is not None: - self._speed = SPEED_OFF - else: - self._speed = SPEED_OFF + yield from mqtt.async_subscribe( + self.hass, self._topic[CONF_SPEED_STATE_TOPIC], speed_received, + self._qos) + self._speed = SPEED_OFF + @callback def oscillation_received(topic, payload, qos): """A new MQTT message has been received.""" payload = templates[OSCILLATION](payload) @@ -183,17 +193,13 @@ class MqttFan(FanEntity): self._oscillation = True elif payload == self._payload[OSCILLATE_OFF_PAYLOAD]: self._oscillation = False - self.schedule_update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) if self._topic[CONF_OSCILLATION_STATE_TOPIC] is not None: - mqtt.subscribe(self._hass, - self._topic[CONF_OSCILLATION_STATE_TOPIC], - oscillation_received, self._qos) - self._oscillation = False - if self._topic[CONF_OSCILLATION_COMMAND_TOPIC] is not None: - self._oscillation = False - else: - self._oscillation = False + yield from mqtt.async_subscribe( + self.hass, self._topic[CONF_OSCILLATION_STATE_TOPIC], + oscillation_received, self._qos) + self._oscillation = False @property def should_poll(self): @@ -235,43 +241,72 @@ class MqttFan(FanEntity): """Return the oscillation state.""" return self._oscillation - def turn_on(self, speed: str=None) -> None: - """Turn on the entity.""" - mqtt.publish(self._hass, self._topic[CONF_COMMAND_TOPIC], - self._payload[STATE_ON], self._qos, self._retain) + @asyncio.coroutine + def async_turn_on(self, speed: str=None) -> None: + """Turn on the entity. + + This method is a coroutine. + """ + mqtt.async_publish( + self.hass, self._topic[CONF_COMMAND_TOPIC], + self._payload[STATE_ON], self._qos, self._retain) if speed: - self.set_speed(speed) + yield from self.async_set_speed(speed) - def turn_off(self) -> None: - """Turn off the entity.""" - mqtt.publish(self._hass, self._topic[CONF_COMMAND_TOPIC], - self._payload[STATE_OFF], self._qos, self._retain) + @asyncio.coroutine + def async_turn_off(self) -> None: + """Turn off the entity. - def set_speed(self, speed: str) -> None: - """Set the speed of the fan.""" - if self._topic[CONF_SPEED_COMMAND_TOPIC] is not None: - mqtt_payload = SPEED_OFF - if speed == SPEED_LOW: - mqtt_payload = self._payload[SPEED_LOW] - elif speed == SPEED_MEDIUM: - mqtt_payload = self._payload[SPEED_MEDIUM] - elif speed == SPEED_HIGH: - mqtt_payload = self._payload[SPEED_HIGH] - else: - mqtt_payload = speed + This method is a coroutine. + """ + mqtt.async_publish( + self.hass, self._topic[CONF_COMMAND_TOPIC], + self._payload[STATE_OFF], self._qos, self._retain) + + @asyncio.coroutine + def async_set_speed(self, speed: str) -> None: + """Set the speed of the fan. + + This method is a coroutine. + """ + if self._topic[CONF_SPEED_COMMAND_TOPIC] is None: + return + + if speed == SPEED_LOW: + mqtt_payload = self._payload[SPEED_LOW] + elif speed == SPEED_MEDIUM: + mqtt_payload = self._payload[SPEED_MEDIUM] + elif speed == SPEED_HIGH: + mqtt_payload = self._payload[SPEED_HIGH] + else: + mqtt_payload = speed + + mqtt.async_publish( + self.hass, self._topic[CONF_SPEED_COMMAND_TOPIC], + mqtt_payload, self._qos, self._retain) + + if self._optimistic_speed: self._speed = speed - mqtt.publish(self._hass, self._topic[CONF_SPEED_COMMAND_TOPIC], - mqtt_payload, self._qos, self._retain) - self.schedule_update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) - def oscillate(self, oscillating: bool) -> None: - """Set oscillation.""" - if self._topic[CONF_OSCILLATION_COMMAND_TOPIC] is not None: - self._oscillation = oscillating + @asyncio.coroutine + def async_oscillate(self, oscillating: bool) -> None: + """Set oscillation. + + This method is a coroutine. + """ + if self._topic[CONF_OSCILLATION_COMMAND_TOPIC] is None: + return + + if oscillating is False: + payload = self._payload[OSCILLATE_OFF_PAYLOAD] + else: payload = self._payload[OSCILLATE_ON_PAYLOAD] - if oscillating is False: - payload = self._payload[OSCILLATE_OFF_PAYLOAD] - mqtt.publish(self._hass, - self._topic[CONF_OSCILLATION_COMMAND_TOPIC], - payload, self._qos, self._retain) - self.schedule_update_ha_state() + + mqtt.async_publish( + self.hass, self._topic[CONF_OSCILLATION_COMMAND_TOPIC], + payload, self._qos, self._retain) + + if self._optimistic_oscillation: + self._oscillation = oscillating + self.hass.async_add_job(self.async_update_ha_state()) diff --git a/homeassistant/components/light/mqtt.py b/homeassistant/components/light/mqtt.py index 5f227af97b6..77b804cb499 100644 --- a/homeassistant/components/light/mqtt.py +++ b/homeassistant/components/light/mqtt.py @@ -4,10 +4,12 @@ Support for MQTT lights. For more details about this platform, please refer to the documentation at https://home-assistant.io/components/light.mqtt/ """ +import asyncio import logging import voluptuous as vol +from homeassistant.core import callback import homeassistant.components.mqtt as mqtt from homeassistant.components.light import ( ATTR_BRIGHTNESS, ATTR_RGB_COLOR, ATTR_COLOR_TEMP, SUPPORT_BRIGHTNESS, @@ -62,12 +64,13 @@ PLATFORM_SCHEMA = mqtt.MQTT_RW_PLATFORM_SCHEMA.extend({ }) -def setup_platform(hass, config, add_devices, discovery_info=None): +@asyncio.coroutine +def async_setup_platform(hass, config, async_add_devices, discovery_info=None): """Add MQTT Light.""" - config.setdefault(CONF_STATE_VALUE_TEMPLATE, - config.get(CONF_VALUE_TEMPLATE)) - add_devices([MqttLight( - hass, + config.setdefault( + CONF_STATE_VALUE_TEMPLATE, config.get(CONF_VALUE_TEMPLATE)) + + yield from async_add_devices([MqttLight( config.get(CONF_NAME), { key: config.get(key) for key in ( @@ -101,15 +104,15 @@ def setup_platform(hass, config, add_devices, discovery_info=None): class MqttLight(Light): """MQTT light.""" - def __init__(self, hass, name, topic, templates, qos, retain, payload, + def __init__(self, name, topic, templates, qos, retain, payload, optimistic, brightness_scale): """Initialize MQTT light.""" - self._hass = hass self._name = name self._topic = topic self._qos = qos self._retain = retain self._payload = payload + self._templates = templates self._optimistic = optimistic or topic[CONF_STATE_TOPIC] is None self._optimistic_rgb = \ optimistic or topic[CONF_RGB_STATE_TOPIC] is None @@ -119,6 +122,9 @@ class MqttLight(Light): optimistic or topic[CONF_COLOR_TEMP_STATE_TOPIC] is None) self._brightness_scale = brightness_scale self._state = False + self._brightness = None + self._rgb = None + self._color_temp = None self._supported_features = 0 self._supported_features |= ( topic[CONF_RGB_COMMAND_TOPIC] is not None and SUPPORT_RGB_COLOR) @@ -129,13 +135,21 @@ class MqttLight(Light): topic[CONF_COLOR_TEMP_COMMAND_TOPIC] is not None and SUPPORT_COLOR_TEMP) - for key, tpl in list(templates.items()): + @asyncio.coroutine + def async_added_to_hass(self): + """Subscribe mqtt events. + + This method is a coroutine. + """ + templates = {} + for key, tpl in list(self._templates.items()): if tpl is None: templates[key] = lambda value: value else: - tpl.hass = hass - templates[key] = tpl.render_with_possible_json_value + tpl.hass = self.hass + templates[key] = tpl.async_render_with_possible_json_value + @callback def state_received(topic, payload, qos): """A new MQTT message has been received.""" payload = templates[CONF_STATE](payload) @@ -143,23 +157,24 @@ class MqttLight(Light): self._state = True elif payload == self._payload['off']: self._state = False - - self.update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) if self._topic[CONF_STATE_TOPIC] is not None: - mqtt.subscribe(self._hass, self._topic[CONF_STATE_TOPIC], - state_received, self._qos) + yield from mqtt.async_subscribe( + self.hass, self._topic[CONF_STATE_TOPIC], state_received, + self._qos) + @callback def brightness_received(topic, payload, qos): """A new MQTT message for the brightness has been received.""" device_value = float(templates[CONF_BRIGHTNESS](payload)) percent_bright = device_value / self._brightness_scale self._brightness = int(percent_bright * 255) - self.update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) if self._topic[CONF_BRIGHTNESS_STATE_TOPIC] is not None: - mqtt.subscribe( - self._hass, self._topic[CONF_BRIGHTNESS_STATE_TOPIC], + yield from mqtt.async_subscribe( + self.hass, self._topic[CONF_BRIGHTNESS_STATE_TOPIC], brightness_received, self._qos) self._brightness = 255 elif self._topic[CONF_BRIGHTNESS_COMMAND_TOPIC] is not None: @@ -167,29 +182,32 @@ class MqttLight(Light): else: self._brightness = None + @callback def rgb_received(topic, payload, qos): """A new MQTT message has been received.""" self._rgb = [int(val) for val in templates[CONF_RGB](payload).split(',')] - self.update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) if self._topic[CONF_RGB_STATE_TOPIC] is not None: - mqtt.subscribe(self._hass, self._topic[CONF_RGB_STATE_TOPIC], - rgb_received, self._qos) + yield from mqtt.async_subscribe( + self.hass, self._topic[CONF_RGB_STATE_TOPIC], rgb_received, + self._qos) self._rgb = [255, 255, 255] if self._topic[CONF_RGB_COMMAND_TOPIC] is not None: self._rgb = [255, 255, 255] else: self._rgb = None + @callback def color_temp_received(topic, payload, qos): """A new MQTT message for color temp has been received.""" self._color_temp = int(templates[CONF_COLOR_TEMP](payload)) - self.update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) if self._topic[CONF_COLOR_TEMP_STATE_TOPIC] is not None: - mqtt.subscribe( - self._hass, self._topic[CONF_COLOR_TEMP_STATE_TOPIC], + yield from mqtt.async_subscribe( + self.hass, self._topic[CONF_COLOR_TEMP_STATE_TOPIC], color_temp_received, self._qos) self._color_temp = 150 if self._topic[CONF_COLOR_TEMP_COMMAND_TOPIC] is not None: @@ -237,16 +255,21 @@ class MqttLight(Light): """Flag supported features.""" return self._supported_features - def turn_on(self, **kwargs): - """Turn the device on.""" + @asyncio.coroutine + def async_turn_on(self, **kwargs): + """Turn the device on. + + This method is a coroutine. + """ should_update = False if ATTR_RGB_COLOR in kwargs and \ self._topic[CONF_RGB_COMMAND_TOPIC] is not None: - mqtt.publish(self._hass, self._topic[CONF_RGB_COMMAND_TOPIC], - '{},{},{}'.format(*kwargs[ATTR_RGB_COLOR]), - self._qos, self._retain) + mqtt.async_publish( + self.hass, self._topic[CONF_RGB_COMMAND_TOPIC], + '{},{},{}'.format(*kwargs[ATTR_RGB_COLOR]), self._qos, + self._retain) if self._optimistic_rgb: self._rgb = kwargs[ATTR_RGB_COLOR] @@ -256,8 +279,8 @@ class MqttLight(Light): self._topic[CONF_BRIGHTNESS_COMMAND_TOPIC] is not None: percent_bright = float(kwargs[ATTR_BRIGHTNESS]) / 255 device_brightness = int(percent_bright * self._brightness_scale) - mqtt.publish( - self._hass, self._topic[CONF_BRIGHTNESS_COMMAND_TOPIC], + mqtt.async_publish( + self.hass, self._topic[CONF_BRIGHTNESS_COMMAND_TOPIC], device_brightness, self._qos, self._retain) if self._optimistic_brightness: @@ -267,15 +290,16 @@ class MqttLight(Light): if ATTR_COLOR_TEMP in kwargs and \ self._topic[CONF_COLOR_TEMP_COMMAND_TOPIC] is not None: color_temp = int(kwargs[ATTR_COLOR_TEMP]) - mqtt.publish( - self._hass, self._topic[CONF_COLOR_TEMP_COMMAND_TOPIC], + mqtt.async_publish( + self.hass, self._topic[CONF_COLOR_TEMP_COMMAND_TOPIC], color_temp, self._qos, self._retain) if self._optimistic_color_temp: self._color_temp = kwargs[ATTR_COLOR_TEMP] should_update = True - mqtt.publish(self._hass, self._topic[CONF_COMMAND_TOPIC], - self._payload['on'], self._qos, self._retain) + mqtt.async_publish( + self.hass, self._topic[CONF_COMMAND_TOPIC], self._payload['on'], + self._qos, self._retain) if self._optimistic: # Optimistically assume that switch has changed state. @@ -283,14 +307,19 @@ class MqttLight(Light): should_update = True if should_update: - self.schedule_update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) - def turn_off(self, **kwargs): - """Turn the device off.""" - mqtt.publish(self._hass, self._topic[CONF_COMMAND_TOPIC], - self._payload['off'], self._qos, self._retain) + @asyncio.coroutine + def async_turn_off(self, **kwargs): + """Turn the device off. + + This method is a coroutine. + """ + mqtt.async_publish( + self.hass, self._topic[CONF_COMMAND_TOPIC], self._payload['off'], + self._qos, self._retain) if self._optimistic: # Optimistically assume that switch has changed state. self._state = False - self.schedule_update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) diff --git a/homeassistant/components/light/mqtt_json.py b/homeassistant/components/light/mqtt_json.py index ba2c9efcb3d..49c69ef348b 100755 --- a/homeassistant/components/light/mqtt_json.py +++ b/homeassistant/components/light/mqtt_json.py @@ -4,11 +4,12 @@ Support for MQTT JSON lights. For more details about this platform, please refer to the documentation at https://home-assistant.io/components/light.mqtt_json/ """ - +import asyncio import logging import json import voluptuous as vol +from homeassistant.core import callback import homeassistant.components.mqtt as mqtt from homeassistant.components.light import ( ATTR_BRIGHTNESS, ATTR_RGB_COLOR, ATTR_TRANSITION, PLATFORM_SCHEMA, @@ -57,10 +58,10 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ }) -def setup_platform(hass, config, add_devices, discovery_info=None): +@asyncio.coroutine +def async_setup_platform(hass, config, async_add_devices, discovery_info=None): """Setup a MQTT JSON Light.""" - add_devices([MqttJson( - hass, + yield from async_add_devices([MqttJson( config.get(CONF_NAME), { key: config.get(key) for key in ( @@ -85,10 +86,9 @@ def setup_platform(hass, config, add_devices, discovery_info=None): class MqttJson(Light): """Representation of a MQTT JSON light.""" - def __init__(self, hass, name, topic, qos, retain, - optimistic, brightness, rgb, flash_times): + def __init__(self, name, topic, qos, retain, optimistic, brightness, rgb, + flash_times): """Initialize MQTT JSON light.""" - self._hass = hass self._name = name self._topic = topic self._qos = qos @@ -107,6 +107,13 @@ class MqttJson(Light): self._flash_times = flash_times + @asyncio.coroutine + def async_added_to_hass(self): + """Subscribe mqtt events. + + This method is a coroutine. + """ + @callback def state_received(topic, payload, qos): """A new MQTT message has been received.""" values = json.loads(payload) @@ -136,11 +143,12 @@ class MqttJson(Light): except ValueError: _LOGGER.warning('Invalid brightness value received') - self.update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) if self._topic[CONF_STATE_TOPIC] is not None: - mqtt.subscribe(self._hass, self._topic[CONF_STATE_TOPIC], - state_received, self._qos) + yield from mqtt.async_subscribe( + self.hass, self._topic[CONF_STATE_TOPIC], state_received, + self._qos) @property def brightness(self): @@ -177,8 +185,12 @@ class MqttJson(Light): """Flag supported features.""" return SUPPORT_MQTT_JSON - def turn_on(self, **kwargs): - """Turn the device on.""" + @asyncio.coroutine + def async_turn_on(self, **kwargs): + """Turn the device on. + + This method is a coroutine. + """ should_update = False message = {'state': 'ON'} @@ -212,8 +224,9 @@ class MqttJson(Light): self._brightness = kwargs[ATTR_BRIGHTNESS] should_update = True - mqtt.publish(self._hass, self._topic[CONF_COMMAND_TOPIC], - json.dumps(message), self._qos, self._retain) + mqtt.async_publish( + self.hass, self._topic[CONF_COMMAND_TOPIC], json.dumps(message), + self._qos, self._retain) if self._optimistic: # Optimistically assume that the light has changed state. @@ -221,19 +234,24 @@ class MqttJson(Light): should_update = True if should_update: - self.schedule_update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) - def turn_off(self, **kwargs): - """Turn the device off.""" + @asyncio.coroutine + def async_turn_off(self, **kwargs): + """Turn the device off. + + This method is a coroutine. + """ message = {'state': 'OFF'} if ATTR_TRANSITION in kwargs: message['transition'] = kwargs[ATTR_TRANSITION] - mqtt.publish(self._hass, self._topic[CONF_COMMAND_TOPIC], - json.dumps(message), self._qos, self._retain) + mqtt.async_publish( + self.hass, self._topic[CONF_COMMAND_TOPIC], json.dumps(message), + self._qos, self._retain) if self._optimistic: # Optimistically assume that the light has changed state. self._state = False - self.schedule_update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) diff --git a/homeassistant/components/light/mqtt_template.py b/homeassistant/components/light/mqtt_template.py index b7520fce682..d99db968315 100755 --- a/homeassistant/components/light/mqtt_template.py +++ b/homeassistant/components/light/mqtt_template.py @@ -4,10 +4,11 @@ Support for MQTT Template lights. For more details about this platform, please refer to the documentation at https://home-assistant.io/components/light.mqtt_template/ """ - +import asyncio import logging import voluptuous as vol +from homeassistant.core import callback import homeassistant.components.mqtt as mqtt from homeassistant.components.light import ( ATTR_BRIGHTNESS, ATTR_EFFECT, ATTR_FLASH, ATTR_RGB_COLOR, ATTR_TRANSITION, @@ -60,9 +61,10 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ }) -def setup_platform(hass, config, add_devices, discovery_info=None): +@asyncio.coroutine +def async_setup_platform(hass, config, async_add_devices, discovery_info=None): """Setup a MQTT Template light.""" - add_devices([MqttTemplate( + yield from async_add_devices([MqttTemplate( hass, config.get(CONF_NAME), config.get(CONF_EFFECT_LIST), @@ -96,14 +98,10 @@ class MqttTemplate(Light): def __init__(self, hass, name, effect_list, topics, templates, optimistic, qos, retain): """Initialize MQTT Template light.""" - self._hass = hass self._name = name self._effect_list = effect_list self._topics = topics self._templates = templates - for tpl in self._templates.values(): - if tpl is not None: - tpl.hass = hass self._optimistic = optimistic or topics[CONF_STATE_TOPIC] is None \ or templates[CONF_STATE_TEMPLATE] is None self._qos = qos @@ -124,11 +122,23 @@ class MqttTemplate(Light): self._rgb = None self._effect = None + # init hass to template + for tpl in self._templates.values(): + if tpl is not None: + tpl.hass = hass + + @asyncio.coroutine + def async_added_to_hass(self): + """Subscribe mqtt events. + + This method is a coroutine. + """ + @callback def state_received(topic, payload, qos): """A new MQTT message has been received.""" # read state state = self._templates[CONF_STATE_TEMPLATE].\ - render_with_possible_json_value(payload) + async_render_with_possible_json_value(payload) if state == STATE_ON: self._state = True elif state == STATE_OFF: @@ -141,7 +151,7 @@ class MqttTemplate(Light): try: self._brightness = int( self._templates[CONF_BRIGHTNESS_TEMPLATE]. - render_with_possible_json_value(payload) + async_render_with_possible_json_value(payload) ) except ValueError: _LOGGER.warning('Invalid brightness value received') @@ -151,20 +161,20 @@ class MqttTemplate(Light): try: self._rgb[0] = int( self._templates[CONF_RED_TEMPLATE]. - render_with_possible_json_value(payload)) + async_render_with_possible_json_value(payload)) self._rgb[1] = int( self._templates[CONF_GREEN_TEMPLATE]. - render_with_possible_json_value(payload)) + async_render_with_possible_json_value(payload)) self._rgb[2] = int( self._templates[CONF_BLUE_TEMPLATE]. - render_with_possible_json_value(payload)) + async_render_with_possible_json_value(payload)) except ValueError: _LOGGER.warning('Invalid color value received') # read effect if self._templates[CONF_EFFECT_TEMPLATE] is not None: effect = self._templates[CONF_EFFECT_TEMPLATE].\ - render_with_possible_json_value(payload) + async_render_with_possible_json_value(payload) # validate effect value if effect in self._effect_list: @@ -172,11 +182,12 @@ class MqttTemplate(Light): else: _LOGGER.warning('Unsupported effect value received') - self.update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) if self._topics[CONF_STATE_TOPIC] is not None: - mqtt.subscribe(self._hass, self._topics[CONF_STATE_TOPIC], - state_received, self._qos) + yield from mqtt.async_subscribe( + self.hass, self._topics[CONF_STATE_TOPIC], state_received, + self._qos) @property def brightness(self): @@ -221,8 +232,12 @@ class MqttTemplate(Light): """Return the current effect.""" return self._effect - def turn_on(self, **kwargs): - """Turn the entity on.""" + @asyncio.coroutine + def async_turn_on(self, **kwargs): + """Turn the entity on. + + This method is a coroutine. + """ # state values = {'state': True} if self._optimistic: @@ -256,17 +271,21 @@ class MqttTemplate(Light): if ATTR_TRANSITION in kwargs: values['transition'] = kwargs[ATTR_TRANSITION] - mqtt.publish( - self._hass, self._topics[CONF_COMMAND_TOPIC], - self._templates[CONF_COMMAND_ON_TEMPLATE].render(**values), + mqtt.async_publish( + self.hass, self._topics[CONF_COMMAND_TOPIC], + self._templates[CONF_COMMAND_ON_TEMPLATE].async_render(**values), self._qos, self._retain ) if self._optimistic: - self.schedule_update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) - def turn_off(self, **kwargs): - """Turn the entity off.""" + @asyncio.coroutine + def async_turn_off(self, **kwargs): + """Turn the entity off. + + This method is a coroutine. + """ # state values = {'state': False} if self._optimistic: @@ -276,14 +295,14 @@ class MqttTemplate(Light): if ATTR_TRANSITION in kwargs: values['transition'] = kwargs[ATTR_TRANSITION] - mqtt.publish( - self._hass, self._topics[CONF_COMMAND_TOPIC], - self._templates[CONF_COMMAND_OFF_TEMPLATE].render(**values), + mqtt.async_publish( + self.hass, self._topics[CONF_COMMAND_TOPIC], + self._templates[CONF_COMMAND_OFF_TEMPLATE].async_render(**values), self._qos, self._retain ) if self._optimistic: - self.schedule_update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) @property def supported_features(self): diff --git a/homeassistant/components/lock/mqtt.py b/homeassistant/components/lock/mqtt.py index fde62c8695e..00540f66150 100644 --- a/homeassistant/components/lock/mqtt.py +++ b/homeassistant/components/lock/mqtt.py @@ -4,10 +4,12 @@ Support for MQTT locks. For more details about this platform, please refer to the documentation at https://home-assistant.io/components/lock.mqtt/ """ +import asyncio import logging import voluptuous as vol +from homeassistant.core import callback from homeassistant.components.lock import LockDevice from homeassistant.components.mqtt import ( CONF_STATE_TOPIC, CONF_COMMAND_TOPIC, CONF_QOS, CONF_RETAIN) @@ -38,14 +40,14 @@ PLATFORM_SCHEMA = mqtt.MQTT_RW_PLATFORM_SCHEMA.extend({ }) -# pylint: disable=unused-argument -def setup_platform(hass, config, add_devices, discovery_info=None): +@asyncio.coroutine +def async_setup_platform(hass, config, async_add_devices, discovery_info=None): """Setup the MQTT lock.""" value_template = config.get(CONF_VALUE_TEMPLATE) if value_template is not None: value_template.hass = hass - add_devices([MqttLock( - hass, + + yield from async_add_devices([MqttLock( config.get(CONF_NAME), config.get(CONF_STATE_TOPIC), config.get(CONF_COMMAND_TOPIC), @@ -61,11 +63,10 @@ def setup_platform(hass, config, add_devices, discovery_info=None): class MqttLock(LockDevice): """Represents a lock that can be toggled using MQTT.""" - def __init__(self, hass, name, state_topic, command_topic, qos, retain, + def __init__(self, name, state_topic, command_topic, qos, retain, payload_lock, payload_unlock, optimistic, value_template): """Initialize the lock.""" self._state = False - self._hass = hass self._name = name self._state_topic = state_topic self._command_topic = command_topic @@ -74,25 +75,33 @@ class MqttLock(LockDevice): self._payload_lock = payload_lock self._payload_unlock = payload_unlock self._optimistic = optimistic + self._template = value_template + @asyncio.coroutine + def async_added_to_hass(self): + """Subscribe mqtt events. + + This method is a coroutine. + """ + @callback def message_received(topic, payload, qos): """A new MQTT message has been received.""" - if value_template is not None: - payload = value_template.render_with_possible_json_value( + if self._template is not None: + payload = self._template.async_render_with_possible_json_value( payload) if payload == self._payload_lock: self._state = True - self.schedule_update_ha_state() elif payload == self._payload_unlock: self._state = False - self.schedule_update_ha_state() + + self.hass.async_add_job(self.async_update_ha_state()) if self._state_topic is None: # Force into optimistic mode. self._optimistic = True else: - mqtt.subscribe( - hass, self._state_topic, message_received, self._qos) + yield from mqtt.async_subscribe( + self.hass, self._state_topic, message_received, self._qos) @property def should_poll(self): @@ -114,20 +123,30 @@ class MqttLock(LockDevice): """Return true if we do optimistic updates.""" return self._optimistic - def lock(self, **kwargs): - """Lock the device.""" - mqtt.publish(self.hass, self._command_topic, self._payload_lock, - self._qos, self._retain) + @asyncio.coroutine + def async_lock(self, **kwargs): + """Lock the device. + + This method is a coroutine. + """ + mqtt.async_publish( + self.hass, self._command_topic, self._payload_lock, self._qos, + self._retain) if self._optimistic: # Optimistically assume that switch has changed state. self._state = True - self.schedule_update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) - def unlock(self, **kwargs): - """Unlock the device.""" - mqtt.publish(self.hass, self._command_topic, self._payload_unlock, - self._qos, self._retain) + @asyncio.coroutine + def async_unlock(self, **kwargs): + """Unlock the device. + + This method is a coroutine. + """ + mqtt.async_publish( + self.hass, self._command_topic, self._payload_unlock, self._qos, + self._retain) if self._optimistic: # Optimistically assume that switch has changed state. self._state = False - self.schedule_update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) diff --git a/homeassistant/components/mqtt/__init__.py b/homeassistant/components/mqtt/__init__.py index 129759515a7..57ea0351168 100644 --- a/homeassistant/components/mqtt/__init__.py +++ b/homeassistant/components/mqtt/__init__.py @@ -172,10 +172,16 @@ def _build_publish_data(topic, qos, retain): def publish(hass, topic, payload, qos=None, retain=None): + """Publish message to an MQTT topic.""" + hass.add_job(async_publish, hass, topic, payload, qos, retain) + + +@callback +def async_publish(hass, topic, payload, qos=None, retain=None): """Publish message to an MQTT topic.""" data = _build_publish_data(topic, qos, retain) data[ATTR_PAYLOAD] = payload - hass.services.call(DOMAIN, SERVICE_PUBLISH, data) + hass.async_add_job(hass.services.async_call(DOMAIN, SERVICE_PUBLISH, data)) def publish_template(hass, topic, payload_template, qos=None, retain=None): @@ -387,6 +393,8 @@ class MQTT(object): self.progress = {} self.birth_message = birth_message self._mqttc = None + self._subscribe_lock = asyncio.Lock(loop=hass.loop) + self._publish_lock = asyncio.Lock(loop=hass.loop) if protocol == PROTOCOL_31: proto = mqtt.MQTTv31 @@ -426,8 +434,9 @@ class MQTT(object): This method must be run in the event loop and returns a coroutine. """ - yield from self.hass.loop.run_in_executor( - None, self._mqttc.publish, topic, payload, qos, retain) + with (yield from self._publish_lock): + yield from self.hass.loop.run_in_executor( + None, self._mqttc.publish, topic, payload, qos, retain) @asyncio.coroutine def async_connect(self): @@ -474,8 +483,10 @@ class MQTT(object): if topic in self.topics: return - result, mid = yield from self.hass.loop.run_in_executor( - None, self._mqttc.subscribe, topic, qos) + + with (yield from self._subscribe_lock): + result, mid = yield from self.hass.loop.run_in_executor( + None, self._mqttc.subscribe, topic, qos) _raise_on_error(result) self.progress[mid] = topic diff --git a/homeassistant/components/mqtt_eventstream.py b/homeassistant/components/mqtt_eventstream.py index 8632f8aa99d..c4a4b7bc4ab 100644 --- a/homeassistant/components/mqtt_eventstream.py +++ b/homeassistant/components/mqtt_eventstream.py @@ -4,10 +4,12 @@ Connect two Home Assistant instances via MQTT. For more details about this component, please refer to the documentation at https://home-assistant.io/components/mqtt_eventstream/ """ +import asyncio import json import voluptuous as vol +from homeassistant.core import callback import homeassistant.loader as loader from homeassistant.components.mqtt import ( valid_publish_topic, valid_subscribe_topic) @@ -36,13 +38,15 @@ CONFIG_SCHEMA = vol.Schema({ }, extra=vol.ALLOW_EXTRA) -def setup(hass, config): +@asyncio.coroutine +def async_setup(hass, config): """Setup the MQTT eventstream component.""" mqtt = loader.get_component('mqtt') conf = config.get(DOMAIN, {}) pub_topic = conf.get(CONF_PUBLISH_TOPIC) sub_topic = conf.get(CONF_SUBSCRIBE_TOPIC) + @callback def _event_publisher(event): """Handle events by publishing them on the MQTT queue.""" if event.origin != EventOrigin.local: @@ -81,13 +85,14 @@ def setup(hass, config): event_info = {'event_type': event.event_type, 'event_data': event.data} msg = json.dumps(event_info, cls=JSONEncoder) - mqtt.publish(hass, pub_topic, msg) + mqtt.async_publish(hass, pub_topic, msg) # Only listen for local events if you are going to publish them. if pub_topic: - hass.bus.listen(MATCH_ALL, _event_publisher) + hass.bus.async_listen(MATCH_ALL, _event_publisher) # Process events from a remote server that are received on a queue. + @callback def _event_receiver(topic, payload, qos): """Receive events published by and fire them on this hass instance.""" event = json.loads(payload) @@ -105,7 +110,7 @@ def setup(hass, config): if state: event_data[key] = state - hass.bus.fire( + hass.bus.async_fire( event_type, event_data=event_data, origin=EventOrigin.remote @@ -113,8 +118,7 @@ def setup(hass, config): # Only subscribe if you specified a topic. if sub_topic: - mqtt.subscribe(hass, sub_topic, _event_receiver) - - hass.states.set('{domain}.initialized'.format(domain=DOMAIN), True) + yield from mqtt.async_subscribe(hass, sub_topic, _event_receiver) + hass.states.async_set('{domain}.initialized'.format(domain=DOMAIN), True) return True diff --git a/homeassistant/components/sensor/mqtt.py b/homeassistant/components/sensor/mqtt.py index 797f8007dc1..a811d4e691c 100644 --- a/homeassistant/components/sensor/mqtt.py +++ b/homeassistant/components/sensor/mqtt.py @@ -4,6 +4,7 @@ Support for MQTT sensors. For more details about this platform, please refer to the documentation at https://home-assistant.io/components/sensor.mqtt/ """ +import asyncio import logging import voluptuous as vol @@ -27,8 +28,8 @@ PLATFORM_SCHEMA = mqtt.MQTT_RO_PLATFORM_SCHEMA.extend({ }) -# pylint: disable=unused-argument -def setup_platform(hass, config, add_devices, discovery_info=None): +@asyncio.coroutine +def async_setup_platform(hass, config, async_add_devices, discovery_info=None): """Set up MQTT Sensor.""" if discovery_info is not None: config = PLATFORM_SCHEMA(discovery_info) @@ -36,8 +37,8 @@ def setup_platform(hass, config, add_devices, discovery_info=None): value_template = config.get(CONF_VALUE_TEMPLATE) if value_template is not None: value_template.hass = hass - add_devices([MqttSensor( - hass, + + yield from async_add_devices([MqttSensor( config.get(CONF_NAME), config.get(CONF_STATE_TOPIC), config.get(CONF_QOS), @@ -49,26 +50,32 @@ def setup_platform(hass, config, add_devices, discovery_info=None): class MqttSensor(Entity): """Representation of a sensor that can be updated using MQTT.""" - def __init__(self, hass, name, state_topic, qos, unit_of_measurement, + def __init__(self, name, state_topic, qos, unit_of_measurement, value_template): """Initialize the sensor.""" self._state = STATE_UNKNOWN - self._hass = hass self._name = name self._state_topic = state_topic self._qos = qos self._unit_of_measurement = unit_of_measurement + self._template = value_template + def async_added_to_hass(self): + """Subscribe mqtt events. + + This method must be run in the event loop and returns a coroutine. + """ @callback def message_received(topic, payload, qos): """A new MQTT message has been received.""" - if value_template is not None: - payload = value_template.async_render_with_possible_json_value( + if self._template is not None: + payload = self._template.async_render_with_possible_json_value( payload, self._state) self._state = payload - hass.async_add_job(self.async_update_ha_state()) + self.hass.async_add_job(self.async_update_ha_state()) - mqtt.subscribe(hass, self._state_topic, message_received, self._qos) + return mqtt.async_subscribe( + self.hass, self._state_topic, message_received, self._qos) @property def should_poll(self): diff --git a/homeassistant/components/sensor/mqtt_room.py b/homeassistant/components/sensor/mqtt_room.py index 4156f668093..ad615b5c890 100644 --- a/homeassistant/components/sensor/mqtt_room.py +++ b/homeassistant/components/sensor/mqtt_room.py @@ -4,12 +4,14 @@ Support for MQTT room presence detection. For more details about this platform, please refer to the documentation at https://home-assistant.io/components/sensor.mqtt_room/ """ +import asyncio import logging import json from datetime import timedelta import voluptuous as vol +from homeassistant.core import callback import homeassistant.components.mqtt as mqtt from homeassistant.components.sensor import PLATFORM_SCHEMA from homeassistant.const import ( @@ -54,11 +56,10 @@ MQTT_PAYLOAD = vol.Schema(vol.All(json.loads, vol.Schema({ }, extra=vol.ALLOW_EXTRA))) -# pylint: disable=unused-argument -def setup_platform(hass, config, add_devices, discovery_info=None): +@asyncio.coroutine +def async_setup_platform(hass, config, async_add_devices, discovery_info=None): """Setup MQTT Sensor.""" - add_devices([MQTTRoomSensor( - hass, + yield from async_add_devices([MQTTRoomSensor( config.get(CONF_NAME), config.get(CONF_STATE_TOPIC), config.get(CONF_DEVICE_ID), @@ -70,11 +71,9 @@ def setup_platform(hass, config, add_devices, discovery_info=None): class MQTTRoomSensor(Entity): """Representation of a room sensor that is updated via MQTT.""" - def __init__(self, hass, name, state_topic, device_id, timeout, - consider_home): + def __init__(self, name, state_topic, device_id, timeout, consider_home): """Initialize the sensor.""" self._state = STATE_AWAY - self._hass = hass self._name = name self._state_topic = '{}{}'.format(state_topic, '/+') self._device_id = slugify(device_id).upper() @@ -85,13 +84,19 @@ class MQTTRoomSensor(Entity): self._distance = None self._updated = None + def async_added_to_hass(self): + """Subscribe mqtt events. + + This method must be run in the event loop and returns a coroutine. + """ + @callback def update_state(device_id, room, distance): """Update the sensor state.""" self._state = room self._distance = distance self._updated = dt.utcnow() - self.update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) def message_received(topic, payload, qos): """A new MQTT message has been received.""" @@ -117,7 +122,8 @@ class MQTTRoomSensor(Entity): or timediff.seconds >= self._timeout: update_state(**device) - mqtt.subscribe(hass, self._state_topic, message_received, 1) + return mqtt.async_subscribe( + self.hass, self._state_topic, message_received, 1) @property def name(self): diff --git a/homeassistant/components/switch/mqtt.py b/homeassistant/components/switch/mqtt.py index a7dfab62fb4..d0f2524e3de 100644 --- a/homeassistant/components/switch/mqtt.py +++ b/homeassistant/components/switch/mqtt.py @@ -4,6 +4,7 @@ Support for MQTT switches. For more details about this platform, please refer to the documentation at https://home-assistant.io/components/switch.mqtt/ """ +import asyncio import logging import voluptuous as vol @@ -35,14 +36,14 @@ PLATFORM_SCHEMA = mqtt.MQTT_RW_PLATFORM_SCHEMA.extend({ }) -# pylint: disable=unused-argument -def setup_platform(hass, config, add_devices, discovery_info=None): +@asyncio.coroutine +def async_setup_platform(hass, config, async_add_devices, discovery_info=None): """Setup the MQTT switch.""" value_template = config.get(CONF_VALUE_TEMPLATE) if value_template is not None: value_template.hass = hass - add_devices([MqttSwitch( - hass, + + yield from async_add_devices([MqttSwitch( config.get(CONF_NAME), config.get(CONF_STATE_TOPIC), config.get(CONF_COMMAND_TOPIC), @@ -58,11 +59,10 @@ def setup_platform(hass, config, add_devices, discovery_info=None): class MqttSwitch(SwitchDevice): """Representation of a switch that can be toggled using MQTT.""" - def __init__(self, hass, name, state_topic, command_topic, qos, retain, + def __init__(self, name, state_topic, command_topic, qos, retain, payload_on, payload_off, optimistic, value_template): """Initialize the MQTT switch.""" self._state = False - self._hass = hass self._name = name self._state_topic = state_topic self._command_topic = command_topic @@ -71,26 +71,33 @@ class MqttSwitch(SwitchDevice): self._payload_on = payload_on self._payload_off = payload_off self._optimistic = optimistic + self._template = value_template + @asyncio.coroutine + def async_added_to_hass(self): + """Subscribe mqtt events. + + This method is a coroutine. + """ @callback def message_received(topic, payload, qos): """A new MQTT message has been received.""" - if value_template is not None: - payload = value_template.async_render_with_possible_json_value( + if self._template is not None: + payload = self._template.async_render_with_possible_json_value( payload) if payload == self._payload_on: self._state = True - hass.async_add_job(self.async_update_ha_state()) elif payload == self._payload_off: self._state = False - hass.async_add_job(self.async_update_ha_state()) + + self.hass.async_add_job(self.async_update_ha_state()) if self._state_topic is None: # Force into optimistic mode. self._optimistic = True else: - mqtt.subscribe( - hass, self._state_topic, message_received, self._qos) + yield from mqtt.async_subscribe( + self.hass, self._state_topic, message_received, self._qos) @property def should_poll(self): @@ -112,20 +119,30 @@ class MqttSwitch(SwitchDevice): """Return true if we do optimistic updates.""" return self._optimistic - def turn_on(self, **kwargs): - """Turn the device on.""" - mqtt.publish(self.hass, self._command_topic, self._payload_on, - self._qos, self._retain) + @asyncio.coroutine + def async_turn_on(self, **kwargs): + """Turn the device on. + + This method is a coroutine. + """ + mqtt.async_publish( + self.hass, self._command_topic, self._payload_on, self._qos, + self._retain) if self._optimistic: # Optimistically assume that switch has changed state. self._state = True - self.update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) - def turn_off(self, **kwargs): - """Turn the device off.""" - mqtt.publish(self.hass, self._command_topic, self._payload_off, - self._qos, self._retain) + @asyncio.coroutine + def async_turn_off(self, **kwargs): + """Turn the device off. + + This method is a coroutine. + """ + mqtt.async_publish( + self.hass, self._command_topic, self._payload_off, self._qos, + self._retain) if self._optimistic: # Optimistically assume that switch has changed state. self._state = False - self.update_ha_state() + self.hass.async_add_job(self.async_update_ha_state()) diff --git a/tests/components/test_mqtt_eventstream.py b/tests/components/test_mqtt_eventstream.py index 7ce65fc7827..c4e7f7fd673 100644 --- a/tests/components/test_mqtt_eventstream.py +++ b/tests/components/test_mqtt_eventstream.py @@ -57,7 +57,7 @@ class TestMqttEventStream(object): # Verify that the event handler has been added as a listener assert self.hass.bus.listeners.get('*') == 1 - @patch('homeassistant.components.mqtt.subscribe') + @patch('homeassistant.components.mqtt.async_subscribe') def test_subscribe(self, mock_sub): """"Test the subscription.""" sub_topic = 'foo' @@ -67,7 +67,7 @@ class TestMqttEventStream(object): # Verify that the this entity was subscribed to the topic mock_sub.assert_called_with(self.hass, sub_topic, ANY) - @patch('homeassistant.components.mqtt.publish') + @patch('homeassistant.components.mqtt.async_publish') @patch('homeassistant.core.dt_util.utcnow') def test_state_changed_event_sends_message(self, mock_utcnow, mock_pub): """"Test the sending of a new message if event changed.""" @@ -110,7 +110,7 @@ class TestMqttEventStream(object): # Verify that the message received was that expected assert json.loads(msg) == event - @patch('homeassistant.components.mqtt.publish') + @patch('homeassistant.components.mqtt.async_publish') def test_time_event_does_not_send_message(self, mock_pub): """"Test the sending of a new message if time event.""" assert self.add_eventstream(pub_topic='bar') @@ -147,7 +147,7 @@ class TestMqttEventStream(object): assert 1 == len(calls) - @patch('homeassistant.components.mqtt.publish') + @patch('homeassistant.components.mqtt.async_publish') def test_mqtt_received_event(self, mock_pub): """Don't filter events from the mqtt component about received message.