Allow discovery configuration of modbus platforms (#46591)

* Change modbus configuration to new style.

The old (frozen) configuration is still supported, but when detected a big
warning is issued that it will soon be removed. This allows users to change
their configuration at their pace.

Clean configuration SCHEMAs and move common modbus parts
to MODBUS_SCHEMA (renamed from BASE_SCHEMA).

Add BASE_COMPONENT_SCHEMA to ensure common configuration of components.
All component define e.g. NAME, move these to a common schema.
change components (binary_sensor, sensor, switch) to new config

Add test set for modbus itself (old config and discovery_info).
Add test of devices discovery_info configuration

* Update discovery_info configuration for binary_sensor.

* Update discovery_info configuration for sensor.

* Update discovery_info configuration for switch.

* Review comments.

* update due to change in core

* flake8 problem.

* Correct log message.

* add should_poll property.

* Fix polling for Modbus binary sensor

* Fix polling for Modbus sensor

* Fix polling for Modbus switch

* Fix switch.

* Fix pytest errors.

* Update homeassistant/components/modbus/binary_sensor.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Update homeassistant/components/modbus/binary_sensor.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Update homeassistant/components/modbus/modbus.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Update homeassistant/components/modbus/sensor.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Update homeassistant/components/modbus/sensor.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Update homeassistant/components/modbus/sensor.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Update homeassistant/components/modbus/switch.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Update homeassistant/components/modbus/switch.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Update homeassistant/components/modbus/switch.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* ToogleEntity -> SwitchEntity and add abastract

* Update homeassistant/components/modbus/switch.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Update tests/components/modbus/test_init.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* removed if/else in test.

* Remove other if.

Co-authored-by: Vladimir Zahradnik <vladimir@zahradnik.io>
Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
jan iversen 2021-03-27 22:48:06 +01:00 committed by GitHub
parent 23d7330a2f
commit ffdfc521b9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 605 additions and 139 deletions

View file

@ -1,11 +1,25 @@
"""Support for Modbus."""
from typing import Any, Union
import voluptuous as vol
from homeassistant.components.binary_sensor import (
DEVICE_CLASSES_SCHEMA as BINARY_SENSOR_DEVICE_CLASSES_SCHEMA,
)
from homeassistant.components.cover import (
DEVICE_CLASSES_SCHEMA as COVER_DEVICE_CLASSES_SCHEMA,
)
from homeassistant.components.sensor import (
DEVICE_CLASSES_SCHEMA as SENSOR_DEVICE_CLASSES_SCHEMA,
)
from homeassistant.components.switch import (
DEVICE_CLASSES_SCHEMA as SWITCH_DEVICE_CLASSES_SCHEMA,
)
from homeassistant.const import (
ATTR_STATE,
CONF_ADDRESS,
CONF_COMMAND_OFF,
CONF_COMMAND_ON,
CONF_COVERS,
CONF_DELAY,
CONF_DEVICE_CLASS,
@ -19,6 +33,7 @@ from homeassistant.const import (
CONF_STRUCTURE,
CONF_TIMEOUT,
CONF_TYPE,
CONF_UNIT_OF_MEASUREMENT,
)
import homeassistant.helpers.config_validation as cv
@ -28,11 +43,14 @@ from .const import (
ATTR_UNIT,
ATTR_VALUE,
CALL_TYPE_COIL,
CALL_TYPE_DISCRETE,
CALL_TYPE_REGISTER_HOLDING,
CALL_TYPE_REGISTER_INPUT,
CONF_BAUDRATE,
CONF_BINARY_SENSORS,
CONF_BYTESIZE,
CONF_CLIMATES,
CONF_COUNT,
CONF_CURRENT_TEMP,
CONF_CURRENT_TEMP_REGISTER_TYPE,
CONF_DATA_COUNT,
@ -43,24 +61,30 @@ from .const import (
CONF_PARITY,
CONF_PRECISION,
CONF_REGISTER,
CONF_REVERSE_ORDER,
CONF_SCALE,
CONF_SENSORS,
CONF_STATE_CLOSED,
CONF_STATE_CLOSING,
CONF_STATE_OFF,
CONF_STATE_ON,
CONF_STATE_OPEN,
CONF_STATE_OPENING,
CONF_STATUS_REGISTER,
CONF_STATUS_REGISTER_TYPE,
CONF_STEP,
CONF_STOPBITS,
CONF_SWITCHES,
CONF_TARGET_TEMP,
CONF_UNIT,
CONF_VERIFY_REGISTER,
DATA_TYPE_CUSTOM,
DATA_TYPE_FLOAT,
DATA_TYPE_INT,
DATA_TYPE_STRING,
DATA_TYPE_UINT,
DEFAULT_HUB,
DEFAULT_SCAN_INTERVAL,
DEFAULT_SLAVE,
DEFAULT_STRUCTURE_PREFIX,
DEFAULT_TEMP_UNIT,
MODBUS_DOMAIN as DOMAIN,
@ -69,11 +93,40 @@ from .modbus import modbus_setup
BASE_SCHEMA = vol.Schema({vol.Optional(CONF_NAME, default=DEFAULT_HUB): cv.string})
CLIMATE_SCHEMA = vol.Schema(
def number(value: Any) -> Union[int, float]:
"""Coerce a value to number without losing precision."""
if isinstance(value, int):
return value
if isinstance(value, float):
return value
try:
value = int(value)
return value
except (TypeError, ValueError):
pass
try:
value = float(value)
return value
except (TypeError, ValueError) as err:
raise vol.Invalid(f"invalid number {value}") from err
BASE_COMPONENT_SCHEMA = vol.Schema(
{
vol.Required(CONF_NAME): cv.string,
vol.Optional(CONF_SLAVE): cv.positive_int,
vol.Optional(
CONF_SCAN_INTERVAL, default=DEFAULT_SCAN_INTERVAL
): cv.positive_int,
}
)
CLIMATE_SCHEMA = BASE_COMPONENT_SCHEMA.extend(
{
vol.Required(CONF_CURRENT_TEMP): cv.positive_int,
vol.Required(CONF_NAME): cv.string,
vol.Required(CONF_SLAVE): cv.positive_int,
vol.Required(CONF_TARGET_TEMP): cv.positive_int,
vol.Optional(CONF_DATA_COUNT, default=2): cv.positive_int,
vol.Optional(
@ -84,9 +137,6 @@ CLIMATE_SCHEMA = vol.Schema(
),
vol.Optional(CONF_PRECISION, default=1): cv.positive_int,
vol.Optional(CONF_SCALE, default=1): vol.Coerce(float),
vol.Optional(CONF_SCAN_INTERVAL, default=DEFAULT_SCAN_INTERVAL): vol.All(
cv.time_period, lambda value: value.total_seconds()
),
vol.Optional(CONF_OFFSET, default=0): vol.Coerce(float),
vol.Optional(CONF_MAX_TEMP, default=35): cv.positive_int,
vol.Optional(CONF_MIN_TEMP, default=5): cv.positive_int,
@ -98,14 +148,9 @@ CLIMATE_SCHEMA = vol.Schema(
COVERS_SCHEMA = vol.All(
cv.has_at_least_one_key(CALL_TYPE_COIL, CONF_REGISTER),
vol.Schema(
BASE_COMPONENT_SCHEMA.extend(
{
vol.Required(CONF_NAME): cv.string,
vol.Optional(CONF_SCAN_INTERVAL, default=DEFAULT_SCAN_INTERVAL): vol.All(
cv.time_period, lambda value: value.total_seconds()
),
vol.Optional(CONF_DEVICE_CLASS): COVER_DEVICE_CLASSES_SCHEMA,
vol.Optional(CONF_SLAVE, default=DEFAULT_SLAVE): cv.positive_int,
vol.Optional(CONF_STATE_CLOSED, default=0): cv.positive_int,
vol.Optional(CONF_STATE_CLOSING, default=3): cv.positive_int,
vol.Optional(CONF_STATE_OPEN, default=1): cv.positive_int,
@ -121,33 +166,104 @@ COVERS_SCHEMA = vol.All(
),
)
SERIAL_SCHEMA = BASE_SCHEMA.extend(
SWITCH_SCHEMA = BASE_COMPONENT_SCHEMA.extend(
{
vol.Required(CONF_ADDRESS): cv.positive_int,
vol.Optional(CONF_DEVICE_CLASS): SWITCH_DEVICE_CLASSES_SCHEMA,
vol.Optional(CONF_INPUT_TYPE, default=CALL_TYPE_REGISTER_HOLDING): vol.In(
[CALL_TYPE_REGISTER_HOLDING, CALL_TYPE_REGISTER_INPUT, CALL_TYPE_COIL]
),
vol.Optional(CONF_COMMAND_OFF, default=0x00): cv.positive_int,
vol.Optional(CONF_COMMAND_ON, default=0x01): cv.positive_int,
vol.Optional(CONF_STATE_OFF): cv.positive_int,
vol.Optional(CONF_STATE_ON): cv.positive_int,
vol.Optional(CONF_VERIFY_REGISTER): cv.positive_int,
}
)
SENSOR_SCHEMA = BASE_COMPONENT_SCHEMA.extend(
{
vol.Required(CONF_ADDRESS): cv.positive_int,
vol.Optional(CONF_COUNT, default=1): cv.positive_int,
vol.Optional(CONF_DATA_TYPE, default=DATA_TYPE_INT): vol.In(
[
DATA_TYPE_INT,
DATA_TYPE_UINT,
DATA_TYPE_FLOAT,
DATA_TYPE_STRING,
DATA_TYPE_CUSTOM,
]
),
vol.Optional(CONF_DEVICE_CLASS): SENSOR_DEVICE_CLASSES_SCHEMA,
vol.Optional(CONF_OFFSET, default=0): number,
vol.Optional(CONF_PRECISION, default=0): cv.positive_int,
vol.Optional(CONF_INPUT_TYPE, default=CALL_TYPE_REGISTER_HOLDING): vol.In(
[CALL_TYPE_REGISTER_HOLDING, CALL_TYPE_REGISTER_INPUT]
),
vol.Optional(CONF_REVERSE_ORDER, default=False): cv.boolean,
vol.Optional(CONF_SCALE, default=1): number,
vol.Optional(CONF_STRUCTURE): cv.string,
vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
}
)
BINARY_SENSOR_SCHEMA = BASE_COMPONENT_SCHEMA.extend(
{
vol.Required(CONF_ADDRESS): cv.positive_int,
vol.Optional(CONF_DEVICE_CLASS): BINARY_SENSOR_DEVICE_CLASSES_SCHEMA,
vol.Optional(CONF_INPUT_TYPE, default=CALL_TYPE_COIL): vol.In(
[CALL_TYPE_COIL, CALL_TYPE_DISCRETE]
),
}
)
MODBUS_SCHEMA = vol.Schema(
{
vol.Optional(CONF_NAME, default=DEFAULT_HUB): cv.string,
vol.Optional(CONF_TIMEOUT, default=3): cv.socket_timeout,
vol.Optional(CONF_DELAY, default=0): cv.positive_int,
vol.Optional(CONF_BINARY_SENSORS): vol.All(
cv.ensure_list, [BINARY_SENSOR_SCHEMA]
),
vol.Optional(CONF_CLIMATES): vol.All(cv.ensure_list, [CLIMATE_SCHEMA]),
vol.Optional(CONF_COVERS): vol.All(cv.ensure_list, [COVERS_SCHEMA]),
vol.Optional(CONF_SENSORS): vol.All(cv.ensure_list, [SENSOR_SCHEMA]),
vol.Optional(CONF_SWITCHES): vol.All(cv.ensure_list, [SWITCH_SCHEMA]),
}
)
SERIAL_SCHEMA = MODBUS_SCHEMA.extend(
{
vol.Required(CONF_TYPE): "serial",
vol.Required(CONF_BAUDRATE): cv.positive_int,
vol.Required(CONF_BYTESIZE): vol.Any(5, 6, 7, 8),
vol.Required(CONF_METHOD): vol.Any("rtu", "ascii"),
vol.Required(CONF_PORT): cv.string,
vol.Required(CONF_PARITY): vol.Any("E", "O", "N"),
vol.Required(CONF_STOPBITS): vol.Any(1, 2),
vol.Required(CONF_TYPE): "serial",
vol.Optional(CONF_TIMEOUT, default=3): cv.socket_timeout,
vol.Optional(CONF_CLIMATES): vol.All(cv.ensure_list, [CLIMATE_SCHEMA]),
vol.Optional(CONF_COVERS): vol.All(cv.ensure_list, [COVERS_SCHEMA]),
}
)
ETHERNET_SCHEMA = BASE_SCHEMA.extend(
ETHERNET_SCHEMA = MODBUS_SCHEMA.extend(
{
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PORT): cv.port,
vol.Required(CONF_TYPE): vol.Any("tcp", "udp", "rtuovertcp"),
vol.Optional(CONF_TIMEOUT, default=3): cv.socket_timeout,
vol.Optional(CONF_DELAY, default=0): cv.positive_int,
vol.Optional(CONF_CLIMATES): vol.All(cv.ensure_list, [CLIMATE_SCHEMA]),
vol.Optional(CONF_COVERS): vol.All(cv.ensure_list, [COVERS_SCHEMA]),
}
)
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.All(
cv.ensure_list,
[
vol.Any(SERIAL_SCHEMA, ETHERNET_SCHEMA),
],
),
},
extra=vol.ALLOW_EXTRA,
)
SERVICE_WRITE_REGISTER_SCHEMA = vol.Schema(
{
vol.Optional(ATTR_HUB, default=DEFAULT_HUB): cv.string,
@ -168,18 +284,6 @@ SERVICE_WRITE_COIL_SCHEMA = vol.Schema(
}
)
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.All(
cv.ensure_list,
[
vol.Any(SERIAL_SCHEMA, ETHERNET_SCHEMA),
],
),
},
extra=vol.ALLOW_EXTRA,
)
def setup(hass, config):
"""Set up Modbus component."""

View file

@ -1,6 +1,9 @@
"""Support for Modbus Coil and Discrete Input sensors."""
from __future__ import annotations
from datetime import timedelta
import logging
from pymodbus.exceptions import ConnectionException, ModbusException
from pymodbus.pdu import ExceptionResponse
import voluptuous as vol
@ -10,19 +13,37 @@ from homeassistant.components.binary_sensor import (
PLATFORM_SCHEMA,
BinarySensorEntity,
)
from homeassistant.const import CONF_ADDRESS, CONF_DEVICE_CLASS, CONF_NAME, CONF_SLAVE
from homeassistant.const import (
CONF_ADDRESS,
CONF_DEVICE_CLASS,
CONF_NAME,
CONF_SCAN_INTERVAL,
CONF_SLAVE,
)
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.typing import (
ConfigType,
DiscoveryInfoType,
HomeAssistantType,
)
from .const import (
CALL_TYPE_COIL,
CALL_TYPE_DISCRETE,
CONF_BINARY_SENSORS,
CONF_COILS,
CONF_HUB,
CONF_INPUT_TYPE,
CONF_INPUTS,
DEFAULT_HUB,
DEFAULT_SCAN_INTERVAL,
MODBUS_DOMAIN,
)
from .modbus import ModbusHub
_LOGGER = logging.getLogger(__name__)
PLATFORM_SCHEMA = vol.All(
cv.deprecated(CONF_COILS, CONF_INPUTS),
@ -50,11 +71,33 @@ PLATFORM_SCHEMA = vol.All(
)
def setup_platform(hass, config, add_entities, discovery_info=None):
async def async_setup_platform(
hass: HomeAssistantType,
config: ConfigType,
async_add_entities,
discovery_info: DiscoveryInfoType | None = None,
):
"""Set up the Modbus binary sensors."""
sensors = []
for entry in config[CONF_INPUTS]:
hub = hass.data[MODBUS_DOMAIN][entry[CONF_HUB]]
#  check for old config:
if discovery_info is None:
_LOGGER.warning(
"Binary_sensor configuration is deprecated, will be removed in a future release"
)
discovery_info = {
CONF_NAME: "no name",
CONF_BINARY_SENSORS: config[CONF_INPUTS],
}
config = None
for entry in discovery_info[CONF_BINARY_SENSORS]:
if CONF_HUB in entry:
# from old config!
discovery_info[CONF_NAME] = entry[CONF_HUB]
if CONF_SCAN_INTERVAL not in entry:
entry[CONF_SCAN_INTERVAL] = DEFAULT_SCAN_INTERVAL
hub: ModbusHub = hass.data[MODBUS_DOMAIN][discovery_info[CONF_NAME]]
sensors.append(
ModbusBinarySensor(
hub,
@ -63,16 +106,19 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
entry[CONF_ADDRESS],
entry.get(CONF_DEVICE_CLASS),
entry[CONF_INPUT_TYPE],
entry[CONF_SCAN_INTERVAL],
)
)
add_entities(sensors)
async_add_entities(sensors)
class ModbusBinarySensor(BinarySensorEntity):
"""Modbus binary sensor."""
def __init__(self, hub, name, slave, address, device_class, input_type):
def __init__(
self, hub, name, slave, address, device_class, input_type, scan_interval
):
"""Initialize the Modbus binary sensor."""
self._hub = hub
self._name = name
@ -82,6 +128,13 @@ class ModbusBinarySensor(BinarySensorEntity):
self._input_type = input_type
self._value = None
self._available = True
self._scan_interval = timedelta(seconds=scan_interval)
async def async_added_to_hass(self):
"""Handle entity which will be added."""
async_track_time_interval(
self.hass, lambda arg: self._update(), self._scan_interval
)
@property
def name(self):
@ -98,12 +151,22 @@ class ModbusBinarySensor(BinarySensorEntity):
"""Return the device class of the sensor."""
return self._device_class
@property
def should_poll(self):
"""Return True if entity has to be polled for state.
False if entity pushes its state to HA.
"""
# Handle polling directly in this entity
return False
@property
def available(self) -> bool:
"""Return True if entity is available."""
return self._available
def update(self):
def _update(self):
"""Update the state of the sensor."""
try:
if self._input_type == CALL_TYPE_COIL:
@ -120,3 +183,4 @@ class ModbusBinarySensor(BinarySensorEntity):
self._value = result.bits[0] & 1
self._available = True
self.schedule_update_ha_state()

View file

@ -115,7 +115,7 @@ class ModbusThermostat(ClimateEntity):
"""Initialize the modbus thermostat."""
self._hub: ModbusHub = hub
self._name = config[CONF_NAME]
self._slave = config[CONF_SLAVE]
self._slave = config.get(CONF_SLAVE)
self._target_temperature_register = config[CONF_TARGET_TEMP]
self._current_temperature_register = config[CONF_CURRENT_TEMP]
self._current_temperature_register_type = config[

View file

@ -50,6 +50,8 @@ DEFAULT_SCAN_INTERVAL = 15 # seconds
# binary_sensor.py
CONF_INPUTS = "inputs"
CONF_INPUT_TYPE = "input_type"
CONF_BINARY_SENSORS = "binary_sensors"
CONF_BINARY_SENSOR = "binary_sensor"
# sensor.py
# CONF_DATA_TYPE = "data_type"
@ -58,12 +60,16 @@ DEFAULT_STRUCT_FORMAT = {
DATA_TYPE_UINT: {1: "H", 2: "I", 4: "Q"},
DATA_TYPE_FLOAT: {1: "e", 2: "f", 4: "d"},
}
CONF_SENSOR = "sensor"
CONF_SENSORS = "sensors"
# switch.py
CONF_STATE_OFF = "state_off"
CONF_STATE_ON = "state_on"
CONF_VERIFY_REGISTER = "verify_register"
CONF_VERIFY_STATE = "verify_state"
CONF_SWITCH = "switch"
CONF_SWITCHES = "switches"
# climate.py
CONF_CLIMATES = "climates"

View file

@ -71,7 +71,7 @@ class ModbusCover(CoverEntity, RestoreEntity):
self._device_class = config.get(CONF_DEVICE_CLASS)
self._name = config[CONF_NAME]
self._register = config.get(CONF_REGISTER)
self._slave = config[CONF_SLAVE]
self._slave = config.get(CONF_SLAVE)
self._state_closed = config[CONF_STATE_CLOSED]
self._state_closing = config[CONF_STATE_CLOSING]
self._state_open = config[CONF_STATE_OPEN]

View file

@ -25,12 +25,18 @@ from .const import (
ATTR_UNIT,
ATTR_VALUE,
CONF_BAUDRATE,
CONF_BINARY_SENSOR,
CONF_BINARY_SENSORS,
CONF_BYTESIZE,
CONF_CLIMATE,
CONF_CLIMATES,
CONF_COVER,
CONF_PARITY,
CONF_SENSOR,
CONF_SENSORS,
CONF_STOPBITS,
CONF_SWITCH,
CONF_SWITCHES,
MODBUS_DOMAIN as DOMAIN,
SERVICE_WRITE_COIL,
SERVICE_WRITE_REGISTER,
@ -56,6 +62,9 @@ def modbus_setup(
for component, conf_key in (
(CONF_CLIMATE, CONF_CLIMATES),
(CONF_COVER, CONF_COVERS),
(CONF_BINARY_SENSOR, CONF_BINARY_SENSORS),
(CONF_SENSOR, CONF_SENSORS),
(CONF_SWITCH, CONF_SWITCHES),
):
if conf_key in conf_hub:
load_platform(hass, component, DOMAIN, conf_hub, config)

View file

@ -1,6 +1,7 @@
"""Support for Modbus Register sensors."""
from __future__ import annotations
from datetime import timedelta
import logging
import struct
from typing import Any
@ -15,15 +16,23 @@ from homeassistant.components.sensor import (
SensorEntity,
)
from homeassistant.const import (
CONF_ADDRESS,
CONF_DEVICE_CLASS,
CONF_NAME,
CONF_OFFSET,
CONF_SCAN_INTERVAL,
CONF_SLAVE,
CONF_STRUCTURE,
CONF_UNIT_OF_MEASUREMENT,
)
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.restore_state import RestoreEntity
from homeassistant.helpers.typing import (
ConfigType,
DiscoveryInfoType,
HomeAssistantType,
)
from .const import (
CALL_TYPE_REGISTER_HOLDING,
@ -31,21 +40,25 @@ from .const import (
CONF_COUNT,
CONF_DATA_TYPE,
CONF_HUB,
CONF_INPUT_TYPE,
CONF_PRECISION,
CONF_REGISTER,
CONF_REGISTER_TYPE,
CONF_REGISTERS,
CONF_REVERSE_ORDER,
CONF_SCALE,
CONF_SENSORS,
DATA_TYPE_CUSTOM,
DATA_TYPE_FLOAT,
DATA_TYPE_INT,
DATA_TYPE_STRING,
DATA_TYPE_UINT,
DEFAULT_HUB,
DEFAULT_SCAN_INTERVAL,
DEFAULT_STRUCT_FORMAT,
MODBUS_DOMAIN,
)
from .modbus import ModbusHub
_LOGGER = logging.getLogger(__name__)
@ -103,63 +116,89 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
)
def setup_platform(hass, config, add_entities, discovery_info=None):
async def async_setup_platform(
hass: HomeAssistantType,
config: ConfigType,
async_add_entities,
discovery_info: DiscoveryInfoType | None = None,
):
"""Set up the Modbus sensors."""
sensors = []
for register in config[CONF_REGISTERS]:
if register[CONF_DATA_TYPE] == DATA_TYPE_STRING:
structure = str(register[CONF_COUNT] * 2) + "s"
elif register[CONF_DATA_TYPE] != DATA_TYPE_CUSTOM:
#  check for old config:
if discovery_info is None:
_LOGGER.warning(
"Sensor configuration is deprecated, will be removed in a future release"
)
discovery_info = {
CONF_NAME: "no name",
CONF_SENSORS: config[CONF_REGISTERS],
}
for entry in discovery_info[CONF_SENSORS]:
entry[CONF_ADDRESS] = entry[CONF_REGISTER]
entry[CONF_INPUT_TYPE] = entry[CONF_REGISTER_TYPE]
del entry[CONF_REGISTER]
del entry[CONF_REGISTER_TYPE]
config = None
for entry in discovery_info[CONF_SENSORS]:
if entry[CONF_DATA_TYPE] == DATA_TYPE_STRING:
structure = str(entry[CONF_COUNT] * 2) + "s"
elif entry[CONF_DATA_TYPE] != DATA_TYPE_CUSTOM:
try:
structure = f">{DEFAULT_STRUCT_FORMAT[register[CONF_DATA_TYPE]][register[CONF_COUNT]]}"
structure = f">{DEFAULT_STRUCT_FORMAT[entry[CONF_DATA_TYPE]][entry[CONF_COUNT]]}"
except KeyError:
_LOGGER.error(
"Unable to detect data type for %s sensor, try a custom type",
register[CONF_NAME],
entry[CONF_NAME],
)
continue
else:
structure = register.get(CONF_STRUCTURE)
structure = entry.get(CONF_STRUCTURE)
try:
size = struct.calcsize(structure)
except struct.error as err:
_LOGGER.error("Error in sensor %s structure: %s", register[CONF_NAME], err)
_LOGGER.error("Error in sensor %s structure: %s", entry[CONF_NAME], err)
continue
if register[CONF_COUNT] * 2 != size:
if entry[CONF_COUNT] * 2 != size:
_LOGGER.error(
"Structure size (%d bytes) mismatch registers count (%d words)",
size,
register[CONF_COUNT],
entry[CONF_COUNT],
)
continue
hub_name = register[CONF_HUB]
hub = hass.data[MODBUS_DOMAIN][hub_name]
if CONF_HUB in entry:
# from old config!
discovery_info[CONF_NAME] = entry[CONF_HUB]
if CONF_SCAN_INTERVAL not in entry:
entry[CONF_SCAN_INTERVAL] = DEFAULT_SCAN_INTERVAL
hub: ModbusHub = hass.data[MODBUS_DOMAIN][discovery_info[CONF_NAME]]
sensors.append(
ModbusRegisterSensor(
hub,
register[CONF_NAME],
register.get(CONF_SLAVE),
register[CONF_REGISTER],
register[CONF_REGISTER_TYPE],
register.get(CONF_UNIT_OF_MEASUREMENT),
register[CONF_COUNT],
register[CONF_REVERSE_ORDER],
register[CONF_SCALE],
register[CONF_OFFSET],
entry[CONF_NAME],
entry.get(CONF_SLAVE),
entry[CONF_ADDRESS],
entry[CONF_INPUT_TYPE],
entry.get(CONF_UNIT_OF_MEASUREMENT),
entry[CONF_COUNT],
entry[CONF_REVERSE_ORDER],
entry[CONF_SCALE],
entry[CONF_OFFSET],
structure,
register[CONF_PRECISION],
register[CONF_DATA_TYPE],
register.get(CONF_DEVICE_CLASS),
entry[CONF_PRECISION],
entry[CONF_DATA_TYPE],
entry.get(CONF_DEVICE_CLASS),
entry[CONF_SCAN_INTERVAL],
)
)
if not sensors:
return False
add_entities(sensors)
return
async_add_entities(sensors)
class ModbusRegisterSensor(RestoreEntity, SensorEntity):
@ -181,6 +220,7 @@ class ModbusRegisterSensor(RestoreEntity, SensorEntity):
precision,
data_type,
device_class,
scan_interval,
):
"""Initialize the modbus register sensor."""
self._hub = hub
@ -199,13 +239,17 @@ class ModbusRegisterSensor(RestoreEntity, SensorEntity):
self._device_class = device_class
self._value = None
self._available = True
self._scan_interval = timedelta(seconds=scan_interval)
async def async_added_to_hass(self):
"""Handle entity which will be added."""
state = await self.async_get_last_state()
if not state:
return
self._value = state.state
if state:
self._value = state.state
async_track_time_interval(
self.hass, lambda arg: self._update(), self._scan_interval
)
@property
def state(self):
@ -217,6 +261,16 @@ class ModbusRegisterSensor(RestoreEntity, SensorEntity):
"""Return the name of the sensor."""
return self._name
@property
def should_poll(self):
"""Return True if entity has to be polled for state.
False if entity pushes its state to HA.
"""
# Handle polling directly in this entity
return False
@property
def unit_of_measurement(self):
"""Return the unit of measurement."""
@ -232,7 +286,7 @@ class ModbusRegisterSensor(RestoreEntity, SensorEntity):
"""Return True if entity is available."""
return self._available
def update(self):
def _update(self):
"""Update the state of the sensor."""
try:
if self._register_type == CALL_TYPE_REGISTER_INPUT:
@ -285,3 +339,4 @@ class ModbusRegisterSensor(RestoreEntity, SensorEntity):
self._value = str(val)
self._available = True
self.schedule_update_ha_state()

View file

@ -1,7 +1,8 @@
"""Support for Modbus switches."""
from __future__ import annotations
from abc import ABC
from abc import ABC, abstractmethod
from datetime import timedelta
import logging
from typing import Any
@ -11,14 +12,17 @@ import voluptuous as vol
from homeassistant.components.switch import PLATFORM_SCHEMA, SwitchEntity
from homeassistant.const import (
CONF_ADDRESS,
CONF_COMMAND_OFF,
CONF_COMMAND_ON,
CONF_NAME,
CONF_SCAN_INTERVAL,
CONF_SLAVE,
CONF_SWITCHES,
STATE_ON,
)
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity import ToggleEntity
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.restore_state import RestoreEntity
from homeassistant.helpers.typing import ConfigType, HomeAssistantType
@ -28,6 +32,7 @@ from .const import (
CALL_TYPE_REGISTER_INPUT,
CONF_COILS,
CONF_HUB,
CONF_INPUT_TYPE,
CONF_REGISTER,
CONF_REGISTER_TYPE,
CONF_REGISTERS,
@ -36,6 +41,7 @@ from .const import (
CONF_VERIFY_REGISTER,
CONF_VERIFY_STATE,
DEFAULT_HUB,
DEFAULT_SCAN_INTERVAL,
MODBUS_DOMAIN,
)
from .modbus import ModbusHub
@ -86,19 +92,48 @@ async def async_setup_platform(
):
"""Read configuration and create Modbus switches."""
switches = []
if CONF_COILS in config:
for coil in config[CONF_COILS]:
hub: ModbusHub = hass.data[MODBUS_DOMAIN][coil[CONF_HUB]]
switches.append(ModbusCoilSwitch(hub, coil))
if CONF_REGISTERS in config:
for register in config[CONF_REGISTERS]:
hub: ModbusHub = hass.data[MODBUS_DOMAIN][register[CONF_HUB]]
switches.append(ModbusRegisterSwitch(hub, register))
#  check for old config:
if discovery_info is None:
_LOGGER.warning(
"Switch configuration is deprecated, will be removed in a future release"
)
discovery_info = {
CONF_NAME: "no name",
CONF_SWITCHES: [],
}
if CONF_COILS in config:
discovery_info[CONF_SWITCHES].extend(config[CONF_COILS])
if CONF_REGISTERS in config:
discovery_info[CONF_SWITCHES].extend(config[CONF_REGISTERS])
for entry in discovery_info[CONF_SWITCHES]:
if CALL_TYPE_COIL in entry:
entry[CONF_ADDRESS] = entry[CALL_TYPE_COIL]
entry[CONF_INPUT_TYPE] = CALL_TYPE_COIL
del entry[CALL_TYPE_COIL]
if CONF_REGISTER in entry:
entry[CONF_ADDRESS] = entry[CONF_REGISTER]
del entry[CONF_REGISTER]
if CONF_REGISTER_TYPE in entry:
entry[CONF_INPUT_TYPE] = entry[CONF_REGISTER_TYPE]
del entry[CONF_REGISTER_TYPE]
if CONF_SCAN_INTERVAL not in entry:
entry[CONF_SCAN_INTERVAL] = DEFAULT_SCAN_INTERVAL
config = None
for entry in discovery_info[CONF_SWITCHES]:
if CONF_HUB in entry:
# from old config!
discovery_info[CONF_NAME] = entry[CONF_HUB]
hub: ModbusHub = hass.data[MODBUS_DOMAIN][discovery_info[CONF_NAME]]
if entry[CONF_INPUT_TYPE] == CALL_TYPE_COIL:
switches.append(ModbusCoilSwitch(hub, entry))
else:
switches.append(ModbusRegisterSwitch(hub, entry))
async_add_entities(switches)
class ModbusBaseSwitch(ToggleEntity, RestoreEntity, ABC):
class ModbusBaseSwitch(SwitchEntity, RestoreEntity, ABC):
"""Base class representing a Modbus switch."""
def __init__(self, hub: ModbusHub, config: dict[str, Any]):
@ -108,13 +143,21 @@ class ModbusBaseSwitch(ToggleEntity, RestoreEntity, ABC):
self._slave = config.get(CONF_SLAVE)
self._is_on = None
self._available = True
self._scan_interval = timedelta(seconds=config[CONF_SCAN_INTERVAL])
async def async_added_to_hass(self):
"""Handle entity which will be added."""
state = await self.async_get_last_state()
if not state:
return
self._is_on = state.state == STATE_ON
if state:
self._is_on = state.state == STATE_ON
async_track_time_interval(
self.hass, lambda arg: self._update(), self._scan_interval
)
@abstractmethod
def _update(self):
"""Update the entity state."""
@property
def is_on(self):
@ -126,6 +169,16 @@ class ModbusBaseSwitch(ToggleEntity, RestoreEntity, ABC):
"""Return the name of the switch."""
return self._name
@property
def should_poll(self):
"""Return True if entity has to be polled for state.
False if entity pushes its state to HA.
"""
# Handle polling directly in this entity
return False
@property
def available(self) -> bool:
"""Return True if entity is available."""
@ -138,21 +191,24 @@ class ModbusCoilSwitch(ModbusBaseSwitch, SwitchEntity):
def __init__(self, hub: ModbusHub, config: dict[str, Any]):
"""Initialize the coil switch."""
super().__init__(hub, config)
self._coil = config[CALL_TYPE_COIL]
self._coil = config[CONF_ADDRESS]
def turn_on(self, **kwargs):
"""Set switch on."""
self._write_coil(self._coil, True)
self._is_on = True
self.schedule_update_ha_state()
def turn_off(self, **kwargs):
"""Set switch off."""
self._write_coil(self._coil, False)
self._is_on = False
self.schedule_update_ha_state()
def update(self):
def _update(self):
"""Update the state of the switch."""
self._is_on = self._read_coil(self._coil)
self.schedule_update_ha_state()
def _read_coil(self, coil) -> bool:
"""Read coil using the Modbus hub slave."""
@ -189,14 +245,14 @@ class ModbusRegisterSwitch(ModbusBaseSwitch, SwitchEntity):
def __init__(self, hub: ModbusHub, config: dict[str, Any]):
"""Initialize the register switch."""
super().__init__(hub, config)
self._register = config[CONF_REGISTER]
self._register = config[CONF_ADDRESS]
self._command_on = config[CONF_COMMAND_ON]
self._command_off = config[CONF_COMMAND_OFF]
self._state_on = config.get(CONF_STATE_ON, self._command_on)
self._state_off = config.get(CONF_STATE_OFF, self._command_off)
self._verify_state = config[CONF_VERIFY_STATE]
self._verify_register = config.get(CONF_VERIFY_REGISTER, self._register)
self._register_type = config[CONF_REGISTER_TYPE]
self._register_type = config[CONF_INPUT_TYPE]
self._available = True
self._is_on = None
@ -207,6 +263,7 @@ class ModbusRegisterSwitch(ModbusBaseSwitch, SwitchEntity):
self._write_register(self._command_on)
if not self._verify_state:
self._is_on = True
self.schedule_update_ha_state()
def turn_off(self, **kwargs):
"""Set switch off."""
@ -215,13 +272,14 @@ class ModbusRegisterSwitch(ModbusBaseSwitch, SwitchEntity):
self._write_register(self._command_off)
if not self._verify_state:
self._is_on = False
self.schedule_update_ha_state()
@property
def available(self) -> bool:
"""Return True if entity is available."""
return self._available
def update(self):
def _update(self):
"""Update the state of the switch."""
if not self._verify_state:
return
@ -239,6 +297,7 @@ class ModbusRegisterSwitch(ModbusBaseSwitch, SwitchEntity):
self._register,
value,
)
self.schedule_update_ha_state()
def _read_register(self) -> int | None:
try:

View file

@ -146,4 +146,5 @@ async def base_config_test(
None,
method_discovery=method_discovery,
check_config_only=True,
config_modbus=config_modbus,
)

View file

@ -0,0 +1,30 @@
"""The tests for the Modbus init."""
import pytest
import voluptuous as vol
from homeassistant.components.modbus import number
async def test_number_validator():
"""Test number validator."""
# positive tests
value = number(15)
assert isinstance(value, int)
value = number(15.1)
assert isinstance(value, float)
value = number("15")
assert isinstance(value, int)
value = number("15.1")
assert isinstance(value, float)
# exception test
try:
value = number("x15.1")
except (vol.Invalid):
return
pytest.fail("Number not throwing exception")

View file

@ -0,0 +1,70 @@
"""The tests for the Modbus sensor component."""
import pytest
from homeassistant.components.modbus.const import (
CONF_BAUDRATE,
CONF_BYTESIZE,
CONF_PARITY,
CONF_STOPBITS,
MODBUS_DOMAIN as DOMAIN,
)
from homeassistant.const import (
CONF_DELAY,
CONF_HOST,
CONF_METHOD,
CONF_NAME,
CONF_PORT,
CONF_TIMEOUT,
CONF_TYPE,
)
from .conftest import base_config_test
@pytest.mark.parametrize("do_discovery", [False, True])
@pytest.mark.parametrize(
"do_options",
[
{},
{
CONF_NAME: "modbusTest",
CONF_TIMEOUT: 30,
CONF_DELAY: 10,
},
],
)
@pytest.mark.parametrize(
"do_config",
[
{
CONF_TYPE: "tcp",
CONF_HOST: "modbusTestHost",
CONF_PORT: 5501,
},
{
CONF_TYPE: "serial",
CONF_BAUDRATE: 9600,
CONF_BYTESIZE: 8,
CONF_METHOD: "rtu",
CONF_PORT: "usb01",
CONF_PARITY: "E",
CONF_STOPBITS: 1,
},
],
)
async def test_config_modbus(hass, do_discovery, do_options, do_config):
"""Run test for modbus."""
config = {
DOMAIN: do_config,
}
config.update(do_options)
await base_config_test(
hass,
None,
"",
DOMAIN,
None,
None,
method_discovery=do_discovery,
config_modbus=config,
)

View file

@ -5,16 +5,25 @@ from homeassistant.components.binary_sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.components.modbus.const import (
CALL_TYPE_COIL,
CALL_TYPE_DISCRETE,
CONF_BINARY_SENSORS,
CONF_INPUT_TYPE,
CONF_INPUTS,
)
from homeassistant.const import CONF_ADDRESS, CONF_NAME, CONF_SLAVE, STATE_OFF, STATE_ON
from homeassistant.const import (
CONF_ADDRESS,
CONF_DEVICE_CLASS,
CONF_NAME,
CONF_SLAVE,
STATE_OFF,
STATE_ON,
)
from .conftest import base_config_test, base_test
@pytest.mark.parametrize("do_discovery", [False, True])
@pytest.mark.parametrize("do_options", [False, True])
async def test_config_binary_sensor(hass, do_options):
async def test_config_binary_sensor(hass, do_discovery, do_options):
"""Run test for binary sensor."""
sensor_name = "test_sensor"
config_sensor = {
@ -26,6 +35,7 @@ async def test_config_binary_sensor(hass, do_options):
{
CONF_SLAVE: 10,
CONF_INPUT_TYPE: CALL_TYPE_DISCRETE,
CONF_DEVICE_CLASS: "door",
}
)
await base_config_test(
@ -33,9 +43,9 @@ async def test_config_binary_sensor(hass, do_options):
config_sensor,
sensor_name,
SENSOR_DOMAIN,
None,
CONF_BINARY_SENSORS,
CONF_INPUTS,
method_discovery=False,
method_discovery=do_discovery,
)
@ -73,11 +83,11 @@ async def test_all_binary_sensor(hass, do_type, regs, expected):
{CONF_NAME: sensor_name, CONF_ADDRESS: 1234, CONF_INPUT_TYPE: do_type},
sensor_name,
SENSOR_DOMAIN,
None,
CONF_BINARY_SENSORS,
CONF_INPUTS,
regs,
expected,
method_discovery=False,
method_discovery=True,
scan_interval=5,
)
assert state == expected

View file

@ -6,30 +6,42 @@ from homeassistant.components.modbus.const import (
CALL_TYPE_REGISTER_INPUT,
CONF_COUNT,
CONF_DATA_TYPE,
CONF_INPUT_TYPE,
CONF_PRECISION,
CONF_REGISTER,
CONF_REGISTER_TYPE,
CONF_REGISTERS,
CONF_REVERSE_ORDER,
CONF_SCALE,
CONF_SENSORS,
DATA_TYPE_FLOAT,
DATA_TYPE_INT,
DATA_TYPE_STRING,
DATA_TYPE_UINT,
)
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.const import CONF_NAME, CONF_OFFSET, CONF_SLAVE
from homeassistant.const import (
CONF_ADDRESS,
CONF_DEVICE_CLASS,
CONF_NAME,
CONF_OFFSET,
CONF_SLAVE,
)
from .conftest import base_config_test, base_test
@pytest.mark.parametrize("do_discovery", [False, True])
@pytest.mark.parametrize("do_options", [False, True])
async def test_config_sensor(hass, do_options):
@pytest.mark.parametrize(
"do_type", [CALL_TYPE_REGISTER_HOLDING, CALL_TYPE_REGISTER_INPUT]
)
async def test_config_sensor(hass, do_discovery, do_options, do_type):
"""Run test for sensor."""
sensor_name = "test_sensor"
config_sensor = {
CONF_NAME: sensor_name,
CONF_REGISTER: 51,
CONF_ADDRESS: 51,
}
if do_options:
config_sensor.update(
@ -41,17 +53,25 @@ async def test_config_sensor(hass, do_options):
CONF_SCALE: 1,
CONF_REVERSE_ORDER: False,
CONF_OFFSET: 0,
CONF_REGISTER_TYPE: CALL_TYPE_REGISTER_HOLDING,
CONF_INPUT_TYPE: do_type,
CONF_DEVICE_CLASS: "battery",
}
)
if not do_discovery:
# bridge difference in configuration
config_sensor[CONF_REGISTER] = config_sensor[CONF_ADDRESS]
del config_sensor[CONF_ADDRESS]
if do_options:
config_sensor[CONF_REGISTER_TYPE] = config_sensor[CONF_INPUT_TYPE]
del config_sensor[CONF_INPUT_TYPE]
await base_config_test(
hass,
config_sensor,
sensor_name,
SENSOR_DOMAIN,
None,
CONF_SENSORS,
CONF_REGISTERS,
method_discovery=False,
method_discovery=do_discovery,
)
@ -218,7 +238,7 @@ async def test_config_sensor(hass, do_options):
(
{
CONF_COUNT: 2,
CONF_REGISTER_TYPE: CALL_TYPE_REGISTER_INPUT,
CONF_INPUT_TYPE: CALL_TYPE_REGISTER_INPUT,
CONF_DATA_TYPE: DATA_TYPE_UINT,
CONF_SCALE: 1,
CONF_OFFSET: 0,
@ -230,7 +250,7 @@ async def test_config_sensor(hass, do_options):
(
{
CONF_COUNT: 2,
CONF_REGISTER_TYPE: CALL_TYPE_REGISTER_HOLDING,
CONF_INPUT_TYPE: CALL_TYPE_REGISTER_HOLDING,
CONF_DATA_TYPE: DATA_TYPE_UINT,
CONF_SCALE: 1,
CONF_OFFSET: 0,
@ -242,7 +262,7 @@ async def test_config_sensor(hass, do_options):
(
{
CONF_COUNT: 2,
CONF_REGISTER_TYPE: CALL_TYPE_REGISTER_HOLDING,
CONF_INPUT_TYPE: CALL_TYPE_REGISTER_HOLDING,
CONF_DATA_TYPE: DATA_TYPE_FLOAT,
CONF_SCALE: 1,
CONF_OFFSET: 0,
@ -254,7 +274,7 @@ async def test_config_sensor(hass, do_options):
(
{
CONF_COUNT: 8,
CONF_REGISTER_TYPE: CALL_TYPE_REGISTER_HOLDING,
CONF_INPUT_TYPE: CALL_TYPE_REGISTER_HOLDING,
CONF_DATA_TYPE: DATA_TYPE_STRING,
CONF_SCALE: 1,
CONF_OFFSET: 0,
@ -270,14 +290,14 @@ async def test_all_sensor(hass, cfg, regs, expected):
sensor_name = "modbus_test_sensor"
state = await base_test(
hass,
{CONF_NAME: sensor_name, CONF_REGISTER: 1234, **cfg},
{CONF_NAME: sensor_name, CONF_ADDRESS: 1234, **cfg},
sensor_name,
SENSOR_DOMAIN,
None,
CONF_SENSORS,
CONF_REGISTERS,
regs,
expected,
method_discovery=False,
method_discovery=True,
scan_interval=5,
)
assert state == expected

View file

@ -3,14 +3,25 @@ import pytest
from homeassistant.components.modbus.const import (
CALL_TYPE_COIL,
CALL_TYPE_REGISTER_HOLDING,
CALL_TYPE_REGISTER_INPUT,
CONF_COILS,
CONF_INPUT_TYPE,
CONF_REGISTER,
CONF_REGISTER_TYPE,
CONF_REGISTERS,
CONF_STATE_OFF,
CONF_STATE_ON,
CONF_SWITCHES,
CONF_VERIFY_REGISTER,
CONF_VERIFY_STATE,
)
from homeassistant.components.switch import DOMAIN as SWITCH_DOMAIN
from homeassistant.const import (
CONF_ADDRESS,
CONF_COMMAND_OFF,
CONF_COMMAND_ON,
CONF_DEVICE_CLASS,
CONF_NAME,
CONF_SLAVE,
STATE_OFF,
@ -20,39 +31,66 @@ from homeassistant.const import (
from .conftest import base_config_test, base_test
@pytest.mark.parametrize("do_discovery", [False, True])
@pytest.mark.parametrize("do_options", [False, True])
@pytest.mark.parametrize("read_type", [CALL_TYPE_COIL, CONF_REGISTER])
async def test_config_switch(hass, do_options, read_type):
@pytest.mark.parametrize(
"read_type", [CALL_TYPE_REGISTER_HOLDING, CALL_TYPE_REGISTER_INPUT, CALL_TYPE_COIL]
)
async def test_config_switch(hass, do_discovery, do_options, read_type):
"""Run test for switch."""
device_name = "test_switch"
if read_type == CONF_REGISTER:
device_config = {
CONF_NAME: device_name,
CONF_REGISTER: 1234,
CONF_SLAVE: 1,
CONF_COMMAND_OFF: 0x00,
CONF_COMMAND_ON: 0x01,
}
array_type = CONF_REGISTERS
device_config = {
CONF_NAME: device_name,
}
if not do_discovery:
if read_type == CALL_TYPE_COIL:
array_type = CONF_COILS
device_config[CALL_TYPE_COIL] = 1234
device_config[CONF_SLAVE] = 1
else:
array_type = CONF_REGISTERS
device_config[CONF_REGISTER] = 1234
device_config[CONF_COMMAND_OFF] = 0x00
device_config[CONF_COMMAND_ON] = 0x01
else:
device_config = {
CONF_NAME: device_name,
read_type: 1234,
CONF_SLAVE: 10,
}
array_type = CONF_COILS
array_type = None
device_config[CONF_ADDRESS] = 1234
if read_type == CALL_TYPE_COIL:
device_config[CONF_INPUT_TYPE] = CALL_TYPE_COIL
if do_options:
device_config.update({})
device_config[CONF_SLAVE] = 1
if read_type != CALL_TYPE_COIL:
device_config.update(
{
CONF_STATE_OFF: 0,
CONF_STATE_ON: 1,
CONF_VERIFY_REGISTER: 1235,
CONF_COMMAND_OFF: 0x00,
CONF_COMMAND_ON: 0x01,
}
)
if do_discovery:
device_config.update(
{
CONF_DEVICE_CLASS: "switch",
CONF_INPUT_TYPE: read_type,
}
)
else:
if read_type != CALL_TYPE_COIL:
device_config[CONF_VERIFY_STATE] = True
device_config[CONF_REGISTER_TYPE] = read_type
await base_config_test(
hass,
device_config,
device_name,
SWITCH_DOMAIN,
None,
CONF_SWITCHES,
array_type,
method_discovery=False,
method_discovery=do_discovery,
)
@ -88,16 +126,16 @@ async def test_coil_switch(hass, regs, expected):
hass,
{
CONF_NAME: switch_name,
CALL_TYPE_COIL: 1234,
CONF_SLAVE: 1,
CONF_ADDRESS: 1234,
CONF_INPUT_TYPE: CALL_TYPE_COIL,
},
switch_name,
SWITCH_DOMAIN,
None,
CONF_SWITCHES,
CONF_COILS,
regs,
expected,
method_discovery=False,
method_discovery=True,
scan_interval=5,
)
assert state == expected
@ -142,7 +180,7 @@ async def test_register_switch(hass, regs, expected):
},
switch_name,
SWITCH_DOMAIN,
None,
CONF_SWITCHES,
CONF_REGISTERS,
regs,
expected,
@ -183,7 +221,7 @@ async def test_register_state_switch(hass, regs, expected):
},
switch_name,
SWITCH_DOMAIN,
None,
CONF_SWITCHES,
CONF_REGISTERS,
regs,
expected,