Convert mqtt platforms to async (#6145)

* Convert mqtt platforms to async

* fix lint

* add more platforms

* convert mqtt_eventstream

* fix lint / add mqtt_room

* fix lint

* fix test part 1

* fix test part 2

* fix out of memory bug

* address comments
This commit is contained in:
Pascal Vizeli 2017-02-22 09:43:22 +01:00 committed by GitHub
parent 65ed85c6eb
commit b0d3bbed79
15 changed files with 509 additions and 291 deletions

View file

@ -4,10 +4,12 @@ This platform enables the possibility to control a MQTT alarm.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/alarm_control_panel.mqtt/
"""
import asyncio
import logging
import voluptuous as vol
from homeassistant.core import callback
import homeassistant.components.alarm_control_panel as alarm
import homeassistant.components.mqtt as mqtt
from homeassistant.const import (
@ -41,10 +43,10 @@ PLATFORM_SCHEMA = mqtt.MQTT_BASE_PLATFORM_SCHEMA.extend({
})
def setup_platform(hass, config, add_devices, discovery_info=None):
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
"""Setup the MQTT platform."""
add_devices([MqttAlarm(
hass,
yield from async_add_devices([MqttAlarm(
config.get(CONF_NAME),
config.get(CONF_STATE_TOPIC),
config.get(CONF_COMMAND_TOPIC),
@ -58,11 +60,10 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
class MqttAlarm(alarm.AlarmControlPanel):
"""Representation of a MQTT alarm status."""
def __init__(self, hass, name, state_topic, command_topic, qos,
payload_disarm, payload_arm_home, payload_arm_away, code):
def __init__(self, name, state_topic, command_topic, qos, payload_disarm,
payload_arm_home, payload_arm_away, code):
"""Initalize the MQTT alarm panel."""
self._state = STATE_UNKNOWN
self._hass = hass
self._name = name
self._state_topic = state_topic
self._command_topic = command_topic
@ -72,6 +73,12 @@ class MqttAlarm(alarm.AlarmControlPanel):
self._payload_arm_away = payload_arm_away
self._code = code
def async_added_to_hass(self):
"""Subscribe mqtt events.
This method must be run in the event loop and returns a coroutine.
"""
@callback
def message_received(topic, payload, qos):
"""A new MQTT message has been received."""
if payload not in (STATE_ALARM_DISARMED, STATE_ALARM_ARMED_HOME,
@ -80,9 +87,10 @@ class MqttAlarm(alarm.AlarmControlPanel):
_LOGGER.warning('Received unexpected payload: %s', payload)
return
self._state = payload
self.update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
mqtt.subscribe(hass, self._state_topic, message_received, self._qos)
return mqtt.async_subscribe(
self.hass, self._state_topic, message_received, self._qos)
@property
def should_poll(self):
@ -104,26 +112,38 @@ class MqttAlarm(alarm.AlarmControlPanel):
"""One or more characters if code is defined."""
return None if self._code is None else '.+'
def alarm_disarm(self, code=None):
"""Send disarm command."""
@asyncio.coroutine
def async_alarm_disarm(self, code=None):
"""Send disarm command.
This method is a coroutine.
"""
if not self._validate_code(code, 'disarming'):
return
mqtt.publish(self.hass, self._command_topic,
self._payload_disarm, self._qos)
mqtt.async_publish(
self.hass, self._command_topic, self._payload_disarm, self._qos)
def alarm_arm_home(self, code=None):
"""Send arm home command."""
@asyncio.coroutine
def async_alarm_arm_home(self, code=None):
"""Send arm home command.
This method is a coroutine.
"""
if not self._validate_code(code, 'arming home'):
return
mqtt.publish(self.hass, self._command_topic,
self._payload_arm_home, self._qos)
mqtt.async_publish(
self.hass, self._command_topic, self._payload_arm_home, self._qos)
def alarm_arm_away(self, code=None):
"""Send arm away command."""
@asyncio.coroutine
def async_alarm_arm_away(self, code=None):
"""Send arm away command.
This method is a coroutine.
"""
if not self._validate_code(code, 'arming away'):
return
mqtt.publish(self.hass, self._command_topic,
self._payload_arm_away, self._qos)
mqtt.async_publish(
self.hass, self._command_topic, self._payload_arm_away, self._qos)
def _validate_code(self, code, state):
"""Validate given code."""

View file

@ -62,7 +62,8 @@ def is_on(hass, entity_id):
def turn_on(hass, entity_id):
"""Reset the alert."""
run_callback_threadsafe(hass.loop, async_turn_on, hass, entity_id)
run_callback_threadsafe(
hass.loop, async_turn_on, hass, entity_id).result()
@callback
@ -75,7 +76,8 @@ def async_turn_on(hass, entity_id):
def turn_off(hass, entity_id):
"""Acknowledge alert."""
run_callback_threadsafe(hass.loop, async_turn_off, hass, entity_id)
run_callback_threadsafe(
hass.loop, async_turn_off, hass, entity_id).result()
@callback

View file

@ -4,6 +4,7 @@ Support for MQTT binary sensors.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/binary_sensor.mqtt/
"""
import asyncio
import logging
import voluptuous as vol
@ -35,8 +36,8 @@ PLATFORM_SCHEMA = mqtt.MQTT_RO_PLATFORM_SCHEMA.extend({
})
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
"""Set up the MQTT binary sensor."""
if discovery_info is not None:
config = PLATFORM_SCHEMA(discovery_info)
@ -44,8 +45,8 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
value_template = config.get(CONF_VALUE_TEMPLATE)
if value_template is not None:
value_template.hass = hass
add_devices([MqttBinarySensor(
hass,
yield from async_add_devices([MqttBinarySensor(
config.get(CONF_NAME),
config.get(CONF_STATE_TOPIC),
get_deprecated(config, CONF_DEVICE_CLASS, CONF_SENSOR_CLASS),
@ -59,10 +60,9 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
class MqttBinarySensor(BinarySensorDevice):
"""Representation a binary sensor that is updated by MQTT."""
def __init__(self, hass, name, state_topic, device_class, qos, payload_on,
def __init__(self, name, state_topic, device_class, qos, payload_on,
payload_off, value_template):
"""Initialize the MQTT binary sensor."""
self._hass = hass
self._name = name
self._state = False
self._state_topic = state_topic
@ -70,21 +70,28 @@ class MqttBinarySensor(BinarySensorDevice):
self._payload_on = payload_on
self._payload_off = payload_off
self._qos = qos
self._template = value_template
def async_added_to_hass(self):
"""Subscribe mqtt events.
This method must be run in the event loop and returns a coroutine.
"""
@callback
def message_received(topic, payload, qos):
"""A new MQTT message has been received."""
if value_template is not None:
payload = value_template.async_render_with_possible_json_value(
if self._template is not None:
payload = self._template.async_render_with_possible_json_value(
payload)
if payload == self._payload_on:
self._state = True
hass.async_add_job(self.async_update_ha_state())
elif payload == self._payload_off:
self._state = False
hass.async_add_job(self.async_update_ha_state())
mqtt.subscribe(hass, self._state_topic, message_received, self._qos)
self.hass.async_add_job(self.async_update_ha_state())
return mqtt.async_subscribe(
self.hass, self._state_topic, message_received, self._qos)
@property
def should_poll(self):

View file

@ -4,6 +4,7 @@ Support for MQTT cover devices.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/cover.mqtt/
"""
import asyncio
import logging
import voluptuous as vol
@ -46,13 +47,14 @@ PLATFORM_SCHEMA = mqtt.MQTT_RW_PLATFORM_SCHEMA.extend({
})
def setup_platform(hass, config, add_devices, discovery_info=None):
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
"""Setup the MQTT Cover."""
value_template = config.get(CONF_VALUE_TEMPLATE)
if value_template is not None:
value_template.hass = hass
add_devices([MqttCover(
hass,
yield from async_add_devices([MqttCover(
config.get(CONF_NAME),
config.get(CONF_STATE_TOPIC),
config.get(CONF_COMMAND_TOPIC),
@ -71,13 +73,12 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
class MqttCover(CoverDevice):
"""Representation of a cover that can be controlled using MQTT."""
def __init__(self, hass, name, state_topic, command_topic, qos,
retain, state_open, state_closed, payload_open, payload_close,
def __init__(self, name, state_topic, command_topic, qos, retain,
state_open, state_closed, payload_open, payload_close,
payload_stop, optimistic, value_template):
"""Initialize the cover."""
self._position = None
self._state = None
self._hass = hass
self._name = name
self._state_topic = state_topic
self._command_topic = command_topic
@ -89,37 +90,45 @@ class MqttCover(CoverDevice):
self._state_closed = state_closed
self._retain = retain
self._optimistic = optimistic or state_topic is None
self._template = value_template
@asyncio.coroutine
def async_added_to_hass(self):
"""Subscribe mqtt events.
This method is a coroutine.
"""
@callback
def message_received(topic, payload, qos):
"""A new MQTT message has been received."""
if value_template is not None:
payload = value_template.async_render_with_possible_json_value(
if self._template is not None:
payload = self._template.async_render_with_possible_json_value(
payload)
if payload == self._state_open:
self._state = False
hass.async_add_job(self.async_update_ha_state())
elif payload == self._state_closed:
self._state = True
hass.async_add_job(self.async_update_ha_state())
elif payload.isnumeric() and 0 <= int(payload) <= 100:
if int(payload) > 0:
self._state = False
else:
self._state = True
self._position = int(payload)
hass.async_add_job(self.async_update_ha_state())
else:
_LOGGER.warning(
"Payload is not True, False, or integer (0-100): %s",
payload)
return
self.hass.async_add_job(self.async_update_ha_state())
if self._state_topic is None:
# Force into optimistic mode.
self._optimistic = True
else:
mqtt.subscribe(hass, self._state_topic, message_received,
self._qos)
yield from mqtt.async_subscribe(
self.hass, self._state_topic, message_received, self._qos)
@property
def should_poll(self):
@ -144,25 +153,40 @@ class MqttCover(CoverDevice):
"""
return self._position
def open_cover(self, **kwargs):
"""Move the cover up."""
mqtt.publish(self.hass, self._command_topic, self._payload_open,
self._qos, self._retain)
@asyncio.coroutine
def async_open_cover(self, **kwargs):
"""Move the cover up.
This method is a coroutine.
"""
mqtt.async_publish(
self.hass, self._command_topic, self._payload_open, self._qos,
self._retain)
if self._optimistic:
# Optimistically assume that cover has changed state.
self._state = False
self.schedule_update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
def close_cover(self, **kwargs):
"""Move the cover down."""
mqtt.publish(self.hass, self._command_topic, self._payload_close,
self._qos, self._retain)
@asyncio.coroutine
def async_close_cover(self, **kwargs):
"""Move the cover down.
This method is a coroutine.
"""
mqtt.async_publish(
self.hass, self._command_topic, self._payload_close, self._qos,
self._retain)
if self._optimistic:
# Optimistically assume that cover has changed state.
self._state = True
self.schedule_update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
def stop_cover(self, **kwargs):
"""Stop the device."""
mqtt.publish(self.hass, self._command_topic, self._payload_stop,
self._qos, self._retain)
@asyncio.coroutine
def async_stop_cover(self, **kwargs):
"""Stop the device.
This method is a coroutine.
"""
mqtt.async_publish(
self.hass, self._command_topic, self._payload_stop, self._qos,
self._retain)

View file

@ -4,10 +4,12 @@ Support for MQTT fans.
For more details about this platform, please refer to the documentation
https://home-assistant.io/components/fan.mqtt/
"""
import asyncio
import logging
import voluptuous as vol
from homeassistant.core import callback
import homeassistant.components.mqtt as mqtt
from homeassistant.const import (
CONF_NAME, CONF_OPTIMISTIC, CONF_STATE, STATE_ON, STATE_OFF,
@ -73,11 +75,10 @@ PLATFORM_SCHEMA = mqtt.MQTT_RW_PLATFORM_SCHEMA.extend({
})
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
"""Setup MQTT fan platform."""
add_devices([MqttFan(
hass,
yield from async_add_devices([MqttFan(
config.get(CONF_NAME),
{
key: config.get(key) for key in (
@ -113,15 +114,15 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
class MqttFan(FanEntity):
"""A MQTT fan component."""
def __init__(self, hass, name, topic, templates, qos, retain, payload,
def __init__(self, name, topic, templates, qos, retain, payload,
speed_list, optimistic):
"""Initialize the MQTT fan."""
self._hass = hass
self._name = name
self._topic = topic
self._qos = qos
self._retain = retain
self._payload = payload
self._templates = templates
self._speed_list = speed_list
self._optimistic = optimistic or topic[CONF_STATE_TOPIC] is None
self._optimistic_oscillation = (
@ -129,19 +130,29 @@ class MqttFan(FanEntity):
self._optimistic_speed = (
optimistic or topic[CONF_SPEED_STATE_TOPIC] is None)
self._state = False
self._speed = None
self._oscillation = None
self._supported_features = 0
self._supported_features |= (topic[CONF_OSCILLATION_STATE_TOPIC]
is not None and SUPPORT_OSCILLATE)
self._supported_features |= (topic[CONF_SPEED_STATE_TOPIC]
is not None and SUPPORT_SET_SPEED)
for key, tpl in list(templates.items()):
@asyncio.coroutine
def async_added_to_hass(self):
"""Subscribe mqtt events.
This method is a coroutine.
"""
templates = {}
for key, tpl in list(self._templates.items()):
if tpl is None:
templates[key] = lambda value: value
else:
tpl.hass = hass
templates[key] = tpl.render_with_possible_json_value
tpl.hass = self.hass
templates[key] = tpl.async_render_with_possible_json_value
@callback
def state_received(topic, payload, qos):
"""A new MQTT message has been received."""
payload = templates[CONF_STATE](payload)
@ -149,13 +160,14 @@ class MqttFan(FanEntity):
self._state = True
elif payload == self._payload[STATE_OFF]:
self._state = False
self.schedule_update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
if self._topic[CONF_STATE_TOPIC] is not None:
mqtt.subscribe(self._hass, self._topic[CONF_STATE_TOPIC],
state_received, self._qos)
yield from mqtt.async_subscribe(
self.hass, self._topic[CONF_STATE_TOPIC], state_received,
self._qos)
@callback
def speed_received(topic, payload, qos):
"""A new MQTT message for the speed has been received."""
payload = templates[ATTR_SPEED](payload)
@ -165,17 +177,15 @@ class MqttFan(FanEntity):
self._speed = SPEED_MEDIUM
elif payload == self._payload[SPEED_HIGH]:
self._speed = SPEED_HIGH
self.schedule_update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
if self._topic[CONF_SPEED_STATE_TOPIC] is not None:
mqtt.subscribe(self._hass, self._topic[CONF_SPEED_STATE_TOPIC],
speed_received, self._qos)
self._speed = SPEED_OFF
elif self._topic[CONF_SPEED_COMMAND_TOPIC] is not None:
self._speed = SPEED_OFF
else:
self._speed = SPEED_OFF
yield from mqtt.async_subscribe(
self.hass, self._topic[CONF_SPEED_STATE_TOPIC], speed_received,
self._qos)
self._speed = SPEED_OFF
@callback
def oscillation_received(topic, payload, qos):
"""A new MQTT message has been received."""
payload = templates[OSCILLATION](payload)
@ -183,17 +193,13 @@ class MqttFan(FanEntity):
self._oscillation = True
elif payload == self._payload[OSCILLATE_OFF_PAYLOAD]:
self._oscillation = False
self.schedule_update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
if self._topic[CONF_OSCILLATION_STATE_TOPIC] is not None:
mqtt.subscribe(self._hass,
self._topic[CONF_OSCILLATION_STATE_TOPIC],
oscillation_received, self._qos)
self._oscillation = False
if self._topic[CONF_OSCILLATION_COMMAND_TOPIC] is not None:
self._oscillation = False
else:
self._oscillation = False
yield from mqtt.async_subscribe(
self.hass, self._topic[CONF_OSCILLATION_STATE_TOPIC],
oscillation_received, self._qos)
self._oscillation = False
@property
def should_poll(self):
@ -235,43 +241,72 @@ class MqttFan(FanEntity):
"""Return the oscillation state."""
return self._oscillation
def turn_on(self, speed: str=None) -> None:
"""Turn on the entity."""
mqtt.publish(self._hass, self._topic[CONF_COMMAND_TOPIC],
self._payload[STATE_ON], self._qos, self._retain)
@asyncio.coroutine
def async_turn_on(self, speed: str=None) -> None:
"""Turn on the entity.
This method is a coroutine.
"""
mqtt.async_publish(
self.hass, self._topic[CONF_COMMAND_TOPIC],
self._payload[STATE_ON], self._qos, self._retain)
if speed:
self.set_speed(speed)
yield from self.async_set_speed(speed)
def turn_off(self) -> None:
"""Turn off the entity."""
mqtt.publish(self._hass, self._topic[CONF_COMMAND_TOPIC],
self._payload[STATE_OFF], self._qos, self._retain)
@asyncio.coroutine
def async_turn_off(self) -> None:
"""Turn off the entity.
def set_speed(self, speed: str) -> None:
"""Set the speed of the fan."""
if self._topic[CONF_SPEED_COMMAND_TOPIC] is not None:
mqtt_payload = SPEED_OFF
if speed == SPEED_LOW:
mqtt_payload = self._payload[SPEED_LOW]
elif speed == SPEED_MEDIUM:
mqtt_payload = self._payload[SPEED_MEDIUM]
elif speed == SPEED_HIGH:
mqtt_payload = self._payload[SPEED_HIGH]
else:
mqtt_payload = speed
This method is a coroutine.
"""
mqtt.async_publish(
self.hass, self._topic[CONF_COMMAND_TOPIC],
self._payload[STATE_OFF], self._qos, self._retain)
@asyncio.coroutine
def async_set_speed(self, speed: str) -> None:
"""Set the speed of the fan.
This method is a coroutine.
"""
if self._topic[CONF_SPEED_COMMAND_TOPIC] is None:
return
if speed == SPEED_LOW:
mqtt_payload = self._payload[SPEED_LOW]
elif speed == SPEED_MEDIUM:
mqtt_payload = self._payload[SPEED_MEDIUM]
elif speed == SPEED_HIGH:
mqtt_payload = self._payload[SPEED_HIGH]
else:
mqtt_payload = speed
mqtt.async_publish(
self.hass, self._topic[CONF_SPEED_COMMAND_TOPIC],
mqtt_payload, self._qos, self._retain)
if self._optimistic_speed:
self._speed = speed
mqtt.publish(self._hass, self._topic[CONF_SPEED_COMMAND_TOPIC],
mqtt_payload, self._qos, self._retain)
self.schedule_update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
def oscillate(self, oscillating: bool) -> None:
"""Set oscillation."""
if self._topic[CONF_OSCILLATION_COMMAND_TOPIC] is not None:
self._oscillation = oscillating
@asyncio.coroutine
def async_oscillate(self, oscillating: bool) -> None:
"""Set oscillation.
This method is a coroutine.
"""
if self._topic[CONF_OSCILLATION_COMMAND_TOPIC] is None:
return
if oscillating is False:
payload = self._payload[OSCILLATE_OFF_PAYLOAD]
else:
payload = self._payload[OSCILLATE_ON_PAYLOAD]
if oscillating is False:
payload = self._payload[OSCILLATE_OFF_PAYLOAD]
mqtt.publish(self._hass,
self._topic[CONF_OSCILLATION_COMMAND_TOPIC],
payload, self._qos, self._retain)
self.schedule_update_ha_state()
mqtt.async_publish(
self.hass, self._topic[CONF_OSCILLATION_COMMAND_TOPIC],
payload, self._qos, self._retain)
if self._optimistic_oscillation:
self._oscillation = oscillating
self.hass.async_add_job(self.async_update_ha_state())

View file

@ -4,10 +4,12 @@ Support for MQTT lights.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/light.mqtt/
"""
import asyncio
import logging
import voluptuous as vol
from homeassistant.core import callback
import homeassistant.components.mqtt as mqtt
from homeassistant.components.light import (
ATTR_BRIGHTNESS, ATTR_RGB_COLOR, ATTR_COLOR_TEMP, SUPPORT_BRIGHTNESS,
@ -62,12 +64,13 @@ PLATFORM_SCHEMA = mqtt.MQTT_RW_PLATFORM_SCHEMA.extend({
})
def setup_platform(hass, config, add_devices, discovery_info=None):
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
"""Add MQTT Light."""
config.setdefault(CONF_STATE_VALUE_TEMPLATE,
config.get(CONF_VALUE_TEMPLATE))
add_devices([MqttLight(
hass,
config.setdefault(
CONF_STATE_VALUE_TEMPLATE, config.get(CONF_VALUE_TEMPLATE))
yield from async_add_devices([MqttLight(
config.get(CONF_NAME),
{
key: config.get(key) for key in (
@ -101,15 +104,15 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
class MqttLight(Light):
"""MQTT light."""
def __init__(self, hass, name, topic, templates, qos, retain, payload,
def __init__(self, name, topic, templates, qos, retain, payload,
optimistic, brightness_scale):
"""Initialize MQTT light."""
self._hass = hass
self._name = name
self._topic = topic
self._qos = qos
self._retain = retain
self._payload = payload
self._templates = templates
self._optimistic = optimistic or topic[CONF_STATE_TOPIC] is None
self._optimistic_rgb = \
optimistic or topic[CONF_RGB_STATE_TOPIC] is None
@ -119,6 +122,9 @@ class MqttLight(Light):
optimistic or topic[CONF_COLOR_TEMP_STATE_TOPIC] is None)
self._brightness_scale = brightness_scale
self._state = False
self._brightness = None
self._rgb = None
self._color_temp = None
self._supported_features = 0
self._supported_features |= (
topic[CONF_RGB_COMMAND_TOPIC] is not None and SUPPORT_RGB_COLOR)
@ -129,13 +135,21 @@ class MqttLight(Light):
topic[CONF_COLOR_TEMP_COMMAND_TOPIC] is not None and
SUPPORT_COLOR_TEMP)
for key, tpl in list(templates.items()):
@asyncio.coroutine
def async_added_to_hass(self):
"""Subscribe mqtt events.
This method is a coroutine.
"""
templates = {}
for key, tpl in list(self._templates.items()):
if tpl is None:
templates[key] = lambda value: value
else:
tpl.hass = hass
templates[key] = tpl.render_with_possible_json_value
tpl.hass = self.hass
templates[key] = tpl.async_render_with_possible_json_value
@callback
def state_received(topic, payload, qos):
"""A new MQTT message has been received."""
payload = templates[CONF_STATE](payload)
@ -143,23 +157,24 @@ class MqttLight(Light):
self._state = True
elif payload == self._payload['off']:
self._state = False
self.update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
if self._topic[CONF_STATE_TOPIC] is not None:
mqtt.subscribe(self._hass, self._topic[CONF_STATE_TOPIC],
state_received, self._qos)
yield from mqtt.async_subscribe(
self.hass, self._topic[CONF_STATE_TOPIC], state_received,
self._qos)
@callback
def brightness_received(topic, payload, qos):
"""A new MQTT message for the brightness has been received."""
device_value = float(templates[CONF_BRIGHTNESS](payload))
percent_bright = device_value / self._brightness_scale
self._brightness = int(percent_bright * 255)
self.update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
if self._topic[CONF_BRIGHTNESS_STATE_TOPIC] is not None:
mqtt.subscribe(
self._hass, self._topic[CONF_BRIGHTNESS_STATE_TOPIC],
yield from mqtt.async_subscribe(
self.hass, self._topic[CONF_BRIGHTNESS_STATE_TOPIC],
brightness_received, self._qos)
self._brightness = 255
elif self._topic[CONF_BRIGHTNESS_COMMAND_TOPIC] is not None:
@ -167,29 +182,32 @@ class MqttLight(Light):
else:
self._brightness = None
@callback
def rgb_received(topic, payload, qos):
"""A new MQTT message has been received."""
self._rgb = [int(val) for val in
templates[CONF_RGB](payload).split(',')]
self.update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
if self._topic[CONF_RGB_STATE_TOPIC] is not None:
mqtt.subscribe(self._hass, self._topic[CONF_RGB_STATE_TOPIC],
rgb_received, self._qos)
yield from mqtt.async_subscribe(
self.hass, self._topic[CONF_RGB_STATE_TOPIC], rgb_received,
self._qos)
self._rgb = [255, 255, 255]
if self._topic[CONF_RGB_COMMAND_TOPIC] is not None:
self._rgb = [255, 255, 255]
else:
self._rgb = None
@callback
def color_temp_received(topic, payload, qos):
"""A new MQTT message for color temp has been received."""
self._color_temp = int(templates[CONF_COLOR_TEMP](payload))
self.update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
if self._topic[CONF_COLOR_TEMP_STATE_TOPIC] is not None:
mqtt.subscribe(
self._hass, self._topic[CONF_COLOR_TEMP_STATE_TOPIC],
yield from mqtt.async_subscribe(
self.hass, self._topic[CONF_COLOR_TEMP_STATE_TOPIC],
color_temp_received, self._qos)
self._color_temp = 150
if self._topic[CONF_COLOR_TEMP_COMMAND_TOPIC] is not None:
@ -237,16 +255,21 @@ class MqttLight(Light):
"""Flag supported features."""
return self._supported_features
def turn_on(self, **kwargs):
"""Turn the device on."""
@asyncio.coroutine
def async_turn_on(self, **kwargs):
"""Turn the device on.
This method is a coroutine.
"""
should_update = False
if ATTR_RGB_COLOR in kwargs and \
self._topic[CONF_RGB_COMMAND_TOPIC] is not None:
mqtt.publish(self._hass, self._topic[CONF_RGB_COMMAND_TOPIC],
'{},{},{}'.format(*kwargs[ATTR_RGB_COLOR]),
self._qos, self._retain)
mqtt.async_publish(
self.hass, self._topic[CONF_RGB_COMMAND_TOPIC],
'{},{},{}'.format(*kwargs[ATTR_RGB_COLOR]), self._qos,
self._retain)
if self._optimistic_rgb:
self._rgb = kwargs[ATTR_RGB_COLOR]
@ -256,8 +279,8 @@ class MqttLight(Light):
self._topic[CONF_BRIGHTNESS_COMMAND_TOPIC] is not None:
percent_bright = float(kwargs[ATTR_BRIGHTNESS]) / 255
device_brightness = int(percent_bright * self._brightness_scale)
mqtt.publish(
self._hass, self._topic[CONF_BRIGHTNESS_COMMAND_TOPIC],
mqtt.async_publish(
self.hass, self._topic[CONF_BRIGHTNESS_COMMAND_TOPIC],
device_brightness, self._qos, self._retain)
if self._optimistic_brightness:
@ -267,15 +290,16 @@ class MqttLight(Light):
if ATTR_COLOR_TEMP in kwargs and \
self._topic[CONF_COLOR_TEMP_COMMAND_TOPIC] is not None:
color_temp = int(kwargs[ATTR_COLOR_TEMP])
mqtt.publish(
self._hass, self._topic[CONF_COLOR_TEMP_COMMAND_TOPIC],
mqtt.async_publish(
self.hass, self._topic[CONF_COLOR_TEMP_COMMAND_TOPIC],
color_temp, self._qos, self._retain)
if self._optimistic_color_temp:
self._color_temp = kwargs[ATTR_COLOR_TEMP]
should_update = True
mqtt.publish(self._hass, self._topic[CONF_COMMAND_TOPIC],
self._payload['on'], self._qos, self._retain)
mqtt.async_publish(
self.hass, self._topic[CONF_COMMAND_TOPIC], self._payload['on'],
self._qos, self._retain)
if self._optimistic:
# Optimistically assume that switch has changed state.
@ -283,14 +307,19 @@ class MqttLight(Light):
should_update = True
if should_update:
self.schedule_update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
def turn_off(self, **kwargs):
"""Turn the device off."""
mqtt.publish(self._hass, self._topic[CONF_COMMAND_TOPIC],
self._payload['off'], self._qos, self._retain)
@asyncio.coroutine
def async_turn_off(self, **kwargs):
"""Turn the device off.
This method is a coroutine.
"""
mqtt.async_publish(
self.hass, self._topic[CONF_COMMAND_TOPIC], self._payload['off'],
self._qos, self._retain)
if self._optimistic:
# Optimistically assume that switch has changed state.
self._state = False
self.schedule_update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())

View file

@ -4,11 +4,12 @@ Support for MQTT JSON lights.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/light.mqtt_json/
"""
import asyncio
import logging
import json
import voluptuous as vol
from homeassistant.core import callback
import homeassistant.components.mqtt as mqtt
from homeassistant.components.light import (
ATTR_BRIGHTNESS, ATTR_RGB_COLOR, ATTR_TRANSITION, PLATFORM_SCHEMA,
@ -57,10 +58,10 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
})
def setup_platform(hass, config, add_devices, discovery_info=None):
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
"""Setup a MQTT JSON Light."""
add_devices([MqttJson(
hass,
yield from async_add_devices([MqttJson(
config.get(CONF_NAME),
{
key: config.get(key) for key in (
@ -85,10 +86,9 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
class MqttJson(Light):
"""Representation of a MQTT JSON light."""
def __init__(self, hass, name, topic, qos, retain,
optimistic, brightness, rgb, flash_times):
def __init__(self, name, topic, qos, retain, optimistic, brightness, rgb,
flash_times):
"""Initialize MQTT JSON light."""
self._hass = hass
self._name = name
self._topic = topic
self._qos = qos
@ -107,6 +107,13 @@ class MqttJson(Light):
self._flash_times = flash_times
@asyncio.coroutine
def async_added_to_hass(self):
"""Subscribe mqtt events.
This method is a coroutine.
"""
@callback
def state_received(topic, payload, qos):
"""A new MQTT message has been received."""
values = json.loads(payload)
@ -136,11 +143,12 @@ class MqttJson(Light):
except ValueError:
_LOGGER.warning('Invalid brightness value received')
self.update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
if self._topic[CONF_STATE_TOPIC] is not None:
mqtt.subscribe(self._hass, self._topic[CONF_STATE_TOPIC],
state_received, self._qos)
yield from mqtt.async_subscribe(
self.hass, self._topic[CONF_STATE_TOPIC], state_received,
self._qos)
@property
def brightness(self):
@ -177,8 +185,12 @@ class MqttJson(Light):
"""Flag supported features."""
return SUPPORT_MQTT_JSON
def turn_on(self, **kwargs):
"""Turn the device on."""
@asyncio.coroutine
def async_turn_on(self, **kwargs):
"""Turn the device on.
This method is a coroutine.
"""
should_update = False
message = {'state': 'ON'}
@ -212,8 +224,9 @@ class MqttJson(Light):
self._brightness = kwargs[ATTR_BRIGHTNESS]
should_update = True
mqtt.publish(self._hass, self._topic[CONF_COMMAND_TOPIC],
json.dumps(message), self._qos, self._retain)
mqtt.async_publish(
self.hass, self._topic[CONF_COMMAND_TOPIC], json.dumps(message),
self._qos, self._retain)
if self._optimistic:
# Optimistically assume that the light has changed state.
@ -221,19 +234,24 @@ class MqttJson(Light):
should_update = True
if should_update:
self.schedule_update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
def turn_off(self, **kwargs):
"""Turn the device off."""
@asyncio.coroutine
def async_turn_off(self, **kwargs):
"""Turn the device off.
This method is a coroutine.
"""
message = {'state': 'OFF'}
if ATTR_TRANSITION in kwargs:
message['transition'] = kwargs[ATTR_TRANSITION]
mqtt.publish(self._hass, self._topic[CONF_COMMAND_TOPIC],
json.dumps(message), self._qos, self._retain)
mqtt.async_publish(
self.hass, self._topic[CONF_COMMAND_TOPIC], json.dumps(message),
self._qos, self._retain)
if self._optimistic:
# Optimistically assume that the light has changed state.
self._state = False
self.schedule_update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())

View file

@ -4,10 +4,11 @@ Support for MQTT Template lights.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/light.mqtt_template/
"""
import asyncio
import logging
import voluptuous as vol
from homeassistant.core import callback
import homeassistant.components.mqtt as mqtt
from homeassistant.components.light import (
ATTR_BRIGHTNESS, ATTR_EFFECT, ATTR_FLASH, ATTR_RGB_COLOR, ATTR_TRANSITION,
@ -60,9 +61,10 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
})
def setup_platform(hass, config, add_devices, discovery_info=None):
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
"""Setup a MQTT Template light."""
add_devices([MqttTemplate(
yield from async_add_devices([MqttTemplate(
hass,
config.get(CONF_NAME),
config.get(CONF_EFFECT_LIST),
@ -96,14 +98,10 @@ class MqttTemplate(Light):
def __init__(self, hass, name, effect_list, topics, templates, optimistic,
qos, retain):
"""Initialize MQTT Template light."""
self._hass = hass
self._name = name
self._effect_list = effect_list
self._topics = topics
self._templates = templates
for tpl in self._templates.values():
if tpl is not None:
tpl.hass = hass
self._optimistic = optimistic or topics[CONF_STATE_TOPIC] is None \
or templates[CONF_STATE_TEMPLATE] is None
self._qos = qos
@ -124,11 +122,23 @@ class MqttTemplate(Light):
self._rgb = None
self._effect = None
# init hass to template
for tpl in self._templates.values():
if tpl is not None:
tpl.hass = hass
@asyncio.coroutine
def async_added_to_hass(self):
"""Subscribe mqtt events.
This method is a coroutine.
"""
@callback
def state_received(topic, payload, qos):
"""A new MQTT message has been received."""
# read state
state = self._templates[CONF_STATE_TEMPLATE].\
render_with_possible_json_value(payload)
async_render_with_possible_json_value(payload)
if state == STATE_ON:
self._state = True
elif state == STATE_OFF:
@ -141,7 +151,7 @@ class MqttTemplate(Light):
try:
self._brightness = int(
self._templates[CONF_BRIGHTNESS_TEMPLATE].
render_with_possible_json_value(payload)
async_render_with_possible_json_value(payload)
)
except ValueError:
_LOGGER.warning('Invalid brightness value received')
@ -151,20 +161,20 @@ class MqttTemplate(Light):
try:
self._rgb[0] = int(
self._templates[CONF_RED_TEMPLATE].
render_with_possible_json_value(payload))
async_render_with_possible_json_value(payload))
self._rgb[1] = int(
self._templates[CONF_GREEN_TEMPLATE].
render_with_possible_json_value(payload))
async_render_with_possible_json_value(payload))
self._rgb[2] = int(
self._templates[CONF_BLUE_TEMPLATE].
render_with_possible_json_value(payload))
async_render_with_possible_json_value(payload))
except ValueError:
_LOGGER.warning('Invalid color value received')
# read effect
if self._templates[CONF_EFFECT_TEMPLATE] is not None:
effect = self._templates[CONF_EFFECT_TEMPLATE].\
render_with_possible_json_value(payload)
async_render_with_possible_json_value(payload)
# validate effect value
if effect in self._effect_list:
@ -172,11 +182,12 @@ class MqttTemplate(Light):
else:
_LOGGER.warning('Unsupported effect value received')
self.update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
if self._topics[CONF_STATE_TOPIC] is not None:
mqtt.subscribe(self._hass, self._topics[CONF_STATE_TOPIC],
state_received, self._qos)
yield from mqtt.async_subscribe(
self.hass, self._topics[CONF_STATE_TOPIC], state_received,
self._qos)
@property
def brightness(self):
@ -221,8 +232,12 @@ class MqttTemplate(Light):
"""Return the current effect."""
return self._effect
def turn_on(self, **kwargs):
"""Turn the entity on."""
@asyncio.coroutine
def async_turn_on(self, **kwargs):
"""Turn the entity on.
This method is a coroutine.
"""
# state
values = {'state': True}
if self._optimistic:
@ -256,17 +271,21 @@ class MqttTemplate(Light):
if ATTR_TRANSITION in kwargs:
values['transition'] = kwargs[ATTR_TRANSITION]
mqtt.publish(
self._hass, self._topics[CONF_COMMAND_TOPIC],
self._templates[CONF_COMMAND_ON_TEMPLATE].render(**values),
mqtt.async_publish(
self.hass, self._topics[CONF_COMMAND_TOPIC],
self._templates[CONF_COMMAND_ON_TEMPLATE].async_render(**values),
self._qos, self._retain
)
if self._optimistic:
self.schedule_update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
def turn_off(self, **kwargs):
"""Turn the entity off."""
@asyncio.coroutine
def async_turn_off(self, **kwargs):
"""Turn the entity off.
This method is a coroutine.
"""
# state
values = {'state': False}
if self._optimistic:
@ -276,14 +295,14 @@ class MqttTemplate(Light):
if ATTR_TRANSITION in kwargs:
values['transition'] = kwargs[ATTR_TRANSITION]
mqtt.publish(
self._hass, self._topics[CONF_COMMAND_TOPIC],
self._templates[CONF_COMMAND_OFF_TEMPLATE].render(**values),
mqtt.async_publish(
self.hass, self._topics[CONF_COMMAND_TOPIC],
self._templates[CONF_COMMAND_OFF_TEMPLATE].async_render(**values),
self._qos, self._retain
)
if self._optimistic:
self.schedule_update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
@property
def supported_features(self):

View file

@ -4,10 +4,12 @@ Support for MQTT locks.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/lock.mqtt/
"""
import asyncio
import logging
import voluptuous as vol
from homeassistant.core import callback
from homeassistant.components.lock import LockDevice
from homeassistant.components.mqtt import (
CONF_STATE_TOPIC, CONF_COMMAND_TOPIC, CONF_QOS, CONF_RETAIN)
@ -38,14 +40,14 @@ PLATFORM_SCHEMA = mqtt.MQTT_RW_PLATFORM_SCHEMA.extend({
})
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
"""Setup the MQTT lock."""
value_template = config.get(CONF_VALUE_TEMPLATE)
if value_template is not None:
value_template.hass = hass
add_devices([MqttLock(
hass,
yield from async_add_devices([MqttLock(
config.get(CONF_NAME),
config.get(CONF_STATE_TOPIC),
config.get(CONF_COMMAND_TOPIC),
@ -61,11 +63,10 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
class MqttLock(LockDevice):
"""Represents a lock that can be toggled using MQTT."""
def __init__(self, hass, name, state_topic, command_topic, qos, retain,
def __init__(self, name, state_topic, command_topic, qos, retain,
payload_lock, payload_unlock, optimistic, value_template):
"""Initialize the lock."""
self._state = False
self._hass = hass
self._name = name
self._state_topic = state_topic
self._command_topic = command_topic
@ -74,25 +75,33 @@ class MqttLock(LockDevice):
self._payload_lock = payload_lock
self._payload_unlock = payload_unlock
self._optimistic = optimistic
self._template = value_template
@asyncio.coroutine
def async_added_to_hass(self):
"""Subscribe mqtt events.
This method is a coroutine.
"""
@callback
def message_received(topic, payload, qos):
"""A new MQTT message has been received."""
if value_template is not None:
payload = value_template.render_with_possible_json_value(
if self._template is not None:
payload = self._template.async_render_with_possible_json_value(
payload)
if payload == self._payload_lock:
self._state = True
self.schedule_update_ha_state()
elif payload == self._payload_unlock:
self._state = False
self.schedule_update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
if self._state_topic is None:
# Force into optimistic mode.
self._optimistic = True
else:
mqtt.subscribe(
hass, self._state_topic, message_received, self._qos)
yield from mqtt.async_subscribe(
self.hass, self._state_topic, message_received, self._qos)
@property
def should_poll(self):
@ -114,20 +123,30 @@ class MqttLock(LockDevice):
"""Return true if we do optimistic updates."""
return self._optimistic
def lock(self, **kwargs):
"""Lock the device."""
mqtt.publish(self.hass, self._command_topic, self._payload_lock,
self._qos, self._retain)
@asyncio.coroutine
def async_lock(self, **kwargs):
"""Lock the device.
This method is a coroutine.
"""
mqtt.async_publish(
self.hass, self._command_topic, self._payload_lock, self._qos,
self._retain)
if self._optimistic:
# Optimistically assume that switch has changed state.
self._state = True
self.schedule_update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
def unlock(self, **kwargs):
"""Unlock the device."""
mqtt.publish(self.hass, self._command_topic, self._payload_unlock,
self._qos, self._retain)
@asyncio.coroutine
def async_unlock(self, **kwargs):
"""Unlock the device.
This method is a coroutine.
"""
mqtt.async_publish(
self.hass, self._command_topic, self._payload_unlock, self._qos,
self._retain)
if self._optimistic:
# Optimistically assume that switch has changed state.
self._state = False
self.schedule_update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())

View file

@ -172,10 +172,16 @@ def _build_publish_data(topic, qos, retain):
def publish(hass, topic, payload, qos=None, retain=None):
"""Publish message to an MQTT topic."""
hass.add_job(async_publish, hass, topic, payload, qos, retain)
@callback
def async_publish(hass, topic, payload, qos=None, retain=None):
"""Publish message to an MQTT topic."""
data = _build_publish_data(topic, qos, retain)
data[ATTR_PAYLOAD] = payload
hass.services.call(DOMAIN, SERVICE_PUBLISH, data)
hass.async_add_job(hass.services.async_call(DOMAIN, SERVICE_PUBLISH, data))
def publish_template(hass, topic, payload_template, qos=None, retain=None):
@ -387,6 +393,8 @@ class MQTT(object):
self.progress = {}
self.birth_message = birth_message
self._mqttc = None
self._subscribe_lock = asyncio.Lock(loop=hass.loop)
self._publish_lock = asyncio.Lock(loop=hass.loop)
if protocol == PROTOCOL_31:
proto = mqtt.MQTTv31
@ -426,8 +434,9 @@ class MQTT(object):
This method must be run in the event loop and returns a coroutine.
"""
yield from self.hass.loop.run_in_executor(
None, self._mqttc.publish, topic, payload, qos, retain)
with (yield from self._publish_lock):
yield from self.hass.loop.run_in_executor(
None, self._mqttc.publish, topic, payload, qos, retain)
@asyncio.coroutine
def async_connect(self):
@ -474,8 +483,10 @@ class MQTT(object):
if topic in self.topics:
return
result, mid = yield from self.hass.loop.run_in_executor(
None, self._mqttc.subscribe, topic, qos)
with (yield from self._subscribe_lock):
result, mid = yield from self.hass.loop.run_in_executor(
None, self._mqttc.subscribe, topic, qos)
_raise_on_error(result)
self.progress[mid] = topic

View file

@ -4,10 +4,12 @@ Connect two Home Assistant instances via MQTT.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/mqtt_eventstream/
"""
import asyncio
import json
import voluptuous as vol
from homeassistant.core import callback
import homeassistant.loader as loader
from homeassistant.components.mqtt import (
valid_publish_topic, valid_subscribe_topic)
@ -36,13 +38,15 @@ CONFIG_SCHEMA = vol.Schema({
}, extra=vol.ALLOW_EXTRA)
def setup(hass, config):
@asyncio.coroutine
def async_setup(hass, config):
"""Setup the MQTT eventstream component."""
mqtt = loader.get_component('mqtt')
conf = config.get(DOMAIN, {})
pub_topic = conf.get(CONF_PUBLISH_TOPIC)
sub_topic = conf.get(CONF_SUBSCRIBE_TOPIC)
@callback
def _event_publisher(event):
"""Handle events by publishing them on the MQTT queue."""
if event.origin != EventOrigin.local:
@ -81,13 +85,14 @@ def setup(hass, config):
event_info = {'event_type': event.event_type, 'event_data': event.data}
msg = json.dumps(event_info, cls=JSONEncoder)
mqtt.publish(hass, pub_topic, msg)
mqtt.async_publish(hass, pub_topic, msg)
# Only listen for local events if you are going to publish them.
if pub_topic:
hass.bus.listen(MATCH_ALL, _event_publisher)
hass.bus.async_listen(MATCH_ALL, _event_publisher)
# Process events from a remote server that are received on a queue.
@callback
def _event_receiver(topic, payload, qos):
"""Receive events published by and fire them on this hass instance."""
event = json.loads(payload)
@ -105,7 +110,7 @@ def setup(hass, config):
if state:
event_data[key] = state
hass.bus.fire(
hass.bus.async_fire(
event_type,
event_data=event_data,
origin=EventOrigin.remote
@ -113,8 +118,7 @@ def setup(hass, config):
# Only subscribe if you specified a topic.
if sub_topic:
mqtt.subscribe(hass, sub_topic, _event_receiver)
hass.states.set('{domain}.initialized'.format(domain=DOMAIN), True)
yield from mqtt.async_subscribe(hass, sub_topic, _event_receiver)
hass.states.async_set('{domain}.initialized'.format(domain=DOMAIN), True)
return True

View file

@ -4,6 +4,7 @@ Support for MQTT sensors.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.mqtt/
"""
import asyncio
import logging
import voluptuous as vol
@ -27,8 +28,8 @@ PLATFORM_SCHEMA = mqtt.MQTT_RO_PLATFORM_SCHEMA.extend({
})
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
"""Set up MQTT Sensor."""
if discovery_info is not None:
config = PLATFORM_SCHEMA(discovery_info)
@ -36,8 +37,8 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
value_template = config.get(CONF_VALUE_TEMPLATE)
if value_template is not None:
value_template.hass = hass
add_devices([MqttSensor(
hass,
yield from async_add_devices([MqttSensor(
config.get(CONF_NAME),
config.get(CONF_STATE_TOPIC),
config.get(CONF_QOS),
@ -49,26 +50,32 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
class MqttSensor(Entity):
"""Representation of a sensor that can be updated using MQTT."""
def __init__(self, hass, name, state_topic, qos, unit_of_measurement,
def __init__(self, name, state_topic, qos, unit_of_measurement,
value_template):
"""Initialize the sensor."""
self._state = STATE_UNKNOWN
self._hass = hass
self._name = name
self._state_topic = state_topic
self._qos = qos
self._unit_of_measurement = unit_of_measurement
self._template = value_template
def async_added_to_hass(self):
"""Subscribe mqtt events.
This method must be run in the event loop and returns a coroutine.
"""
@callback
def message_received(topic, payload, qos):
"""A new MQTT message has been received."""
if value_template is not None:
payload = value_template.async_render_with_possible_json_value(
if self._template is not None:
payload = self._template.async_render_with_possible_json_value(
payload, self._state)
self._state = payload
hass.async_add_job(self.async_update_ha_state())
self.hass.async_add_job(self.async_update_ha_state())
mqtt.subscribe(hass, self._state_topic, message_received, self._qos)
return mqtt.async_subscribe(
self.hass, self._state_topic, message_received, self._qos)
@property
def should_poll(self):

View file

@ -4,12 +4,14 @@ Support for MQTT room presence detection.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.mqtt_room/
"""
import asyncio
import logging
import json
from datetime import timedelta
import voluptuous as vol
from homeassistant.core import callback
import homeassistant.components.mqtt as mqtt
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
@ -54,11 +56,10 @@ MQTT_PAYLOAD = vol.Schema(vol.All(json.loads, vol.Schema({
}, extra=vol.ALLOW_EXTRA)))
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
"""Setup MQTT Sensor."""
add_devices([MQTTRoomSensor(
hass,
yield from async_add_devices([MQTTRoomSensor(
config.get(CONF_NAME),
config.get(CONF_STATE_TOPIC),
config.get(CONF_DEVICE_ID),
@ -70,11 +71,9 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
class MQTTRoomSensor(Entity):
"""Representation of a room sensor that is updated via MQTT."""
def __init__(self, hass, name, state_topic, device_id, timeout,
consider_home):
def __init__(self, name, state_topic, device_id, timeout, consider_home):
"""Initialize the sensor."""
self._state = STATE_AWAY
self._hass = hass
self._name = name
self._state_topic = '{}{}'.format(state_topic, '/+')
self._device_id = slugify(device_id).upper()
@ -85,13 +84,19 @@ class MQTTRoomSensor(Entity):
self._distance = None
self._updated = None
def async_added_to_hass(self):
"""Subscribe mqtt events.
This method must be run in the event loop and returns a coroutine.
"""
@callback
def update_state(device_id, room, distance):
"""Update the sensor state."""
self._state = room
self._distance = distance
self._updated = dt.utcnow()
self.update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
def message_received(topic, payload, qos):
"""A new MQTT message has been received."""
@ -117,7 +122,8 @@ class MQTTRoomSensor(Entity):
or timediff.seconds >= self._timeout:
update_state(**device)
mqtt.subscribe(hass, self._state_topic, message_received, 1)
return mqtt.async_subscribe(
self.hass, self._state_topic, message_received, 1)
@property
def name(self):

View file

@ -4,6 +4,7 @@ Support for MQTT switches.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/switch.mqtt/
"""
import asyncio
import logging
import voluptuous as vol
@ -35,14 +36,14 @@ PLATFORM_SCHEMA = mqtt.MQTT_RW_PLATFORM_SCHEMA.extend({
})
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
"""Setup the MQTT switch."""
value_template = config.get(CONF_VALUE_TEMPLATE)
if value_template is not None:
value_template.hass = hass
add_devices([MqttSwitch(
hass,
yield from async_add_devices([MqttSwitch(
config.get(CONF_NAME),
config.get(CONF_STATE_TOPIC),
config.get(CONF_COMMAND_TOPIC),
@ -58,11 +59,10 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
class MqttSwitch(SwitchDevice):
"""Representation of a switch that can be toggled using MQTT."""
def __init__(self, hass, name, state_topic, command_topic, qos, retain,
def __init__(self, name, state_topic, command_topic, qos, retain,
payload_on, payload_off, optimistic, value_template):
"""Initialize the MQTT switch."""
self._state = False
self._hass = hass
self._name = name
self._state_topic = state_topic
self._command_topic = command_topic
@ -71,26 +71,33 @@ class MqttSwitch(SwitchDevice):
self._payload_on = payload_on
self._payload_off = payload_off
self._optimistic = optimistic
self._template = value_template
@asyncio.coroutine
def async_added_to_hass(self):
"""Subscribe mqtt events.
This method is a coroutine.
"""
@callback
def message_received(topic, payload, qos):
"""A new MQTT message has been received."""
if value_template is not None:
payload = value_template.async_render_with_possible_json_value(
if self._template is not None:
payload = self._template.async_render_with_possible_json_value(
payload)
if payload == self._payload_on:
self._state = True
hass.async_add_job(self.async_update_ha_state())
elif payload == self._payload_off:
self._state = False
hass.async_add_job(self.async_update_ha_state())
self.hass.async_add_job(self.async_update_ha_state())
if self._state_topic is None:
# Force into optimistic mode.
self._optimistic = True
else:
mqtt.subscribe(
hass, self._state_topic, message_received, self._qos)
yield from mqtt.async_subscribe(
self.hass, self._state_topic, message_received, self._qos)
@property
def should_poll(self):
@ -112,20 +119,30 @@ class MqttSwitch(SwitchDevice):
"""Return true if we do optimistic updates."""
return self._optimistic
def turn_on(self, **kwargs):
"""Turn the device on."""
mqtt.publish(self.hass, self._command_topic, self._payload_on,
self._qos, self._retain)
@asyncio.coroutine
def async_turn_on(self, **kwargs):
"""Turn the device on.
This method is a coroutine.
"""
mqtt.async_publish(
self.hass, self._command_topic, self._payload_on, self._qos,
self._retain)
if self._optimistic:
# Optimistically assume that switch has changed state.
self._state = True
self.update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())
def turn_off(self, **kwargs):
"""Turn the device off."""
mqtt.publish(self.hass, self._command_topic, self._payload_off,
self._qos, self._retain)
@asyncio.coroutine
def async_turn_off(self, **kwargs):
"""Turn the device off.
This method is a coroutine.
"""
mqtt.async_publish(
self.hass, self._command_topic, self._payload_off, self._qos,
self._retain)
if self._optimistic:
# Optimistically assume that switch has changed state.
self._state = False
self.update_ha_state()
self.hass.async_add_job(self.async_update_ha_state())

View file

@ -57,7 +57,7 @@ class TestMqttEventStream(object):
# Verify that the event handler has been added as a listener
assert self.hass.bus.listeners.get('*') == 1
@patch('homeassistant.components.mqtt.subscribe')
@patch('homeassistant.components.mqtt.async_subscribe')
def test_subscribe(self, mock_sub):
""""Test the subscription."""
sub_topic = 'foo'
@ -67,7 +67,7 @@ class TestMqttEventStream(object):
# Verify that the this entity was subscribed to the topic
mock_sub.assert_called_with(self.hass, sub_topic, ANY)
@patch('homeassistant.components.mqtt.publish')
@patch('homeassistant.components.mqtt.async_publish')
@patch('homeassistant.core.dt_util.utcnow')
def test_state_changed_event_sends_message(self, mock_utcnow, mock_pub):
""""Test the sending of a new message if event changed."""
@ -110,7 +110,7 @@ class TestMqttEventStream(object):
# Verify that the message received was that expected
assert json.loads(msg) == event
@patch('homeassistant.components.mqtt.publish')
@patch('homeassistant.components.mqtt.async_publish')
def test_time_event_does_not_send_message(self, mock_pub):
""""Test the sending of a new message if time event."""
assert self.add_eventstream(pub_topic='bar')
@ -147,7 +147,7 @@ class TestMqttEventStream(object):
assert 1 == len(calls)
@patch('homeassistant.components.mqtt.publish')
@patch('homeassistant.components.mqtt.async_publish')
def test_mqtt_received_event(self, mock_pub):
"""Don't filter events from the mqtt component about received message.