Centralize validation for modbus config (#108906)

Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>
This commit is contained in:
jan iversen 2024-02-02 12:42:12 +01:00 committed by GitHub
parent d3dbd6fa70
commit 90ec361fc9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 187 additions and 191 deletions

View file

@ -203,141 +203,6 @@ def duplicate_fan_mode_validator(config: dict[str, Any]) -> dict:
return config
def scan_interval_validator(config: dict) -> dict:
"""Control scan_interval."""
for hub in config:
minimum_scan_interval = DEFAULT_SCAN_INTERVAL
for component, conf_key in PLATFORMS:
if conf_key not in hub:
continue
for entry in hub[conf_key]:
scan_interval = entry.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
if scan_interval == 0:
continue
if scan_interval < 5:
_LOGGER.warning(
(
"%s %s scan_interval(%d) is lower than 5 seconds, "
"which may cause Home Assistant stability issues"
),
component,
entry.get(CONF_NAME),
scan_interval,
)
entry[CONF_SCAN_INTERVAL] = scan_interval
minimum_scan_interval = min(scan_interval, minimum_scan_interval)
if (
CONF_TIMEOUT in hub
and hub[CONF_TIMEOUT] > minimum_scan_interval - 1
and minimum_scan_interval > 1
):
_LOGGER.warning(
"Modbus %s timeout(%d) is adjusted(%d) due to scan_interval",
hub.get(CONF_NAME, ""),
hub[CONF_TIMEOUT],
minimum_scan_interval - 1,
)
hub[CONF_TIMEOUT] = minimum_scan_interval - 1
return config
def duplicate_entity_validator(config: dict) -> dict:
"""Control scan_interval."""
for hub_index, hub in enumerate(config):
for component, conf_key in PLATFORMS:
if conf_key not in hub:
continue
names: set[str] = set()
errors: list[int] = []
addresses: set[str] = set()
for index, entry in enumerate(hub[conf_key]):
name = entry[CONF_NAME]
addr = str(entry[CONF_ADDRESS])
if CONF_INPUT_TYPE in entry:
addr += "_" + str(entry[CONF_INPUT_TYPE])
elif CONF_WRITE_TYPE in entry:
addr += "_" + str(entry[CONF_WRITE_TYPE])
if CONF_COMMAND_ON in entry:
addr += "_" + str(entry[CONF_COMMAND_ON])
if CONF_COMMAND_OFF in entry:
addr += "_" + str(entry[CONF_COMMAND_OFF])
inx = entry.get(CONF_SLAVE, None) or entry.get(CONF_DEVICE_ADDRESS, 0)
addr += "_" + str(inx)
entry_addrs: set[str] = set()
entry_addrs.add(addr)
if CONF_TARGET_TEMP in entry:
a = str(entry[CONF_TARGET_TEMP])
a += "_" + str(inx)
entry_addrs.add(a)
if CONF_HVAC_MODE_REGISTER in entry:
a = str(entry[CONF_HVAC_MODE_REGISTER][CONF_ADDRESS])
a += "_" + str(inx)
entry_addrs.add(a)
if CONF_FAN_MODE_REGISTER in entry:
a = str(
entry[CONF_FAN_MODE_REGISTER][CONF_ADDRESS]
if isinstance(entry[CONF_FAN_MODE_REGISTER][CONF_ADDRESS], int)
else entry[CONF_FAN_MODE_REGISTER][CONF_ADDRESS][0]
)
a += "_" + str(inx)
entry_addrs.add(a)
dup_addrs = entry_addrs.intersection(addresses)
if len(dup_addrs) > 0:
for addr in dup_addrs:
err = (
f"Modbus {component}/{name} address {addr} is duplicate, second"
" entry not loaded!"
)
_LOGGER.warning(err)
errors.append(index)
elif name in names:
err = (
f"Modbus {component}/{name}  is duplicate, second entry not"
" loaded!"
)
_LOGGER.warning(err)
errors.append(index)
else:
names.add(name)
addresses.update(entry_addrs)
for i in reversed(errors):
del config[hub_index][conf_key][i]
return config
def duplicate_modbus_validator(config: dict) -> dict:
"""Control modbus connection for duplicates."""
hosts: set[str] = set()
names: set[str] = set()
errors = []
for index, hub in enumerate(config):
name = hub.get(CONF_NAME, DEFAULT_HUB)
if hub[CONF_TYPE] == SERIAL:
host = hub[CONF_PORT]
else:
host = f"{hub[CONF_HOST]}_{hub[CONF_PORT]}"
if host in hosts:
err = f"Modbus {name} contains duplicate host/port {host}, not loaded!"
_LOGGER.warning(err)
errors.append(index)
elif name in names:
err = f"Modbus {name} is duplicate, second entry not loaded!"
_LOGGER.warning(err)
errors.append(index)
else:
hosts.add(host)
names.add(name)
for i in reversed(errors):
del config[i]
return config
def register_int_list_validator(value: Any) -> Any:
"""Check if a register (CONF_ADRESS) is an int or a list having only 1 register."""
if isinstance(value, int) and value >= 0:
@ -354,7 +219,125 @@ def register_int_list_validator(value: Any) -> Any:
def check_config(config: dict) -> dict:
"""Do final config check."""
config2 = duplicate_modbus_validator(config)
config3 = scan_interval_validator(config2)
config4 = duplicate_entity_validator(config3)
return config4
hosts: set[str] = set()
hub_names: set[str] = set()
hub_name_inx = 0
minimum_scan_interval = 0
ent_names: set[str] = set()
ent_addr: set[str] = set()
def validate_modbus(hub: dict, hub_name_inx: int) -> bool:
"""Validate modbus entries."""
host: str = (
hub[CONF_PORT]
if hub[CONF_TYPE] == SERIAL
else f"{hub[CONF_HOST]}_{hub[CONF_PORT]}"
)
if CONF_NAME not in hub:
hub[CONF_NAME] = (
DEFAULT_HUB if not hub_name_inx else f"{DEFAULT_HUB}_{hub_name_inx}"
)
hub_name_inx += 1
err = f"Modbus host/port {host} is missing name, added {hub[CONF_NAME]}!"
_LOGGER.warning(err)
name = hub[CONF_NAME]
if host in hosts or name in hub_names:
err = f"Modbus {name} host/port {host} is duplicate, not loaded!"
_LOGGER.warning(err)
return False
hosts.add(host)
hub_names.add(name)
return True
def validate_entity(
hub_name: str,
entity: dict,
minimum_scan_interval: int,
ent_names: set,
ent_addr: set,
) -> bool:
"""Validate entity."""
name = entity[CONF_NAME]
addr = str(entity[CONF_ADDRESS])
scan_interval = entity.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
if scan_interval < 5:
_LOGGER.warning(
(
"%s %s scan_interval(%d) is lower than 5 seconds, "
"which may cause Home Assistant stability issues"
),
hub_name,
name,
scan_interval,
)
entity[CONF_SCAN_INTERVAL] = scan_interval
minimum_scan_interval = min(scan_interval, minimum_scan_interval)
for conf_type in (
CONF_INPUT_TYPE,
CONF_WRITE_TYPE,
CONF_COMMAND_ON,
CONF_COMMAND_OFF,
):
if conf_type in entity:
addr += f"_{entity[conf_type]}"
inx = entity.get(CONF_SLAVE, None) or entity.get(CONF_DEVICE_ADDRESS, 0)
addr += f"_{inx}"
loc_addr: set[str] = {addr}
if CONF_TARGET_TEMP in entity:
loc_addr.add(f"{entity[CONF_TARGET_TEMP]}_{inx}")
if CONF_HVAC_MODE_REGISTER in entity:
loc_addr.add(f"{entity[CONF_HVAC_MODE_REGISTER][CONF_ADDRESS]}_{inx}")
if CONF_FAN_MODE_REGISTER in entity:
loc_addr.add(f"{entity[CONF_FAN_MODE_REGISTER][CONF_ADDRESS]}_{inx}")
dup_addrs = ent_addr.intersection(loc_addr)
if len(dup_addrs) > 0:
for addr in dup_addrs:
err = (
f"Modbus {hub_name}/{name} address {addr} is duplicate, second"
" entry not loaded!"
)
_LOGGER.warning(err)
return False
if name in ent_names:
err = f"Modbus {hub_name}/{name} is duplicate, second entry not loaded!"
_LOGGER.warning(err)
return False
ent_names.add(name)
ent_addr.update(loc_addr)
return True
hub_inx = 0
while hub_inx < len(config):
hub = config[hub_inx]
if not validate_modbus(hub, hub_name_inx):
del config[hub_inx]
continue
for _component, conf_key in PLATFORMS:
if conf_key not in hub:
continue
entity_inx = 0
entities = hub[conf_key]
minimum_scan_interval = 9999
while entity_inx < len(entities):
if not validate_entity(
hub[CONF_NAME],
entities[entity_inx],
minimum_scan_interval,
ent_names,
ent_addr,
):
del entities[entity_inx]
else:
entity_inx += 1
if hub[CONF_TIMEOUT] >= minimum_scan_interval:
hub[CONF_TIMEOUT] = minimum_scan_interval - 1
_LOGGER.warning(
"Modbus %s timeout is adjusted(%d) due to scan_interval",
hub[CONF_NAME],
hub[CONF_TIMEOUT],
)
hub_inx += 1
return config