hass-core/homeassistant/components/enocean/dongle.py
jduquennoy af6a4bb6cf
Refactor Enocean part 1 (#35927)
* First step of an EnOcean integration refactoring, including code reorganisation and support of a setup config flow

* Moved title to root of strings file

* Fixed pre-commit checks failures

* Fixed linter errors

* Updated formatted string format in logs

* Removed leftover comment

* Multiple changes after PR change requests.
Using an import flow for yaml config, removed unnecessary logs, added proper unload in __init__ and EnOceanDongle
Replaced config state machine by several flows.
Serial port validity check done in the EnOceanDongle class asynchronously, removed unique ID from config flow
Multiple cosmetic changes

* Multiple changes after PR change requests

* Added variable to store default value, as setdefault was caught returning None when the empty dict literal was passed as an argument

* Literal used directly

* Added tests for EnOcean config flows, changed static methods to bundle methods for bundle

* Updated variable name

* Added missing mock to test, replaced repeated magic strings by constants

* Changed imports to avoid an unused import warning from pylint on DOMAIN

* Adding pylint exception for unused import

* Added proper propagation of setup and unload to platforms, removed dead code, some syntax changes

* Removed setup_entry forwarding as the entities can only be configured using yaml

* Removed forwarding of unload

* Enabled code coverage for config flow only

* Clean up coveragerc

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
2020-07-08 20:46:38 -04:00

87 lines
2.8 KiB
Python

"""Representation of an EnOcean dongle."""
import glob
import logging
from os.path import basename, normpath
from enocean.communicators import SerialCommunicator
from enocean.protocol.packet import RadioPacket
import serial
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from .const import SIGNAL_RECEIVE_MESSAGE, SIGNAL_SEND_MESSAGE
_LOGGER = logging.getLogger(__name__)
class EnOceanDongle:
    """Representation of an EnOcean dongle.

    The dongle is responsible for receiving the EnOcean frames,
    creating devices if needed, and dispatching messages to platforms.
    """

    def __init__(self, hass, serial_path):
        """Initialize the EnOcean dongle.

        hass is the Home Assistant instance used for dispatching signals;
        serial_path is the path of the serial device the dongle is attached to.
        """
        # Incoming packets are delivered by the communicator to self.callback.
        self._communicator = SerialCommunicator(
            port=serial_path, callback=self.callback
        )
        self.serial_path = serial_path
        # Last path component (trailing separators ignored), e.g. "ttyUSB0".
        self.identifier = basename(normpath(serial_path))
        self.hass = hass
        # Set by async_setup; calling it removes the SIGNAL_SEND_MESSAGE listener.
        self.dispatcher_disconnect_handle = None

    async def async_setup(self):
        """Finish the setup of the bridge and supported platforms."""
        self._communicator.start()
        self.dispatcher_disconnect_handle = async_dispatcher_connect(
            self.hass, SIGNAL_SEND_MESSAGE, self._send_message_callback
        )

    def unload(self):
        """Disconnect the dispatcher callback and stop the communicator.

        Safe to call more than once: the dispatcher handle is cleared after
        its first use.
        """
        # Detach first so no further send requests reach a stopping communicator.
        if self.dispatcher_disconnect_handle:
            self.dispatcher_disconnect_handle()
            self.dispatcher_disconnect_handle = None
        # async_setup started the communicator; without stopping it here it
        # would keep running (and keep the serial port) after unload.
        self._communicator.stop()

    def _send_message_callback(self, command):
        """Send a command through the EnOcean dongle."""
        self._communicator.send(command)

    def callback(self, packet):
        """Handle EnOcean device's callback.

        This is the callback function called by python-enocean whenever there
        is an incoming packet. Only radio packets are forwarded to the
        platforms; any other packet type is ignored.
        """
        if isinstance(packet, RadioPacket):
            _LOGGER.debug("Received radio packet: %s", packet)
            self.hass.helpers.dispatcher.dispatcher_send(SIGNAL_RECEIVE_MESSAGE, packet)
def detect():
    """Return a list of candidate paths for USB EnOcean dongles.

    Each known device-name pattern is expanded with glob and the matches
    are concatenated in pattern order. The pattern list is simplistic and
    may need extending to cover more configurations and operating systems.
    """
    patterns = ("/dev/tty*FTOA2PV*", "/dev/serial/by-id/*EnOcean*")
    return [match for pattern in patterns for match in glob.glob(pattern)]
def validate_path(path: str):
    """Tell whether the provided path points to a usable serial port.

    Returns True when a serial communicator can be created for the path.
    When pyserial raises SerialException, a warning is logged and False
    is returned.
    """
    try:
        # Instantiating the communicator is the probe: it raises
        # if it cannot connect.
        SerialCommunicator(port=path)
    except serial.SerialException as err:
        _LOGGER.warning("Dongle path %s is invalid: %s", path, str(err))
        return False
    return True