Simplify calls to pymodbus (#50717)

* simplify pymodbus_call.

Do not call with a function object and a check attribute, call instead
with CALL_TYPE*.

Avoid if <type> call x else call y.
Call async_pymodbus_call directly, instead of unpacking/packing.

* Declare call type in __init__.

* Modbus.py back to 100% test coverage.
This commit is contained in:
jan iversen 2021-05-17 11:20:12 +02:00 committed by GitHub
parent 7b18860dcd
commit ff856a9bba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 92 additions and 34 deletions

View file

@ -27,6 +27,14 @@ from .const import (
ATTR_STATE,
ATTR_UNIT,
ATTR_VALUE,
CALL_TYPE_COIL,
CALL_TYPE_DISCRETE,
CALL_TYPE_REGISTER_HOLDING,
CALL_TYPE_REGISTER_INPUT,
CALL_TYPE_WRITE_COIL,
CALL_TYPE_WRITE_COILS,
CALL_TYPE_WRITE_REGISTER,
CALL_TYPE_WRITE_REGISTERS,
CONF_BAUDRATE,
CONF_BYTESIZE,
CONF_CLOSE_COMM_ON_ERROR,
@ -39,6 +47,9 @@ from .const import (
SERVICE_WRITE_REGISTER,
)
ENTRY_FUNC = "func"
ENTRY_ATTR = "attr"
_LOGGER = logging.getLogger(__name__)
@ -145,6 +156,41 @@ class ModbusHub:
# network configuration
self._config_host = client_config[CONF_HOST]
self._call_type = {
CALL_TYPE_COIL: {
ENTRY_ATTR: "bits",
ENTRY_FUNC: None,
},
CALL_TYPE_DISCRETE: {
ENTRY_ATTR: "bits",
ENTRY_FUNC: None,
},
CALL_TYPE_REGISTER_HOLDING: {
ENTRY_ATTR: "registers",
ENTRY_FUNC: None,
},
CALL_TYPE_REGISTER_INPUT: {
ENTRY_ATTR: "registers",
ENTRY_FUNC: None,
},
CALL_TYPE_WRITE_COIL: {
ENTRY_ATTR: "value",
ENTRY_FUNC: None,
},
CALL_TYPE_WRITE_COILS: {
ENTRY_ATTR: "count",
ENTRY_FUNC: None,
},
CALL_TYPE_WRITE_REGISTER: {
ENTRY_ATTR: "value",
ENTRY_FUNC: None,
},
CALL_TYPE_WRITE_REGISTERS: {
ENTRY_ATTR: "count",
ENTRY_FUNC: None,
},
}
@property
def name(self):
"""Return the name of this hub."""
@ -202,6 +248,25 @@ class ModbusHub:
async with self._lock:
await self.hass.async_add_executor_job(self._pymodbus_connect)
self._call_type[CALL_TYPE_COIL][ENTRY_FUNC] = self._client.read_coils
self._call_type[CALL_TYPE_DISCRETE][
ENTRY_FUNC
] = self._client.read_discrete_inputs
self._call_type[CALL_TYPE_REGISTER_HOLDING][
ENTRY_FUNC
] = self._client.read_holding_registers
self._call_type[CALL_TYPE_REGISTER_INPUT][
ENTRY_FUNC
] = self._client.read_input_registers
self._call_type[CALL_TYPE_WRITE_COIL][ENTRY_FUNC] = self._client.write_coil
self._call_type[CALL_TYPE_WRITE_COILS][ENTRY_FUNC] = self._client.write_coils
self._call_type[CALL_TYPE_WRITE_REGISTER][
ENTRY_FUNC
] = self._client.write_register
self._call_type[CALL_TYPE_WRITE_REGISTERS][
ENTRY_FUNC
] = self._client.write_registers
# Start counting down to allow modbus requests.
if self._config_delay:
self._async_cancel_listener = async_call_later(
@ -239,73 +304,69 @@ class ModbusHub:
except ModbusException as exception_error:
self._log_error(exception_error, error_state=False)
def _pymodbus_call(self, unit, address, value, check_attr, func):
def _pymodbus_call(self, unit, address, value, use_call):
"""Call sync. pymodbus."""
kwargs = {"unit": unit} if unit else {}
try:
result = func(address, value, **kwargs)
result = self._call_type[use_call][ENTRY_FUNC](address, value, **kwargs)
except ModbusException as exception_error:
self._log_error(exception_error)
result = exception_error
if not hasattr(result, check_attr):
if not hasattr(result, self._call_type[use_call][ENTRY_ATTR]):
self._log_error(result)
return None
self._in_error = False
return result
async def async_pymodbus_call(self, unit, address, value, check_attr, func):
async def async_pymodbus_call(self, unit, address, value, use_call):
"""Convert async to sync pymodbus call."""
if self._config_delay:
return None
async with self._lock:
return await self.hass.async_add_executor_job(
self._pymodbus_call, unit, address, value, check_attr, func
self._pymodbus_call, unit, address, value, use_call
)
async def async_read_coils(self, unit, address, count):
"""Read coils."""
return await self.async_pymodbus_call(
unit, address, count, "bits", self._client.read_coils
)
return await self.async_pymodbus_call(unit, address, count, CALL_TYPE_COIL)
async def async_read_discrete_inputs(self, unit, address, count):
"""Read discrete inputs."""
return await self.async_pymodbus_call(
unit, address, count, "bits", self._client.read_discrete_inputs
)
return await self.async_pymodbus_call(unit, address, count, CALL_TYPE_DISCRETE)
async def async_read_input_registers(self, unit, address, count):
"""Read input registers."""
return await self.async_pymodbus_call(
unit, address, count, "registers", self._client.read_input_registers
unit, address, count, CALL_TYPE_REGISTER_INPUT
)
async def async_read_holding_registers(self, unit, address, count):
"""Read holding registers."""
return await self.async_pymodbus_call(
unit, address, count, "registers", self._client.read_holding_registers
unit, address, count, CALL_TYPE_REGISTER_HOLDING
)
async def async_write_coil(self, unit, address, value) -> bool:
"""Write coil."""
return await self.async_pymodbus_call(
unit, address, value, "value", self._client.write_coil
unit, address, value, CALL_TYPE_WRITE_COIL
)
async def async_write_coils(self, unit, address, values) -> bool:
"""Write coil."""
return await self.async_pymodbus_call(
unit, address, values, "count", self._client.write_coils
unit, address, values, CALL_TYPE_WRITE_COILS
)
async def async_write_register(self, unit, address, value) -> bool:
"""Write register."""
return await self.async_pymodbus_call(
unit, address, value, "value", self._client.write_register
unit, address, value, CALL_TYPE_WRITE_REGISTER
)
async def async_write_registers(self, unit, address, values) -> bool:
"""Write registers."""
return await self.async_pymodbus_call(
unit, address, values, "count", self._client.write_registers
unit, address, values, CALL_TYPE_WRITE_REGISTERS
)