* MySensors: Add type annotations
Adds a bunch of type annotations that were created
while understanding the code.
* MySensors: Change GatewayId to string
In preparation for config flow.
The GatewayId used to be id(gateway).
With config flows, every gateway will have its own
ConfigEntry. Every ConfigEntry has a unique id.
Thus we would have two separate but one-to-one related ID systems.
This commit removes this unneeded duplication by using the id of the ConfigEntry
as GatewayId.
* MySensors: Add unique_id to all entities
This allows entities to work well with the frontend.
* MySensors: Add device_info to all entities
Entities belonging to the same node_id will now by grouped as a device.
* MySensors: clean up device.py a bit
* MySensors: Add config flow support
With this change the MySensors can be fully configured from the GUI.
Legacy configuration.yaml configs will be migrated by reading them once.
Note that custom node names are not migrated. Users will have to re-enter
the names in the front-end.
Since there is no straight-forward way to configure global settings,
all previously global settings are now per-gateway. These settings include:
- MQTT retain
- optimistic
- persistence enable
- MySensors version
When a MySensors integration is loaded, it works as follows:
1. __init__.async_setup_entry is called
2. for every platform, async_forward_entry_setup is called
3. the platform's async_setup_entry is called
4. __init__.setup_mysensors_platform is called
5. the entity's constructor (e.g. MySensorsCover) is called
6. the created entity is stored in a dict in the hass object
* MySensors: Fix linter errors
* MySensors: Remove unused import
* MySensors: Feedback from @MartinHjelmare
* MySensors: Multi-step config flow
* MySensors: More feedback
* MySensors: Move all storage in hass object under DOMAIN
The integration now stores everything under hass.data["mysensors"]
instead of using several top level keys.
* MySensors: await shutdown of gateway instead of creating a task
* MySensors: Rename Ethernet to TCP
* MySensors: Absolute imports and cosmetic changes
* MySensors: fix gw_stop
* MySensors: Allow user to specify persistence file
* MySensors: Nicer log message
* MySensors: Add lots of unit tests
* MySensors: Fix legacy import of persistence file name
Turns out tests help to find bugs :D
* MySensors: Improve test coverage
* MySensors: Use json persistence files by default
* MySensors: Code style improvements
* MySensors: Stop adding attributes to existing objects
This commit removes the extra attributes that were being
added to the gateway objects from pymysensors.
Most attributes were easy to remove, except for the gateway id.
The MySensorsDevice class needs the gateway id as it is part of its DevId
as well as the unique_id and device_info.
Most MySensorsDevices actually end up being Entities.
Entities have access to their ConfigEntry via self.platform.config_entry.
However, the device_tracker platform does not become an Entity.
For this reason, the gateway id is not fetched from self.plaform but
given as an argument.
Additionally, MySensorsDevices expose the address of the gateway
(CONF_DEVICE). Entities can easily fetch this information via self.platform,
but the device_tracker cannot. This commit chooses to remove the gateway
address from device_tracker. While this could in theory break some automations,
the simplicity of this solution was deemed worth it.
The alternative of adding the entire ConfigEntry as an argument to MySensorsDevices
is not viable, because device_tracker is initialized by the async_setup_scanner function
that isn't supplied a ConfigEntry. It only gets discovery_info.
Adding the entire ConfigEntry doesn't seem appropriate for this edge case.
* MySensors: Fix gw_stop and the translations
* MySensors: Fix incorrect function calls
* MySensors: Fewer comments in const.py
* MySensors: Remove union from _get_gateway and remove id from try_connect
* MySensors: Deprecate nodes option in configuration.yaml
* MySensors: Use version parser from packaging
* MySensors: Remove prefix from unique_id and change some private property names
* MySensors: Change _get_gateway function signature
* MySensors: add packaging==20.8 for the version parser
* MySensors: Rename some stuff
* MySensors: use pytest.mark.parametrize
* MySensors: Clean up test cases
* MySensors: Remove unneeded parameter from devices
* Revert "MySensors: add packaging==20.8 for the version parser"
This reverts commit 6b200ee01a
.
* MySensors: Use core interface for testing configuration.yaml import
* MySensors: Fix test_init
* MySensors: Rename a few variables
* MySensors: cosmetic changes
* MySensors: Update strings.json
* MySensors: Still more feedback from @MartinHjelmare
* MySensors: Remove unused strings
Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
* MySensors: Fix typo and remove another unused string
* MySensors: More strings.json
* MySensors: Fix gateway ready handler
* MySensors: Add duplicate detection to config flows
* MySensors: Deal with non-existing topics and ports.
Includes unit tests for these cases.
* MySensors: Use awesomeversion instead of packaging
* Add string already_configured
* MySensors: Abort config flow when config is found to be invalid while importing
* MySensors: Copy all error messages to also be abort messages
All error strings may now also be used as an abort reason,
so the strings should be defined
* Use string references
Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
338 lines
12 KiB
Python
338 lines
12 KiB
Python
"""Handle MySensors gateways."""
|
|
import asyncio
|
|
from collections import defaultdict
|
|
import logging
|
|
import socket
|
|
import sys
|
|
from typing import Any, Callable, Coroutine, Dict, Optional
|
|
|
|
import async_timeout
|
|
from mysensors import BaseAsyncGateway, Message, Sensor, mysensors
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
|
from homeassistant.core import Event, callback
|
|
import homeassistant.helpers.config_validation as cv
|
|
from homeassistant.helpers.typing import HomeAssistantType
|
|
|
|
from .const import (
|
|
CONF_BAUD_RATE,
|
|
CONF_DEVICE,
|
|
CONF_PERSISTENCE_FILE,
|
|
CONF_RETAIN,
|
|
CONF_TCP_PORT,
|
|
CONF_TOPIC_IN_PREFIX,
|
|
CONF_TOPIC_OUT_PREFIX,
|
|
CONF_VERSION,
|
|
DOMAIN,
|
|
MYSENSORS_GATEWAY_READY,
|
|
MYSENSORS_GATEWAY_START_TASK,
|
|
MYSENSORS_GATEWAYS,
|
|
GatewayId,
|
|
)
|
|
from .handler import HANDLERS
|
|
from .helpers import discover_mysensors_platform, validate_child, validate_node
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
GATEWAY_READY_TIMEOUT = 15.0
|
|
MQTT_COMPONENT = "mqtt"
|
|
|
|
|
|
def is_serial_port(value):
|
|
"""Validate that value is a windows serial port or a unix device."""
|
|
if sys.platform.startswith("win"):
|
|
ports = (f"COM{idx + 1}" for idx in range(256))
|
|
if value in ports:
|
|
return value
|
|
raise vol.Invalid(f"{value} is not a serial port")
|
|
return cv.isdevice(value)
|
|
|
|
|
|
def is_socket_address(value):
|
|
"""Validate that value is a valid address."""
|
|
try:
|
|
socket.getaddrinfo(value, None)
|
|
return value
|
|
except OSError as err:
|
|
raise vol.Invalid("Device is not a valid domain name or ip address") from err
|
|
|
|
|
|
async def try_connect(hass: HomeAssistantType, user_input: Dict[str, str]) -> bool:
|
|
"""Try to connect to a gateway and report if it worked."""
|
|
if user_input[CONF_DEVICE] == MQTT_COMPONENT:
|
|
return True # dont validate mqtt. mqtt gateways dont send ready messages :(
|
|
try:
|
|
gateway_ready = asyncio.Future()
|
|
|
|
def gateway_ready_callback(msg):
|
|
msg_type = msg.gateway.const.MessageType(msg.type)
|
|
_LOGGER.debug("Received MySensors msg type %s: %s", msg_type.name, msg)
|
|
if msg_type.name != "internal":
|
|
return
|
|
internal = msg.gateway.const.Internal(msg.sub_type)
|
|
if internal.name != "I_GATEWAY_READY":
|
|
return
|
|
_LOGGER.debug("Received gateway ready")
|
|
gateway_ready.set_result(True)
|
|
|
|
gateway: Optional[BaseAsyncGateway] = await _get_gateway(
|
|
hass,
|
|
device=user_input[CONF_DEVICE],
|
|
version=user_input[CONF_VERSION],
|
|
event_callback=gateway_ready_callback,
|
|
persistence_file=None,
|
|
baud_rate=user_input.get(CONF_BAUD_RATE),
|
|
tcp_port=user_input.get(CONF_TCP_PORT),
|
|
topic_in_prefix=None,
|
|
topic_out_prefix=None,
|
|
retain=False,
|
|
persistence=False,
|
|
)
|
|
if gateway is None:
|
|
return False
|
|
|
|
connect_task = None
|
|
try:
|
|
connect_task = asyncio.create_task(gateway.start())
|
|
with async_timeout.timeout(5):
|
|
await gateway_ready
|
|
return True
|
|
except asyncio.TimeoutError:
|
|
_LOGGER.info("Try gateway connect failed with timeout")
|
|
return False
|
|
finally:
|
|
if connect_task is not None and not connect_task.done():
|
|
connect_task.cancel()
|
|
asyncio.create_task(gateway.stop())
|
|
except OSError as err:
|
|
_LOGGER.info("Try gateway connect failed with exception", exc_info=err)
|
|
return False
|
|
|
|
|
|
def get_mysensors_gateway(
|
|
hass: HomeAssistantType, gateway_id: GatewayId
|
|
) -> Optional[BaseAsyncGateway]:
|
|
"""Return the Gateway for a given GatewayId."""
|
|
if MYSENSORS_GATEWAYS not in hass.data[DOMAIN]:
|
|
hass.data[DOMAIN][MYSENSORS_GATEWAYS] = {}
|
|
gateways = hass.data[DOMAIN].get(MYSENSORS_GATEWAYS)
|
|
return gateways.get(gateway_id)
|
|
|
|
|
|
async def setup_gateway(
|
|
hass: HomeAssistantType, entry: ConfigEntry
|
|
) -> Optional[BaseAsyncGateway]:
|
|
"""Set up the Gateway for the given ConfigEntry."""
|
|
|
|
ready_gateway = await _get_gateway(
|
|
hass,
|
|
device=entry.data[CONF_DEVICE],
|
|
version=entry.data[CONF_VERSION],
|
|
event_callback=_gw_callback_factory(hass, entry.entry_id),
|
|
persistence_file=entry.data.get(
|
|
CONF_PERSISTENCE_FILE, f"mysensors_{entry.entry_id}.json"
|
|
),
|
|
baud_rate=entry.data.get(CONF_BAUD_RATE),
|
|
tcp_port=entry.data.get(CONF_TCP_PORT),
|
|
topic_in_prefix=entry.data.get(CONF_TOPIC_IN_PREFIX),
|
|
topic_out_prefix=entry.data.get(CONF_TOPIC_OUT_PREFIX),
|
|
retain=entry.data.get(CONF_RETAIN, False),
|
|
)
|
|
return ready_gateway
|
|
|
|
|
|
async def _get_gateway(
|
|
hass: HomeAssistantType,
|
|
device: str,
|
|
version: str,
|
|
event_callback: Callable[[Message], None],
|
|
persistence_file: Optional[str] = None,
|
|
baud_rate: Optional[int] = None,
|
|
tcp_port: Optional[int] = None,
|
|
topic_in_prefix: Optional[str] = None,
|
|
topic_out_prefix: Optional[str] = None,
|
|
retain: bool = False,
|
|
persistence: bool = True, # old persistence option has been deprecated. kwarg is here so we can run try_connect() without persistence
|
|
) -> Optional[BaseAsyncGateway]:
|
|
"""Return gateway after setup of the gateway."""
|
|
|
|
if persistence_file is not None:
|
|
# interpret relative paths to be in hass config folder. absolute paths will be left as they are
|
|
persistence_file = hass.config.path(persistence_file)
|
|
|
|
if device == MQTT_COMPONENT:
|
|
# what is the purpose of this?
|
|
# if not await async_setup_component(hass, MQTT_COMPONENT, entry):
|
|
# return None
|
|
mqtt = hass.components.mqtt
|
|
|
|
def pub_callback(topic, payload, qos, retain):
|
|
"""Call MQTT publish function."""
|
|
mqtt.async_publish(topic, payload, qos, retain)
|
|
|
|
def sub_callback(topic, sub_cb, qos):
|
|
"""Call MQTT subscribe function."""
|
|
|
|
@callback
|
|
def internal_callback(msg):
|
|
"""Call callback."""
|
|
sub_cb(msg.topic, msg.payload, msg.qos)
|
|
|
|
hass.async_create_task(mqtt.async_subscribe(topic, internal_callback, qos))
|
|
|
|
gateway = mysensors.AsyncMQTTGateway(
|
|
pub_callback,
|
|
sub_callback,
|
|
in_prefix=topic_in_prefix,
|
|
out_prefix=topic_out_prefix,
|
|
retain=retain,
|
|
loop=hass.loop,
|
|
event_callback=None,
|
|
persistence=persistence,
|
|
persistence_file=persistence_file,
|
|
protocol_version=version,
|
|
)
|
|
else:
|
|
try:
|
|
await hass.async_add_executor_job(is_serial_port, device)
|
|
gateway = mysensors.AsyncSerialGateway(
|
|
device,
|
|
baud=baud_rate,
|
|
loop=hass.loop,
|
|
event_callback=None,
|
|
persistence=persistence,
|
|
persistence_file=persistence_file,
|
|
protocol_version=version,
|
|
)
|
|
except vol.Invalid:
|
|
try:
|
|
await hass.async_add_executor_job(is_socket_address, device)
|
|
# valid ip address
|
|
gateway = mysensors.AsyncTCPGateway(
|
|
device,
|
|
port=tcp_port,
|
|
loop=hass.loop,
|
|
event_callback=None,
|
|
persistence=persistence,
|
|
persistence_file=persistence_file,
|
|
protocol_version=version,
|
|
)
|
|
except vol.Invalid:
|
|
# invalid ip address
|
|
_LOGGER.error("Connect failed: Invalid device %s", device)
|
|
return None
|
|
gateway.event_callback = event_callback
|
|
if persistence:
|
|
await gateway.start_persistence()
|
|
|
|
return gateway
|
|
|
|
|
|
async def finish_setup(
|
|
hass: HomeAssistantType, entry: ConfigEntry, gateway: BaseAsyncGateway
|
|
):
|
|
"""Load any persistent devices and platforms and start gateway."""
|
|
discover_tasks = []
|
|
start_tasks = []
|
|
discover_tasks.append(_discover_persistent_devices(hass, entry, gateway))
|
|
start_tasks.append(_gw_start(hass, entry, gateway))
|
|
if discover_tasks:
|
|
# Make sure all devices and platforms are loaded before gateway start.
|
|
await asyncio.wait(discover_tasks)
|
|
if start_tasks:
|
|
await asyncio.wait(start_tasks)
|
|
|
|
|
|
async def _discover_persistent_devices(
|
|
hass: HomeAssistantType, entry: ConfigEntry, gateway: BaseAsyncGateway
|
|
):
|
|
"""Discover platforms for devices loaded via persistence file."""
|
|
tasks = []
|
|
new_devices = defaultdict(list)
|
|
for node_id in gateway.sensors:
|
|
if not validate_node(gateway, node_id):
|
|
continue
|
|
node: Sensor = gateway.sensors[node_id]
|
|
for child in node.children.values(): # child is of type ChildSensor
|
|
validated = validate_child(entry.entry_id, gateway, node_id, child)
|
|
for platform, dev_ids in validated.items():
|
|
new_devices[platform].extend(dev_ids)
|
|
_LOGGER.debug("discovering persistent devices: %s", new_devices)
|
|
for platform, dev_ids in new_devices.items():
|
|
discover_mysensors_platform(hass, entry.entry_id, platform, dev_ids)
|
|
if tasks:
|
|
await asyncio.wait(tasks)
|
|
|
|
|
|
async def gw_stop(hass, entry: ConfigEntry, gateway: BaseAsyncGateway):
|
|
"""Stop the gateway."""
|
|
connect_task = hass.data[DOMAIN].get(
|
|
MYSENSORS_GATEWAY_START_TASK.format(entry.entry_id)
|
|
)
|
|
if connect_task is not None and not connect_task.done():
|
|
connect_task.cancel()
|
|
await gateway.stop()
|
|
|
|
|
|
async def _gw_start(
|
|
hass: HomeAssistantType, entry: ConfigEntry, gateway: BaseAsyncGateway
|
|
):
|
|
"""Start the gateway."""
|
|
# Don't use hass.async_create_task to avoid holding up setup indefinitely.
|
|
hass.data[DOMAIN][
|
|
MYSENSORS_GATEWAY_START_TASK.format(entry.entry_id)
|
|
] = asyncio.create_task(
|
|
gateway.start()
|
|
) # store the connect task so it can be cancelled in gw_stop
|
|
|
|
async def stop_this_gw(_: Event):
|
|
await gw_stop(hass, entry, gateway)
|
|
|
|
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, stop_this_gw)
|
|
if entry.data[CONF_DEVICE] == MQTT_COMPONENT:
|
|
# Gatways connected via mqtt doesn't send gateway ready message.
|
|
return
|
|
gateway_ready = asyncio.Future()
|
|
gateway_ready_key = MYSENSORS_GATEWAY_READY.format(entry.entry_id)
|
|
hass.data[DOMAIN][gateway_ready_key] = gateway_ready
|
|
|
|
try:
|
|
with async_timeout.timeout(GATEWAY_READY_TIMEOUT):
|
|
await gateway_ready
|
|
except asyncio.TimeoutError:
|
|
_LOGGER.warning(
|
|
"Gateway %s not ready after %s secs so continuing with setup",
|
|
entry.data[CONF_DEVICE],
|
|
GATEWAY_READY_TIMEOUT,
|
|
)
|
|
finally:
|
|
hass.data[DOMAIN].pop(gateway_ready_key, None)
|
|
|
|
|
|
def _gw_callback_factory(
|
|
hass: HomeAssistantType, gateway_id: GatewayId
|
|
) -> Callable[[Message], None]:
|
|
"""Return a new callback for the gateway."""
|
|
|
|
@callback
|
|
def mysensors_callback(msg: Message):
|
|
"""Handle messages from a MySensors gateway.
|
|
|
|
All MySenors messages are received here.
|
|
The messages are passed to handler functions depending on their type.
|
|
"""
|
|
_LOGGER.debug("Node update: node %s child %s", msg.node_id, msg.child_id)
|
|
|
|
msg_type = msg.gateway.const.MessageType(msg.type)
|
|
msg_handler: Callable[
|
|
[Any, GatewayId, Message], Coroutine[None]
|
|
] = HANDLERS.get(msg_type.name)
|
|
|
|
if msg_handler is None:
|
|
return
|
|
|
|
hass.async_create_task(msg_handler(hass, gateway_id, msg))
|
|
|
|
return mysensors_callback
|