"""Support for Modbus Register sensors.""" from __future__ import annotations import logging import struct from typing import Any from pymodbus.exceptions import ConnectionException, ModbusException from pymodbus.pdu import ExceptionResponse import voluptuous as vol from homeassistant.components.sensor import DEVICE_CLASSES_SCHEMA, PLATFORM_SCHEMA from homeassistant.const import ( CONF_DEVICE_CLASS, CONF_NAME, CONF_OFFSET, CONF_SLAVE, CONF_STRUCTURE, CONF_UNIT_OF_MEASUREMENT, ) from homeassistant.helpers import config_validation as cv from homeassistant.helpers.restore_state import RestoreEntity from .const import ( CALL_TYPE_REGISTER_HOLDING, CALL_TYPE_REGISTER_INPUT, CONF_COUNT, CONF_DATA_TYPE, CONF_HUB, CONF_PRECISION, CONF_REGISTER, CONF_REGISTER_TYPE, CONF_REGISTERS, CONF_REVERSE_ORDER, CONF_SCALE, DATA_TYPE_CUSTOM, DATA_TYPE_FLOAT, DATA_TYPE_INT, DATA_TYPE_STRING, DATA_TYPE_UINT, DEFAULT_HUB, DEFAULT_STRUCT_FORMAT, MODBUS_DOMAIN, ) _LOGGER = logging.getLogger(__name__) def number(value: Any) -> int | float: """Coerce a value to number without losing precision.""" if isinstance(value, int): return value if isinstance(value, str): try: value = int(value) return value except (TypeError, ValueError): pass try: value = float(value) return value except (TypeError, ValueError) as err: raise vol.Invalid(f"invalid number {value}") from err PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( { vol.Required(CONF_REGISTERS): [ { vol.Required(CONF_NAME): cv.string, vol.Required(CONF_REGISTER): cv.positive_int, vol.Optional(CONF_COUNT, default=1): cv.positive_int, vol.Optional(CONF_DATA_TYPE, default=DATA_TYPE_INT): vol.In( [ DATA_TYPE_INT, DATA_TYPE_UINT, DATA_TYPE_FLOAT, DATA_TYPE_STRING, DATA_TYPE_CUSTOM, ] ), vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA, vol.Optional(CONF_HUB, default=DEFAULT_HUB): cv.string, vol.Optional(CONF_OFFSET, default=0): number, vol.Optional(CONF_PRECISION, default=0): cv.positive_int, vol.Optional( CONF_REGISTER_TYPE, default=CALL_TYPE_REGISTER_HOLDING ): vol.In([CALL_TYPE_REGISTER_HOLDING, CALL_TYPE_REGISTER_INPUT]), vol.Optional(CONF_REVERSE_ORDER, default=False): cv.boolean, vol.Optional(CONF_SCALE, default=1): number, vol.Optional(CONF_SLAVE): cv.positive_int, vol.Optional(CONF_STRUCTURE): cv.string, vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string, } ] } ) def setup_platform(hass, config, add_entities, discovery_info=None): """Set up the Modbus sensors.""" sensors = [] for register in config[CONF_REGISTERS]: if register[CONF_DATA_TYPE] == DATA_TYPE_STRING: structure = str(register[CONF_COUNT] * 2) + "s" elif register[CONF_DATA_TYPE] != DATA_TYPE_CUSTOM: try: structure = f">{DEFAULT_STRUCT_FORMAT[register[CONF_DATA_TYPE]][register[CONF_COUNT]]}" except KeyError: _LOGGER.error( "Unable to detect data type for %s sensor, try a custom type", register[CONF_NAME], ) continue else: structure = register.get(CONF_STRUCTURE) try: size = struct.calcsize(structure) except struct.error as err: _LOGGER.error("Error in sensor %s structure: %s", register[CONF_NAME], err) continue if register[CONF_COUNT] * 2 != size: _LOGGER.error( "Structure size (%d bytes) mismatch registers count (%d words)", size, register[CONF_COUNT], ) continue hub_name = register[CONF_HUB] hub = hass.data[MODBUS_DOMAIN][hub_name] sensors.append( ModbusRegisterSensor( hub, register[CONF_NAME], register.get(CONF_SLAVE), register[CONF_REGISTER], register[CONF_REGISTER_TYPE], register.get(CONF_UNIT_OF_MEASUREMENT), register[CONF_COUNT], register[CONF_REVERSE_ORDER], register[CONF_SCALE], register[CONF_OFFSET], structure, register[CONF_PRECISION], register[CONF_DATA_TYPE], register.get(CONF_DEVICE_CLASS), ) ) if not sensors: return False add_entities(sensors) class ModbusRegisterSensor(RestoreEntity): """Modbus register sensor.""" def __init__( self, hub, name, slave, register, register_type, unit_of_measurement, count, reverse_order, scale, offset, structure, precision, data_type, device_class, ): """Initialize the modbus register sensor.""" self._hub = hub self._name = name self._slave = int(slave) if slave else None self._register = int(register) self._register_type = register_type self._unit_of_measurement = unit_of_measurement self._count = int(count) self._reverse_order = reverse_order self._scale = scale self._offset = offset self._precision = precision self._structure = structure self._data_type = data_type self._device_class = device_class self._value = None self._available = True async def async_added_to_hass(self): """Handle entity which will be added.""" state = await self.async_get_last_state() if not state: return self._value = state.state @property def state(self): """Return the state of the sensor.""" return self._value @property def name(self): """Return the name of the sensor.""" return self._name @property def unit_of_measurement(self): """Return the unit of measurement.""" return self._unit_of_measurement @property def device_class(self) -> str | None: """Return the device class of the sensor.""" return self._device_class @property def available(self) -> bool: """Return True if entity is available.""" return self._available def update(self): """Update the state of the sensor.""" try: if self._register_type == CALL_TYPE_REGISTER_INPUT: result = self._hub.read_input_registers( self._slave, self._register, self._count ) else: result = self._hub.read_holding_registers( self._slave, self._register, self._count ) except ConnectionException: self._available = False return if isinstance(result, (ModbusException, ExceptionResponse)): self._available = False return registers = result.registers if self._reverse_order: registers.reverse() byte_string = b"".join([x.to_bytes(2, byteorder="big") for x in registers]) if self._data_type == DATA_TYPE_STRING: self._value = byte_string.decode() else: val = struct.unpack(self._structure, byte_string) # Issue: https://github.com/home-assistant/core/issues/41944 # If unpack() returns a tuple greater than 1, don't try to process the value. # Instead, return the values of unpack(...) separated by commas. if len(val) > 1: self._value = ",".join(map(str, val)) else: val = val[0] # Apply scale and precision to floats and ints if isinstance(val, (float, int)): val = self._scale * val + self._offset # We could convert int to float, and the code would still work; however # we lose some precision, and unit tests will fail. Therefore, we do # the conversion only when it's absolutely necessary. if isinstance(val, int) and self._precision == 0: self._value = str(val) else: self._value = f"{float(val):.{self._precision}f}" else: # Don't process remaining datatypes (bytes and booleans) self._value = str(val) self._available = True