Make validator for modbus table controlled (#99092)

This commit is contained in:
jan iversen 2023-09-03 21:04:58 +02:00 committed by GitHub
parent 7931d74938
commit f85a3861f2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 89 additions and 85 deletions

View file

@ -171,7 +171,6 @@ BASE_STRUCT_SCHEMA = BASE_COMPONENT_SCHEMA.extend(
DataType.FLOAT32,
DataType.FLOAT64,
DataType.STRING,
DataType.STRING,
DataType.CUSTOM,
]
),

View file

@ -30,6 +30,8 @@ from .const import (
CONF_SWAP,
CONF_SWAP_BYTE,
CONF_SWAP_NONE,
CONF_SWAP_WORD,
CONF_SWAP_WORD_BYTE,
CONF_WRITE_TYPE,
DEFAULT_HUB,
DEFAULT_SCAN_INTERVAL,
@ -40,97 +42,108 @@ from .const import (
_LOGGER = logging.getLogger(__name__)
ENTRY = namedtuple("ENTRY", ["struct_id", "register_count"])
ENTRY = namedtuple(
"ENTRY",
[
"struct_id",
"register_count",
"validate_parm",
],
)
PARM_IS_LEGAL = namedtuple(
"PARM_IS_LEGAL",
[
"count",
"structure",
"slave_count",
"swap_byte",
"swap_word",
],
)
# PARM_IS_LEGAL defines if the keywords:
# count: ..
# structure: ..
# swap: byte
# swap: word
# swap: word_byte (identical to swap: word)
# are legal to use.
# These keywords are only legal with some datatype: ...
# As expressed in DEFAULT_STRUCT_FORMAT
DEFAULT_STRUCT_FORMAT = {
DataType.INT8: ENTRY("b", 1),
DataType.INT16: ENTRY("h", 1),
DataType.INT32: ENTRY("i", 2),
DataType.INT64: ENTRY("q", 4),
DataType.UINT8: ENTRY("c", 1),
DataType.UINT16: ENTRY("H", 1),
DataType.UINT32: ENTRY("I", 2),
DataType.UINT64: ENTRY("Q", 4),
DataType.FLOAT16: ENTRY("e", 1),
DataType.FLOAT32: ENTRY("f", 2),
DataType.FLOAT64: ENTRY("d", 4),
DataType.STRING: ENTRY("s", 1),
DataType.INT8: ENTRY("b", 1, PARM_IS_LEGAL(False, False, False, False, False)),
DataType.UINT8: ENTRY("c", 1, PARM_IS_LEGAL(False, False, False, False, False)),
DataType.INT16: ENTRY("h", 1, PARM_IS_LEGAL(False, False, True, True, False)),
DataType.UINT16: ENTRY("H", 1, PARM_IS_LEGAL(False, False, True, True, False)),
DataType.FLOAT16: ENTRY("e", 1, PARM_IS_LEGAL(False, False, True, True, False)),
DataType.INT32: ENTRY("i", 2, PARM_IS_LEGAL(False, False, True, True, True)),
DataType.UINT32: ENTRY("I", 2, PARM_IS_LEGAL(False, False, True, True, True)),
DataType.FLOAT32: ENTRY("f", 2, PARM_IS_LEGAL(False, False, True, True, True)),
DataType.INT64: ENTRY("q", 4, PARM_IS_LEGAL(False, False, True, True, True)),
DataType.UINT64: ENTRY("Q", 4, PARM_IS_LEGAL(False, False, True, True, True)),
DataType.FLOAT64: ENTRY("d", 4, PARM_IS_LEGAL(False, False, True, True, True)),
DataType.STRING: ENTRY("s", 1, PARM_IS_LEGAL(True, False, False, False, False)),
DataType.CUSTOM: ENTRY("?", 0, PARM_IS_LEGAL(True, True, False, False, False)),
}
def struct_validator(config: dict[str, Any]) -> dict[str, Any]:
"""Sensor schema validator."""
data_type = config[CONF_DATA_TYPE]
count = config.get(CONF_COUNT, 1)
name = config[CONF_NAME]
structure = config.get(CONF_STRUCTURE)
slave_count = config.get(CONF_SLAVE_COUNT, 0) + 1
data_type = config[CONF_DATA_TYPE]
if data_type == "int":
data_type = config[CONF_DATA_TYPE] = DataType.INT16
count = config.get(CONF_COUNT, None)
structure = config.get(CONF_STRUCTURE, None)
slave_count = config.get(CONF_SLAVE_COUNT, None)
swap_type = config.get(CONF_SWAP, CONF_SWAP_NONE)
if (
slave_count > 1
and count > 1
and data_type not in (DataType.CUSTOM, DataType.STRING)
):
error = f"{name} {CONF_COUNT} cannot be mixed with {data_type}"
validator = DEFAULT_STRUCT_FORMAT[data_type].validate_parm
if count and not validator.count:
error = f"{name}: `{CONF_COUNT}: {count}` cannot be combined with `{CONF_DATA_TYPE}: {data_type}`"
raise vol.Invalid(error)
if config[CONF_DATA_TYPE] != DataType.CUSTOM:
if structure:
error = f"{name} structure: cannot be mixed with {data_type}"
if not count and validator.count:
error = f"{name}: `{CONF_COUNT}:` missing, demanded with `{CONF_DATA_TYPE}: {data_type}`"
raise vol.Invalid(error)
if structure and not validator.structure:
error = f"{name}: `{CONF_STRUCTURE}: {structure}` cannot be combined with `{CONF_DATA_TYPE}: {data_type}`"
raise vol.Invalid(error)
if not structure and validator.structure:
error = f"{name}: `{CONF_STRUCTURE}` missing or empty, demanded with `{CONF_DATA_TYPE}: {data_type}`"
raise vol.Invalid(error)
if slave_count and not validator.slave_count:
error = f"{name}: `{CONF_SLAVE_COUNT}: {slave_count}` cannot be combined with `{CONF_DATA_TYPE}: {data_type}`"
raise vol.Invalid(error)
if swap_type != CONF_SWAP_NONE:
swap_type_validator = {
CONF_SWAP_NONE: False,
CONF_SWAP_BYTE: validator.swap_byte,
CONF_SWAP_WORD: validator.swap_word,
CONF_SWAP_WORD_BYTE: validator.swap_word,
}[swap_type]
if not swap_type_validator:
error = f"{name}: `{CONF_SWAP}:{swap_type}` cannot be combined with `{CONF_DATA_TYPE}: {data_type}`"
raise vol.Invalid(error)
if config[CONF_DATA_TYPE] == DataType.CUSTOM:
if slave_count > 1:
error = f"{name}: `{CONF_STRUCTURE}` illegal with `{CONF_SLAVE_COUNT}` / `{CONF_SLAVE}`"
raise vol.Invalid(error)
if swap_type != CONF_SWAP_NONE:
error = f"{name}: `{CONF_STRUCTURE}` illegal with `{CONF_SWAP}`"
raise vol.Invalid(error)
if not structure:
error = (
f"Error in sensor {name}. The `{CONF_STRUCTURE}` field cannot be empty"
)
raise vol.Invalid(error)
try:
size = struct.calcsize(structure)
except struct.error as err:
raise vol.Invalid(f"Error in {name} structure: {str(err)}") from err
count = config.get(CONF_COUNT, 1)
raise vol.Invalid(
f"{name}: error in structure format --> {str(err)}"
) from err
bytecount = count * 2
if bytecount != size:
raise vol.Invalid(
f"Structure request {size} bytes, "
f"but {count} registers have a size of {bytecount} bytes"
f"{name}: Size of structure is {size} bytes but `{CONF_COUNT}: {count}` is {bytecount} bytes"
)
return {
**config,
CONF_STRUCTURE: structure,
CONF_SWAP: swap_type,
}
if data_type not in DEFAULT_STRUCT_FORMAT:
error = f"Error in sensor {name}. data_type `{data_type}` not supported"
raise vol.Invalid(error)
if slave_count > 1 and data_type == DataType.STRING:
error = f"{name}: `{data_type}` illegal with `{CONF_SLAVE_COUNT}`"
raise vol.Invalid(error)
if CONF_COUNT not in config:
config[CONF_COUNT] = DEFAULT_STRUCT_FORMAT[data_type].register_count
if swap_type != CONF_SWAP_NONE:
if swap_type == CONF_SWAP_BYTE:
regs_needed = 1
else: # CONF_SWAP_WORD_BYTE, CONF_SWAP_WORD
regs_needed = 2
count = config[CONF_COUNT]
if count < regs_needed or (count % regs_needed) != 0:
raise vol.Invalid(
f"Error in sensor {name} swap({swap_type}) "
f"impossible because datatype({data_type}) is too small"
)
structure = f">{DEFAULT_STRUCT_FORMAT[data_type].struct_id}"
if slave_count > 1:
structure = f">{slave_count}{DEFAULT_STRUCT_FORMAT[data_type].struct_id}"
else:
structure = f">{DEFAULT_STRUCT_FORMAT[data_type].struct_id}"
config[CONF_COUNT] = DEFAULT_STRUCT_FORMAT[data_type].register_count
if slave_count:
structure = (
f">{slave_count + 1}{DEFAULT_STRUCT_FORMAT[data_type].struct_id}"
)
else:
structure = f">{DEFAULT_STRUCT_FORMAT[data_type].struct_id}"
return {
**config,
CONF_STRUCTURE: structure,

View file

@ -187,7 +187,7 @@ async def test_config_sensor(hass: HomeAssistant, mock_modbus) -> None:
},
]
},
"Structure request 16 bytes, but 2 registers have a size of 4 bytes",
f"{TEST_ENTITY_NAME}: Size of structure is 16 bytes but `{CONF_COUNT}: 2` is 4 bytes",
),
(
{
@ -212,12 +212,11 @@ async def test_config_sensor(hass: HomeAssistant, mock_modbus) -> None:
CONF_ADDRESS: 1234,
CONF_DATA_TYPE: DataType.CUSTOM,
CONF_COUNT: 4,
CONF_SWAP: CONF_SWAP_NONE,
CONF_STRUCTURE: "",
},
]
},
f"Error in sensor {TEST_ENTITY_NAME}. The `structure` field cannot be empty",
f"{TEST_ENTITY_NAME}: `{CONF_STRUCTURE}` missing or empty, demanded with `{CONF_DATA_TYPE}: {DataType.CUSTOM}`",
),
(
{
@ -227,12 +226,11 @@ async def test_config_sensor(hass: HomeAssistant, mock_modbus) -> None:
CONF_ADDRESS: 1234,
CONF_DATA_TYPE: DataType.CUSTOM,
CONF_COUNT: 4,
CONF_SWAP: CONF_SWAP_NONE,
CONF_STRUCTURE: "1s",
},
]
},
"Structure request 1 bytes, but 4 registers have a size of 8 bytes",
f"{TEST_ENTITY_NAME}: Size of structure is 1 bytes but `{CONF_COUNT}: 4` is 8 bytes",
),
(
{
@ -247,7 +245,7 @@ async def test_config_sensor(hass: HomeAssistant, mock_modbus) -> None:
},
]
},
f"{TEST_ENTITY_NAME}: `structure` illegal with `swap`",
f"{TEST_ENTITY_NAME}: `{CONF_SWAP}:{CONF_SWAP_WORD}` cannot be combined with `{CONF_DATA_TYPE}: {DataType.CUSTOM}`",
),
],
)
@ -1011,7 +1009,6 @@ async def test_struct_sensor(hass: HomeAssistant, mock_do_cycle, expected) -> No
[
(
{
CONF_COUNT: 1,
CONF_SWAP: CONF_SWAP_NONE,
CONF_DATA_TYPE: DataType.UINT16,
},
@ -1020,7 +1017,6 @@ async def test_struct_sensor(hass: HomeAssistant, mock_do_cycle, expected) -> No
),
(
{
CONF_COUNT: 1,
CONF_SWAP: CONF_SWAP_BYTE,
CONF_DATA_TYPE: DataType.UINT16,
},
@ -1029,7 +1025,6 @@ async def test_struct_sensor(hass: HomeAssistant, mock_do_cycle, expected) -> No
),
(
{
CONF_COUNT: 2,
CONF_SWAP: CONF_SWAP_NONE,
CONF_DATA_TYPE: DataType.UINT32,
},
@ -1038,7 +1033,6 @@ async def test_struct_sensor(hass: HomeAssistant, mock_do_cycle, expected) -> No
),
(
{
CONF_COUNT: 2,
CONF_SWAP: CONF_SWAP_BYTE,
CONF_DATA_TYPE: DataType.UINT32,
},
@ -1047,7 +1041,6 @@ async def test_struct_sensor(hass: HomeAssistant, mock_do_cycle, expected) -> No
),
(
{
CONF_COUNT: 2,
CONF_SWAP: CONF_SWAP_WORD,
CONF_DATA_TYPE: DataType.UINT32,
},
@ -1056,7 +1049,6 @@ async def test_struct_sensor(hass: HomeAssistant, mock_do_cycle, expected) -> No
),
(
{
CONF_COUNT: 2,
CONF_SWAP: CONF_SWAP_WORD_BYTE,
CONF_DATA_TYPE: DataType.UINT32,
},