hass-core/homeassistant/components/raspihats/__init__.py
Ville Skyttä b4bac0f7a0
Exception chaining and wrapping improvements (#39320)
* Remove unnecessary exception re-wraps

* Preserve exception chains on re-raise

We slap "from cause" to almost all possible cases here. In some cases it
could conceivably be better to do "from None" if we really want to hide
the cause. However those should be in the minority, and "from cause"
should be an improvement over the corresponding raise without a "from"
in all cases anyway.

The only case where we raise from None here is in plex, where the
exception for an original invalid SSL cert is not the root cause for
failure to validate a newly fetched one.

Follow local convention on exception variable names if there is a
consistent one, otherwise `err` to match with majority of codebase.

* Fix mistaken re-wrap in homematicip_cloud/hap.py

Missed the difference between HmipConnectionError and
HmipcConnectionError.

* Do not hide original error on plex new cert validation error

Original is not the cause for the new one, but showing old in the
traceback is useful nevertheless.
2020-08-28 13:50:32 +02:00

245 lines
8.8 KiB
Python

"""Support for controlling raspihats boards."""
import logging
import threading
import time
from homeassistant.const import EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Presumably the Home Assistant integration domain for this component.
DOMAIN = "raspihats"
# Configuration key names. They are only defined here and are not read
# anywhere in this module; presumably the platform modules consume them -
# verify against callers.
CONF_I2C_HATS = "i2c_hats"
CONF_BOARD = "board"
CONF_CHANNELS = "channels"
CONF_INDEX = "index"
CONF_INVERT_LOGIC = "invert_logic"
CONF_INITIAL_STATE = "initial_state"
# Board type names.
# NOTE(review): these look like the class names that
# I2CHatsManager.register_board() resolves with getattr() on
# raspihats.i2c_hats - confirm against the raspihats package.
I2C_HAT_NAMES = [
"Di16",
"Rly10",
"Di6Rly6",
"DI16ac",
"DQ10rly",
"DQ16oc",
"DI6acDQ6rly",
]
# Key under which setup() stores the I2CHatsManager instance in hass.data.
I2C_HATS_MANAGER = "I2CH_MNG"
def setup(hass, config):
    """Create the shared I2C-HATs manager and hook it to the HA lifecycle.

    A new ``I2CHatsManager`` is stored in ``hass.data`` under
    ``I2C_HATS_MANAGER``. Its keep alive is started on the Home Assistant
    start event and stopped on the stop event; each listener fires at most
    once. ``config`` is accepted but not used here.

    Returns True unconditionally.
    """
    hass.data[I2C_HATS_MANAGER] = I2CHatsManager()

    def _on_hass_start(event):
        """Begin the I2C-HATs keep alive."""
        # Look the manager up at event time, exactly as stored above.
        hass.data[I2C_HATS_MANAGER].start_keep_alive()

    def _on_hass_stop(event):
        """End the I2C-HATs keep alive."""
        hass.data[I2C_HATS_MANAGER].stop_keep_alive()

    lifecycle_listeners = (
        (EVENT_HOMEASSISTANT_START, _on_hass_start),
        (EVENT_HOMEASSISTANT_STOP, _on_hass_stop),
    )
    for event_type, listener in lifecycle_listeners:
        hass.bus.listen_once(event_type, listener)

    return True
def log_message(source, *parts):
    """Build a log message prefixed with the class name of ``source``.

    Every item in ``parts`` is converted with ``str()`` and appended after a
    ``": "`` separator, so ``log_message(5, "a", 1)`` gives ``"int: a: 1"``.
    With no parts, only the class name is returned.
    """
    # Join once instead of growing the string with ``+=`` inside a loop.
    return ": ".join([source.__class__.__name__, *map(str, parts)])
class I2CHatsException(Exception):
    """Error raised by this component when an I2C-HAT operation fails."""
class I2CHatsDIScanner:
    """Poll I2C-HAT digital inputs and report per-channel changes.

    All state lives on the board's ``di`` object: the last value read and a
    ``{channel: callback}`` dict are attached to it as attributes. Boards
    that have no ``di`` attribute are silently ignored by every method.
    """

    _DIGITAL_INPUTS = "di"
    _OLD_VALUE = "old_value"
    _CALLBACKS = "callbacks"

    def setup(self, i2c_hat):
        """Attach the scanner bookkeeping attributes to the board's inputs."""
        if not hasattr(i2c_hat, self._DIGITAL_INPUTS):
            return
        inputs = getattr(i2c_hat, self._DIGITAL_INPUTS)
        # No previous reading yet, so the first scan only records a baseline.
        setattr(inputs, self._OLD_VALUE, None)
        # Edge callbacks, keyed by channel.
        setattr(inputs, self._CALLBACKS, {})

    def register_callback(self, i2c_hat, channel, callback):
        """Store ``callback`` as the edge callback for ``channel``.

        A callback already registered for the same channel is replaced.
        """
        if not hasattr(i2c_hat, self._DIGITAL_INPUTS):
            return
        inputs = getattr(i2c_hat, self._DIGITAL_INPUTS)
        registry = getattr(inputs, self._CALLBACKS)
        registry[channel] = callback
        setattr(inputs, self._CALLBACKS, registry)

    def scan(self, i2c_hat):
        """Read the inputs once and call the callback of each changed channel.

        Each callback receives the new bit state (0 or 1) of its channel.
        Nothing is fired on the first scan after ``setup``; the value read is
        just remembered for the next comparison.
        """
        if not hasattr(i2c_hat, self._DIGITAL_INPUTS):
            return
        inputs = getattr(i2c_hat, self._DIGITAL_INPUTS)
        handlers = getattr(inputs, self._CALLBACKS)
        previous = getattr(inputs, self._OLD_VALUE)
        current = inputs.value  # i2c data transfer
        if previous is not None and current != previous:
            for channel in range(len(inputs.channels)):
                bit = (current >> channel) & 0x01
                if bit == (previous >> channel) & 0x01:
                    continue  # this channel did not change
                handler = handlers.get(channel)
                if handler is not None:
                    handler(bit)
        # Remember the reading, whether or not anything changed.
        setattr(inputs, self._OLD_VALUE, current)
class I2CHatsManager(threading.Thread):
    """Manages all I2C-HATs instances.

    The manager is itself a thread. While running, it repeatedly scans the
    digital inputs and reads the status of every registered board. All
    access to the boards goes through a single non-reentrant lock.

    Per-board bookkeeping is stored as attributes on the board object:
    ``callbacks`` holds the online callbacks and ``exception`` tracks the
    online/offline state (see ``run``).
    """

    _EXCEPTION = "exception"
    _CALLBACKS = "callbacks"

    def __init__(self):
        """Init I2C-HATs Manager."""
        super().__init__()
        self._lock = threading.Lock()
        self._i2c_hats = {}  # {address: board instance}
        self._run = False  # keep-alive loop flag, see start/stop_keep_alive
        self._di_scanner = I2CHatsDIScanner()

    def register_board(self, board, address):
        """Register the I2C-HAT of type ``board`` at ``address``.

        ``board`` is the name of a class in ``raspihats.i2c_hats``. Nothing
        happens when a board is already registered at ``address``.
        """
        with self._lock:
            i2c_hat = self._i2c_hats.get(address)
            if i2c_hat is None:
                # This is a Pi module and can't be installed in CI without
                # breaking the build.
                # pylint: disable=import-outside-toplevel,import-error
                import raspihats.i2c_hats as module

                constructor = getattr(module, board)
                i2c_hat = constructor(address)
                setattr(i2c_hat, self._CALLBACKS, {})

                # Setting exception attribute will trigger online callbacks
                # when keep alive thread starts.
                setattr(i2c_hat, self._EXCEPTION, None)

                self._di_scanner.setup(i2c_hat)
                self._i2c_hats[address] = i2c_hat
                status_word = i2c_hat.status  # read status_word to reset bits
                _LOGGER.info(log_message(self, i2c_hat, "registered", status_word))

    def run(self):
        """Keep alive for I2C-HATs.

        Loops until ``stop_keep_alive`` clears the run flag, sleeping 50 ms
        between passes. For every board, the ``exception`` attribute drives
        the online/offline handling:

        * present (``None`` after registration, or the last error): the next
          successful pass removes it and fires the online callbacks; a
          warning is logged first if it held an actual error.
        * absent: the board is considered online; the first
          ``ResponseException`` is logged as an error and stored, later
          ones are stored without logging again.
        """
        # This is a Pi module and can't be installed in CI without
        # breaking the build.
        # pylint: disable=import-outside-toplevel,import-error
        from raspihats.i2c_hats import ResponseException

        _LOGGER.info(log_message(self, "starting"))

        while self._run:
            with self._lock:
                for i2c_hat in list(self._i2c_hats.values()):
                    try:
                        self._di_scanner.scan(i2c_hat)
                        self._read_status(i2c_hat)

                        if hasattr(i2c_hat, self._EXCEPTION):
                            if getattr(i2c_hat, self._EXCEPTION) is not None:
                                _LOGGER.warning(
                                    log_message(self, i2c_hat, "online again")
                                )
                            delattr(i2c_hat, self._EXCEPTION)
                            # trigger online callbacks
                            callbacks = getattr(i2c_hat, self._CALLBACKS)
                            for callback in list(callbacks.values()):
                                callback()
                    except ResponseException as ex:
                        if not hasattr(i2c_hat, self._EXCEPTION):
                            _LOGGER.error(log_message(self, i2c_hat, ex))
                        setattr(i2c_hat, self._EXCEPTION, ex)
            # Sleep outside the lock so other threads can reach the boards.
            time.sleep(0.05)
        _LOGGER.info(log_message(self, "exiting"))

    def _read_status(self, i2c_hat):
        """Read the board status and log an error when it is non-zero."""
        status_word = i2c_hat.status
        if status_word.value != 0x00:
            _LOGGER.error(log_message(self, i2c_hat, status_word))

    def start_keep_alive(self):
        """Start keep alive mechanism."""
        self._run = True
        self.start()

    def stop_keep_alive(self):
        """Stop keep alive mechanism.

        Clears the run flag and waits for the keep-alive thread to finish.
        Safe to call when the thread was never started.
        """
        self._run = False
        # join() raises RuntimeError on a thread that was never started,
        # e.g. when stop is requested before start_keep_alive() ran.
        if self.is_alive():
            self.join()

    def register_di_callback(self, address, channel, callback):
        """Register I2C-HAT digital input edge callback.

        Raises KeyError when no board is registered at ``address``.
        """
        with self._lock:
            i2c_hat = self._i2c_hats[address]
            self._di_scanner.register_callback(i2c_hat, channel, callback)

    def register_online_callback(self, address, channel, callback):
        """Register I2C-HAT online callback.

        The callback is stored per ``channel`` and is called without
        arguments by ``run``. Raises KeyError when no board is registered
        at ``address``.
        """
        with self._lock:
            i2c_hat = self._i2c_hats[address]
            # The dict is mutated in place; no need to set it back.
            getattr(i2c_hat, self._CALLBACKS)[channel] = callback

    def read_di(self, address, channel):
        """Read a value from a I2C-HAT digital input.

        Returns bit ``channel`` (0 or 1) of the board's input value.
        Raises I2CHatsException when the board raises ResponseException.
        """
        # This is a Pi module and can't be installed in CI without
        # breaking the build.
        # pylint: disable=import-outside-toplevel,import-error
        from raspihats.i2c_hats import ResponseException

        with self._lock:
            i2c_hat = self._i2c_hats[address]
            try:
                value = i2c_hat.di.value
                return (value >> channel) & 0x01
            except ResponseException as ex:
                raise I2CHatsException(str(ex)) from ex

    def write_dq(self, address, channel, value):
        """Write a value to a I2C-HAT digital output.

        Raises I2CHatsException when the board raises ResponseException.
        """
        # This is a Pi module and can't be installed in CI without
        # breaking the build.
        # pylint: disable=import-outside-toplevel,import-error
        from raspihats.i2c_hats import ResponseException

        with self._lock:
            i2c_hat = self._i2c_hats[address]
            try:
                i2c_hat.dq.channels[channel] = value
            except ResponseException as ex:
                raise I2CHatsException(str(ex)) from ex

    def read_dq(self, address, channel):
        """Read a value from a I2C-HAT digital output.

        Raises I2CHatsException when the board raises ResponseException.
        """
        # This is a Pi module and can't be installed in CI without
        # breaking the build.
        # pylint: disable=import-outside-toplevel,import-error
        from raspihats.i2c_hats import ResponseException

        with self._lock:
            i2c_hat = self._i2c_hats[address]
            try:
                return i2c_hat.dq.channels[channel]
            except ResponseException as ex:
                raise I2CHatsException(str(ex)) from ex