Add write_registers support for Fan Mode in modbus (#108053)

This commit is contained in:
CR-Tech 2024-01-24 20:48:55 +01:00 committed by GitHub
parent 0d633f33fa
commit df9faeae6f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 88 additions and 14 deletions

View file

@ -137,6 +137,7 @@ from .validators import (
duplicate_fan_mode_validator,
nan_validator,
number_validator,
register_int_list_validator,
struct_validator,
)
@ -279,7 +280,7 @@ CLIMATE_SCHEMA = vol.All(
vol.Optional(CONF_FAN_MODE_REGISTER): vol.Maybe(
vol.All(
{
CONF_ADDRESS: cv.positive_int,
vol.Required(CONF_ADDRESS): register_int_list_validator,
CONF_FAN_MODE_VALUES: {
vol.Optional(CONF_FAN_MODE_ON): cv.positive_int,
vol.Optional(CONF_FAN_MODE_OFF): cv.positive_int,

View file

@ -170,7 +170,6 @@ class ModbusThermostat(BaseStructPlatform, RestoreEntity, ClimateEntity):
self._fan_mode_mapping_to_modbus: dict[str, int] = {}
self._fan_mode_mapping_from_modbus: dict[int, str] = {}
mode_value_config = mode_config[CONF_FAN_MODE_VALUES]
for fan_mode_kw, fan_mode in (
(CONF_FAN_MODE_ON, FAN_ON),
(CONF_FAN_MODE_OFF, FAN_OFF),
@ -253,16 +252,23 @@ class ModbusThermostat(BaseStructPlatform, RestoreEntity, ClimateEntity):
async def async_set_fan_mode(self, fan_mode: str) -> None:
"""Set new target fan mode."""
if self._fan_mode_register is not None:
# Write a value to the mode register for the desired mode.
value = self._fan_mode_mapping_to_modbus[fan_mode]
await self._hub.async_pb_call(
self._slave,
self._fan_mode_register,
value,
CALL_TYPE_WRITE_REGISTER,
)
if isinstance(self._fan_mode_register, list):
await self._hub.async_pb_call(
self._slave,
self._fan_mode_register[0],
[value],
CALL_TYPE_WRITE_REGISTERS,
)
else:
await self._hub.async_pb_call(
self._slave,
self._fan_mode_register,
value,
CALL_TYPE_WRITE_REGISTER,
)
await self.async_update()
@ -344,7 +350,11 @@ class ModbusThermostat(BaseStructPlatform, RestoreEntity, ClimateEntity):
# Read the Fan mode register if defined
if self._fan_mode_register is not None:
fan_mode = await self._async_read_register(
CALL_TYPE_REGISTER_HOLDING, self._fan_mode_register, raw=True
CALL_TYPE_REGISTER_HOLDING,
self._fan_mode_register
if isinstance(self._fan_mode_register, int)
else self._fan_mode_register[0],
raw=True,
)
# Translate the value received

View file

@ -293,7 +293,11 @@ def duplicate_entity_validator(config: dict) -> dict:
a += "_" + str(inx)
entry_addrs.add(a)
if CONF_FAN_MODE_REGISTER in entry:
a = str(entry[CONF_FAN_MODE_REGISTER][CONF_ADDRESS])
a = str(
entry[CONF_FAN_MODE_REGISTER][CONF_ADDRESS]
if isinstance(entry[CONF_FAN_MODE_REGISTER][CONF_ADDRESS], int)
else entry[CONF_FAN_MODE_REGISTER][CONF_ADDRESS][0]
)
a += "_" + str(inx)
entry_addrs.add(a)
@ -351,6 +355,20 @@ def duplicate_modbus_validator(config: dict) -> dict:
return config
def register_int_list_validator(value: Any) -> Any:
"""Check if a register (CONF_ADRESS) is an int or a list having only 1 register."""
if isinstance(value, int) and value >= 0:
return value
if isinstance(value, list):
if (len(value) == 1) and isinstance(value[0], int) and value[0] >= 0:
return value
raise vol.Invalid(
f"Invalid {CONF_ADDRESS} register for fan mode. Required type: positive integer, allowed 1 or list of 1 register."
)
def check_config(config: dict) -> dict:
"""Do final config check."""
config2 = duplicate_modbus_validator(config)

View file

@ -30,6 +30,7 @@ from homeassistant.components.modbus.const import (
CONF_FAN_MODE_OFF,
CONF_FAN_MODE_ON,
CONF_FAN_MODE_REGISTER,
CONF_FAN_MODE_TOP,
CONF_FAN_MODE_VALUES,
CONF_HVAC_MODE_AUTO,
CONF_HVAC_MODE_COOL,
@ -491,7 +492,7 @@ async def test_service_climate_update(
CONF_SCAN_INTERVAL: 0,
CONF_DATA_TYPE: DataType.INT32,
CONF_FAN_MODE_REGISTER: {
CONF_ADDRESS: 118,
CONF_ADDRESS: [118],
CONF_FAN_MODE_VALUES: {
CONF_FAN_MODE_LOW: 0,
CONF_FAN_MODE_MEDIUM: 1,
@ -505,6 +506,31 @@ async def test_service_climate_update(
FAN_HIGH,
[0x02],
),
(
{
CONF_CLIMATES: [
{
CONF_NAME: TEST_ENTITY_NAME,
CONF_TARGET_TEMP: 117,
CONF_ADDRESS: 117,
CONF_SLAVE: 10,
CONF_SCAN_INTERVAL: 0,
CONF_DATA_TYPE: DataType.INT32,
CONF_FAN_MODE_REGISTER: {
CONF_ADDRESS: [118],
CONF_FAN_MODE_VALUES: {
CONF_FAN_MODE_LOW: 0,
CONF_FAN_MODE_MEDIUM: 1,
CONF_FAN_MODE_HIGH: 2,
CONF_FAN_MODE_TOP: 3,
},
},
},
]
},
FAN_TOP,
[0x03],
),
],
)
async def test_service_climate_fan_update(
@ -740,7 +766,7 @@ async def test_service_set_hvac_mode(
CONF_ADDRESS: 117,
CONF_SLAVE: 10,
CONF_FAN_MODE_REGISTER: {
CONF_ADDRESS: 118,
CONF_ADDRESS: [118],
CONF_FAN_MODE_VALUES: {
CONF_FAN_MODE_ON: 1,
CONF_FAN_MODE_OFF: 2,

View file

@ -84,6 +84,7 @@ from homeassistant.components.modbus.validators import (
duplicate_modbus_validator,
nan_validator,
number_validator,
register_int_list_validator,
struct_validator,
)
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
@ -138,6 +139,24 @@ async def mock_modbus_with_pymodbus_fixture(hass, caplog, do_config, mock_pymodb
return mock_pymodbus
async def test_register_int_list_validator() -> None:
"""Test conf address register validator."""
for value, vtype in (
(15, int),
([15], list),
):
assert isinstance(register_int_list_validator(value), vtype)
with pytest.raises(vol.Invalid):
register_int_list_validator([15, 16])
with pytest.raises(vol.Invalid):
register_int_list_validator(-15)
with pytest.raises(vol.Invalid):
register_int_list_validator(["aq"])
async def test_number_validator() -> None:
"""Test number validator."""
@ -584,7 +603,7 @@ async def test_duplicate_entity_validator(do_config) -> None:
CONF_SLAVE: 0,
CONF_TARGET_TEMP: 117,
CONF_FAN_MODE_REGISTER: {
CONF_ADDRESS: 121,
CONF_ADDRESS: [121],
CONF_FAN_MODE_VALUES: {
CONF_FAN_MODE_ON: 0,
CONF_FAN_MODE_HIGH: 1,