Add Config Flow to AlarmDecoder (#37998)
This commit is contained in:
parent
17efa1bda5
commit
c32f698671
14 changed files with 1169 additions and 210 deletions
356
homeassistant/components/alarmdecoder/config_flow.py
Normal file
356
homeassistant/components/alarmdecoder/config_flow.py
Normal file
|
@ -0,0 +1,356 @@
|
|||
"""Config flow for AlarmDecoder."""
|
||||
import logging
|
||||
|
||||
from adext import AdExt
|
||||
from alarmdecoder.devices import SerialDevice, SocketDevice
|
||||
from alarmdecoder.util import NoDeviceError
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.components.binary_sensor import DEVICE_CLASSES
|
||||
from homeassistant.const import CONF_HOST, CONF_PORT, CONF_PROTOCOL
|
||||
from homeassistant.core import callback
|
||||
|
||||
from .const import ( # pylint: disable=unused-import
|
||||
CONF_ALT_NIGHT_MODE,
|
||||
CONF_AUTO_BYPASS,
|
||||
CONF_CODE_ARM_REQUIRED,
|
||||
CONF_DEVICE_BAUD,
|
||||
CONF_DEVICE_PATH,
|
||||
CONF_RELAY_ADDR,
|
||||
CONF_RELAY_CHAN,
|
||||
CONF_ZONE_LOOP,
|
||||
CONF_ZONE_NAME,
|
||||
CONF_ZONE_NUMBER,
|
||||
CONF_ZONE_RFID,
|
||||
CONF_ZONE_TYPE,
|
||||
DEFAULT_ARM_OPTIONS,
|
||||
DEFAULT_DEVICE_BAUD,
|
||||
DEFAULT_DEVICE_HOST,
|
||||
DEFAULT_DEVICE_PATH,
|
||||
DEFAULT_DEVICE_PORT,
|
||||
DEFAULT_ZONE_OPTIONS,
|
||||
DEFAULT_ZONE_TYPE,
|
||||
DOMAIN,
|
||||
OPTIONS_ARM,
|
||||
OPTIONS_ZONES,
|
||||
PROTOCOL_SERIAL,
|
||||
PROTOCOL_SOCKET,
|
||||
)
|
||||
|
||||
EDIT_KEY = "edit_selection"
|
||||
EDIT_ZONES = "Zones"
|
||||
EDIT_SETTINGS = "Arming Settings"
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AlarmDecoderFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
"""Handle a AlarmDecoder config flow."""
|
||||
|
||||
VERSION = 1
|
||||
CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_PUSH
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize AlarmDecoder ConfigFlow."""
|
||||
self.protocol = None
|
||||
|
||||
@staticmethod
|
||||
@callback
|
||||
def async_get_options_flow(config_entry):
|
||||
"""Get the options flow for AlarmDecoder."""
|
||||
return AlarmDecoderOptionsFlowHandler(config_entry)
|
||||
|
||||
async def async_step_user(self, user_input=None):
|
||||
"""Handle a flow initialized by the user."""
|
||||
if user_input is not None:
|
||||
self.protocol = user_input[CONF_PROTOCOL]
|
||||
return await self.async_step_protocol()
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="user",
|
||||
data_schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_PROTOCOL): vol.In(
|
||||
[PROTOCOL_SOCKET, PROTOCOL_SERIAL]
|
||||
),
|
||||
}
|
||||
),
|
||||
)
|
||||
|
||||
async def async_step_protocol(self, user_input=None):
|
||||
"""Handle AlarmDecoder protocol setup."""
|
||||
errors = {}
|
||||
if user_input is not None:
|
||||
if _device_already_added(
|
||||
self._async_current_entries(), user_input, self.protocol
|
||||
):
|
||||
return self.async_abort(reason="already_configured")
|
||||
connection = {}
|
||||
if self.protocol == PROTOCOL_SOCKET:
|
||||
baud = connection[CONF_DEVICE_BAUD] = None
|
||||
host = connection[CONF_HOST] = user_input[CONF_HOST]
|
||||
port = connection[CONF_PORT] = user_input[CONF_PORT]
|
||||
title = f"{host}:{port}"
|
||||
device = SocketDevice(interface=(host, port))
|
||||
if self.protocol == PROTOCOL_SERIAL:
|
||||
path = connection[CONF_DEVICE_PATH] = user_input[CONF_DEVICE_PATH]
|
||||
baud = connection[CONF_DEVICE_BAUD] = user_input[CONF_DEVICE_BAUD]
|
||||
title = path
|
||||
device = SerialDevice(interface=path)
|
||||
|
||||
controller = AdExt(device)
|
||||
try:
|
||||
with controller:
|
||||
controller.open(baudrate=baud)
|
||||
return self.async_create_entry(
|
||||
title=title, data={CONF_PROTOCOL: self.protocol, **connection}
|
||||
)
|
||||
except NoDeviceError:
|
||||
errors["base"] = "service_unavailable"
|
||||
except Exception: # pylint: disable=broad-except
|
||||
_LOGGER.exception("Unexpected exception during AlarmDecoder setup")
|
||||
errors["base"] = "unknown"
|
||||
|
||||
if self.protocol == PROTOCOL_SOCKET:
|
||||
schema = vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_HOST, default=DEFAULT_DEVICE_HOST): str,
|
||||
vol.Required(CONF_PORT, default=DEFAULT_DEVICE_PORT): int,
|
||||
}
|
||||
)
|
||||
if self.protocol == PROTOCOL_SERIAL:
|
||||
schema = vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_DEVICE_PATH, default=DEFAULT_DEVICE_PATH): str,
|
||||
vol.Required(CONF_DEVICE_BAUD, default=DEFAULT_DEVICE_BAUD): int,
|
||||
}
|
||||
)
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="protocol",
|
||||
data_schema=schema,
|
||||
errors=errors,
|
||||
)
|
||||
|
||||
|
||||
class AlarmDecoderOptionsFlowHandler(config_entries.OptionsFlow):
|
||||
"""Handle AlarmDecoder options."""
|
||||
|
||||
def __init__(self, config_entry: config_entries.ConfigEntry):
|
||||
"""Initialize AlarmDecoder options flow."""
|
||||
self.arm_options = config_entry.options.get(OPTIONS_ARM, DEFAULT_ARM_OPTIONS)
|
||||
self.zone_options = config_entry.options.get(
|
||||
OPTIONS_ZONES, DEFAULT_ZONE_OPTIONS
|
||||
)
|
||||
self.selected_zone = None
|
||||
|
||||
async def async_step_init(self, user_input=None):
|
||||
"""Manage the options."""
|
||||
if user_input is not None:
|
||||
if user_input[EDIT_KEY] == EDIT_SETTINGS:
|
||||
return await self.async_step_arm_settings()
|
||||
if user_input[EDIT_KEY] == EDIT_ZONES:
|
||||
return await self.async_step_zone_select()
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="init",
|
||||
data_schema=vol.Schema(
|
||||
{
|
||||
vol.Required(EDIT_KEY, default=EDIT_SETTINGS): vol.In(
|
||||
[EDIT_SETTINGS, EDIT_ZONES]
|
||||
)
|
||||
},
|
||||
),
|
||||
)
|
||||
|
||||
async def async_step_arm_settings(self, user_input=None):
|
||||
"""Arming options form."""
|
||||
if user_input is not None:
|
||||
return self.async_create_entry(
|
||||
title="",
|
||||
data={OPTIONS_ARM: user_input, OPTIONS_ZONES: self.zone_options},
|
||||
)
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="arm_settings",
|
||||
data_schema=vol.Schema(
|
||||
{
|
||||
vol.Optional(
|
||||
CONF_ALT_NIGHT_MODE,
|
||||
default=self.arm_options[CONF_ALT_NIGHT_MODE],
|
||||
): bool,
|
||||
vol.Optional(
|
||||
CONF_AUTO_BYPASS, default=self.arm_options[CONF_AUTO_BYPASS]
|
||||
): bool,
|
||||
vol.Optional(
|
||||
CONF_CODE_ARM_REQUIRED,
|
||||
default=self.arm_options[CONF_CODE_ARM_REQUIRED],
|
||||
): bool,
|
||||
},
|
||||
),
|
||||
)
|
||||
|
||||
async def async_step_zone_select(self, user_input=None):
|
||||
"""Zone selection form."""
|
||||
errors = _validate_zone_input(user_input)
|
||||
|
||||
if user_input is not None and not errors:
|
||||
self.selected_zone = str(
|
||||
int(user_input[CONF_ZONE_NUMBER])
|
||||
) # remove leading zeros
|
||||
return await self.async_step_zone_details()
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="zone_select",
|
||||
data_schema=vol.Schema({vol.Required(CONF_ZONE_NUMBER): str}),
|
||||
errors=errors,
|
||||
)
|
||||
|
||||
async def async_step_zone_details(self, user_input=None):
|
||||
"""Zone details form."""
|
||||
errors = _validate_zone_input(user_input)
|
||||
|
||||
if user_input is not None and not errors:
|
||||
zone_options = self.zone_options.copy()
|
||||
zone_id = self.selected_zone
|
||||
zone_options[zone_id] = _fix_input_types(user_input)
|
||||
|
||||
# Delete zone entry if zone_name is omitted
|
||||
if CONF_ZONE_NAME not in zone_options[zone_id]:
|
||||
zone_options.pop(zone_id)
|
||||
|
||||
return self.async_create_entry(
|
||||
title="",
|
||||
data={OPTIONS_ARM: self.arm_options, OPTIONS_ZONES: zone_options},
|
||||
)
|
||||
|
||||
existing_zone_settings = self.zone_options.get(self.selected_zone, {})
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="zone_details",
|
||||
description_placeholders={CONF_ZONE_NUMBER: self.selected_zone},
|
||||
data_schema=vol.Schema(
|
||||
{
|
||||
vol.Optional(
|
||||
CONF_ZONE_NAME,
|
||||
description={
|
||||
"suggested_value": existing_zone_settings.get(
|
||||
CONF_ZONE_NAME
|
||||
)
|
||||
},
|
||||
): str,
|
||||
vol.Optional(
|
||||
CONF_ZONE_TYPE,
|
||||
default=existing_zone_settings.get(
|
||||
CONF_ZONE_TYPE, DEFAULT_ZONE_TYPE
|
||||
),
|
||||
): vol.In(DEVICE_CLASSES),
|
||||
vol.Optional(
|
||||
CONF_ZONE_RFID,
|
||||
description={
|
||||
"suggested_value": existing_zone_settings.get(
|
||||
CONF_ZONE_RFID
|
||||
)
|
||||
},
|
||||
): str,
|
||||
vol.Optional(
|
||||
CONF_ZONE_LOOP,
|
||||
description={
|
||||
"suggested_value": existing_zone_settings.get(
|
||||
CONF_ZONE_LOOP
|
||||
)
|
||||
},
|
||||
): str,
|
||||
vol.Optional(
|
||||
CONF_RELAY_ADDR,
|
||||
description={
|
||||
"suggested_value": existing_zone_settings.get(
|
||||
CONF_RELAY_ADDR
|
||||
)
|
||||
},
|
||||
): str,
|
||||
vol.Optional(
|
||||
CONF_RELAY_CHAN,
|
||||
description={
|
||||
"suggested_value": existing_zone_settings.get(
|
||||
CONF_RELAY_CHAN
|
||||
)
|
||||
},
|
||||
): str,
|
||||
}
|
||||
),
|
||||
errors=errors,
|
||||
)
|
||||
|
||||
|
||||
def _validate_zone_input(zone_input):
|
||||
if not zone_input:
|
||||
return {}
|
||||
errors = {}
|
||||
|
||||
# CONF_RELAY_ADDR & CONF_RELAY_CHAN are inclusive
|
||||
if (CONF_RELAY_ADDR in zone_input and CONF_RELAY_CHAN not in zone_input) or (
|
||||
CONF_RELAY_ADDR not in zone_input and CONF_RELAY_CHAN in zone_input
|
||||
):
|
||||
errors["base"] = "relay_inclusive"
|
||||
|
||||
# The following keys must be int
|
||||
for key in [CONF_ZONE_NUMBER, CONF_ZONE_LOOP, CONF_RELAY_ADDR, CONF_RELAY_CHAN]:
|
||||
if key in zone_input:
|
||||
try:
|
||||
int(zone_input[key])
|
||||
except ValueError:
|
||||
errors[key] = "int"
|
||||
|
||||
# CONF_ZONE_LOOP depends on CONF_ZONE_RFID
|
||||
if CONF_ZONE_LOOP in zone_input and CONF_ZONE_RFID not in zone_input:
|
||||
errors[CONF_ZONE_LOOP] = "loop_rfid"
|
||||
|
||||
# CONF_ZONE_LOOP must be 1-4
|
||||
if (
|
||||
CONF_ZONE_LOOP in zone_input
|
||||
and zone_input[CONF_ZONE_LOOP].isdigit()
|
||||
and int(zone_input[CONF_ZONE_LOOP]) not in list(range(1, 5))
|
||||
):
|
||||
errors[CONF_ZONE_LOOP] = "loop_range"
|
||||
|
||||
return errors
|
||||
|
||||
|
||||
def _fix_input_types(zone_input):
|
||||
"""Convert necessary keys to int.
|
||||
|
||||
Since ConfigFlow inputs of type int cannot default to an empty string, we collect the values below as
|
||||
strings and then convert them to ints.
|
||||
"""
|
||||
|
||||
for key in [CONF_ZONE_LOOP, CONF_RELAY_ADDR, CONF_RELAY_CHAN]:
|
||||
if key in zone_input:
|
||||
zone_input[key] = int(zone_input[key])
|
||||
|
||||
return zone_input
|
||||
|
||||
|
||||
def _device_already_added(current_entries, user_input, protocol):
|
||||
"""Determine if entry has already been added to HA."""
|
||||
user_host = user_input.get(CONF_HOST)
|
||||
user_port = user_input.get(CONF_PORT)
|
||||
user_path = user_input.get(CONF_DEVICE_PATH)
|
||||
user_baud = user_input.get(CONF_DEVICE_BAUD)
|
||||
|
||||
for entry in current_entries:
|
||||
entry_host = entry.data.get(CONF_HOST)
|
||||
entry_port = entry.data.get(CONF_PORT)
|
||||
entry_path = entry.data.get(CONF_DEVICE_PATH)
|
||||
entry_baud = entry.data.get(CONF_DEVICE_BAUD)
|
||||
|
||||
if protocol == PROTOCOL_SOCKET:
|
||||
if user_host == entry_host and user_port == entry_port:
|
||||
return True
|
||||
|
||||
if protocol == PROTOCOL_SERIAL:
|
||||
if user_baud == entry_baud and user_path == entry_path:
|
||||
return True
|
||||
|
||||
return False
|
Loading…
Add table
Add a link
Reference in a new issue