Make mysensors component async (#13641)

* Make mysensors component async

* Use async dispatcher and discovery.
* Run I/O in executor.
* Make mysensors actuator methods async.
* Upgrade pymysensors to 0.13.0.
* Use async serial gateway.
* Use async TCP gateway.
* Use async mqtt gateway.

* Start gateway before hass start event

* Make sure gateway is started after discovery of persistent devices
  and after corresponding platforms have been loaded.
* Don't wait to start gateway until after hass start.

* Bump pymysensors to 0.14.0
This commit is contained in:
Martin Hjelmare 2018-05-11 09:39:18 +02:00 committed by GitHub
parent ef8fc1f201
commit be3b227a87
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 110 additions and 80 deletions

View file

@ -4,6 +4,7 @@ Connect to a MySensors gateway via pymysensors API.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/mysensors/
"""
import asyncio
from collections import defaultdict
import logging
import os
@ -16,17 +17,17 @@ import voluptuous as vol
from homeassistant.components.mqtt import (
valid_publish_topic, valid_subscribe_topic)
from homeassistant.const import (
ATTR_BATTERY_LEVEL, CONF_NAME, CONF_OPTIMISTIC, EVENT_HOMEASSISTANT_START,
ATTR_BATTERY_LEVEL, CONF_NAME, CONF_OPTIMISTIC,
EVENT_HOMEASSISTANT_STOP, STATE_OFF, STATE_ON)
from homeassistant.core import callback
from homeassistant.helpers import discovery
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import (
async_dispatcher_connect, dispatcher_send)
async_dispatcher_connect, async_dispatcher_send)
from homeassistant.helpers.entity import Entity
from homeassistant.setup import setup_component
from homeassistant.setup import async_setup_component
REQUIREMENTS = ['pymysensors==0.11.1']
REQUIREMENTS = ['pymysensors==0.14.0']
_LOGGER = logging.getLogger(__name__)
@ -280,67 +281,62 @@ MYSENSORS_CONST_SCHEMA = {
}
def setup(hass, config):
async def async_setup(hass, config):
"""Set up the MySensors component."""
import mysensors.mysensors as mysensors
version = config[DOMAIN].get(CONF_VERSION)
persistence = config[DOMAIN].get(CONF_PERSISTENCE)
def setup_gateway(device, persistence_file, baud_rate, tcp_port, in_prefix,
out_prefix):
async def setup_gateway(
device, persistence_file, baud_rate, tcp_port, in_prefix,
out_prefix):
"""Return gateway after setup of the gateway."""
if device == MQTT_COMPONENT:
if not setup_component(hass, MQTT_COMPONENT, config):
return
if not await async_setup_component(hass, MQTT_COMPONENT, config):
return None
mqtt = hass.components.mqtt
retain = config[DOMAIN].get(CONF_RETAIN)
def pub_callback(topic, payload, qos, retain):
"""Call MQTT publish function."""
mqtt.publish(topic, payload, qos, retain)
mqtt.async_publish(topic, payload, qos, retain)
def sub_callback(topic, sub_cb, qos):
"""Call MQTT subscribe function."""
mqtt.subscribe(topic, sub_cb, qos)
gateway = mysensors.MQTTGateway(
pub_callback, sub_callback,
@callback
def internal_callback(*args):
"""Call callback."""
sub_cb(*args)
hass.async_add_job(
mqtt.async_subscribe(topic, internal_callback, qos))
gateway = mysensors.AsyncMQTTGateway(
pub_callback, sub_callback, in_prefix=in_prefix,
out_prefix=out_prefix, retain=retain, loop=hass.loop,
event_callback=None, persistence=persistence,
persistence_file=persistence_file,
protocol_version=version, in_prefix=in_prefix,
out_prefix=out_prefix, retain=retain)
protocol_version=version)
else:
try:
is_serial_port(device)
gateway = mysensors.SerialGateway(
device, event_callback=None, persistence=persistence,
await hass.async_add_job(is_serial_port, device)
gateway = mysensors.AsyncSerialGateway(
device, baud=baud_rate, loop=hass.loop,
event_callback=None, persistence=persistence,
persistence_file=persistence_file,
protocol_version=version, baud=baud_rate)
protocol_version=version)
except vol.Invalid:
try:
socket.getaddrinfo(device, None)
# valid ip address
gateway = mysensors.TCPGateway(
device, event_callback=None, persistence=persistence,
persistence_file=persistence_file,
protocol_version=version, port=tcp_port)
except OSError:
# invalid ip address
return
gateway = mysensors.AsyncTCPGateway(
device, port=tcp_port, loop=hass.loop, event_callback=None,
persistence=persistence, persistence_file=persistence_file,
protocol_version=version)
gateway.metric = hass.config.units.is_metric
gateway.optimistic = config[DOMAIN].get(CONF_OPTIMISTIC)
gateway.device = device
gateway.event_callback = gw_callback_factory(hass)
def gw_start(event):
"""Trigger to start of the gateway and any persistence."""
if persistence:
discover_persistent_devices(hass, gateway)
gateway.start()
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP,
lambda event: gateway.stop())
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, gw_start)
if persistence:
await gateway.start_persistence()
return gateway
@ -357,7 +353,7 @@ def setup(hass, config):
tcp_port = gway.get(CONF_TCP_PORT)
in_prefix = gway.get(CONF_TOPIC_IN_PREFIX, '')
out_prefix = gway.get(CONF_TOPIC_OUT_PREFIX, '')
ready_gateway = setup_gateway(
ready_gateway = await setup_gateway(
device, persistence_file, baud_rate, tcp_port, in_prefix,
out_prefix)
if ready_gateway is not None:
@ -371,9 +367,36 @@ def setup(hass, config):
hass.data[MYSENSORS_GATEWAYS] = gateways
hass.async_add_job(finish_setup(hass, gateways))
return True
async def finish_setup(hass, gateways):
"""Load any persistent devices and platforms and start gateway."""
discover_tasks = []
start_tasks = []
for gateway in gateways.values():
discover_tasks.append(discover_persistent_devices(hass, gateway))
start_tasks.append(gw_start(hass, gateway))
if discover_tasks:
# Make sure all devices and platforms are loaded before gateway start.
await asyncio.wait(discover_tasks, loop=hass.loop)
if start_tasks:
await asyncio.wait(start_tasks, loop=hass.loop)
async def gw_start(hass, gateway):
"""Start the gateway."""
@callback
def gw_stop(event):
"""Trigger to stop the gateway."""
hass.async_add_job(gateway.stop())
await gateway.start()
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, gw_stop)
def validate_child(gateway, node_id, child):
"""Validate that a child has the correct values according to schema.
@ -431,14 +454,18 @@ def validate_child(gateway, node_id, child):
return validated
@callback
def discover_mysensors_platform(hass, platform, new_devices):
"""Discover a MySensors platform."""
discovery.load_platform(
hass, platform, DOMAIN, {ATTR_DEVICES: new_devices, CONF_NAME: DOMAIN})
task = hass.async_add_job(discovery.async_load_platform(
hass, platform, DOMAIN,
{ATTR_DEVICES: new_devices, CONF_NAME: DOMAIN}))
return task
def discover_persistent_devices(hass, gateway):
async def discover_persistent_devices(hass, gateway):
"""Discover platforms for devices loaded via persistence file."""
tasks = []
new_devices = defaultdict(list)
for node_id in gateway.sensors:
node = gateway.sensors[node_id]
@ -447,7 +474,9 @@ def discover_persistent_devices(hass, gateway):
for platform, dev_ids in validated.items():
new_devices[platform].extend(dev_ids)
for platform, dev_ids in new_devices.items():
discover_mysensors_platform(hass, platform, dev_ids)
tasks.append(discover_mysensors_platform(hass, platform, dev_ids))
if tasks:
await asyncio.wait(tasks, loop=hass.loop)
def get_mysensors_devices(hass, domain):
@ -459,6 +488,7 @@ def get_mysensors_devices(hass, domain):
def gw_callback_factory(hass):
"""Return a new callback for the gateway."""
@callback
def mysensors_callback(msg):
"""Handle messages from a MySensors gateway."""
start = timer()
@ -489,7 +519,7 @@ def gw_callback_factory(hass):
# Only one signal per device is needed.
# A device can have multiple platforms, ie multiple schemas.
# FOR LATER: Add timer to not signal if another update comes in.
dispatcher_send(hass, signal)
async_dispatcher_send(hass, signal)
end = timer()
if end - start > 0.1:
_LOGGER.debug(