Fix 64-bit modbus sensor register reads (#25672)

* Fix 64-bit modbus sensor register reads

When reading four 16-bit modbus registers as a sensor value,
slave output is stored first as 64-bit integer, but before returning
that value is converted to double precision floating point. This
causes rounding errors for integer values bigger than 2^53.

After this change floating point conversion is done only if user
has configured scaling or offset using floating points.

* Formatting

* Review fixes
This commit is contained in:
Tomi Lehto 2019-08-10 03:03:12 +03:00 committed by Paulus Schoutsen
parent fafd228418
commit b79f1336be
3 changed files with 392 additions and 5 deletions

View file

@ -2,6 +2,7 @@
import logging
import struct
from typing import Any, Union
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA
@ -36,6 +37,26 @@ DATA_TYPE_UINT = "uint"
REGISTER_TYPE_HOLDING = "holding"
REGISTER_TYPE_INPUT = "input"
def number(value: Any) -> Union[int, float]:
"""Coerce a value to number without losing precision."""
if isinstance(value, int):
return value
if isinstance(value, str):
try:
value = int(value)
return value
except (TypeError, ValueError):
pass
try:
value = float(value)
return value
except (TypeError, ValueError):
raise vol.Invalid("invalid number {}".format(value))
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_REGISTERS): [
@ -47,13 +68,13 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
[DATA_TYPE_INT, DATA_TYPE_UINT, DATA_TYPE_FLOAT, DATA_TYPE_CUSTOM]
),
vol.Optional(CONF_HUB, default=DEFAULT_HUB): cv.string,
vol.Optional(CONF_OFFSET, default=0): vol.Coerce(float),
vol.Optional(CONF_OFFSET, default=0): number,
vol.Optional(CONF_PRECISION, default=0): cv.positive_int,
vol.Optional(CONF_REGISTER_TYPE, default=REGISTER_TYPE_HOLDING): vol.In(
[REGISTER_TYPE_HOLDING, REGISTER_TYPE_INPUT]
),
vol.Optional(CONF_REVERSE_ORDER, default=False): cv.boolean,
vol.Optional(CONF_SCALE, default=1): vol.Coerce(float),
vol.Optional(CONF_SCALE, default=1): number,
vol.Optional(CONF_SLAVE): cv.positive_int,
vol.Optional(CONF_STRUCTURE): cv.string,
vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
@ -207,6 +228,10 @@ class ModbusRegisterSensor(RestoreEntity):
return
byte_string = b"".join([x.to_bytes(2, byteorder="big") for x in registers])
val = struct.unpack(self._structure, byte_string)[0]
self._value = format(
self._scale * val + self._offset, ".{}f".format(self._precision)
)
val = self._scale * val + self._offset
if isinstance(val, int):
self._value = str(val)
if self._precision > 0:
self._value += "." + "0" * self._precision
else:
self._value = f"{val:.{self._precision}f}"

View file

@ -0,0 +1 @@
"""The tests for Modbus platforms."""

View file

@ -0,0 +1,361 @@
"""The tests for the Modbus sensor component."""
import pytest
from datetime import timedelta
from unittest import mock
from homeassistant.const import (
CONF_NAME,
CONF_OFFSET,
CONF_PLATFORM,
CONF_SCAN_INTERVAL,
)
from homeassistant.components.modbus import DEFAULT_HUB, DOMAIN as MODBUS_DOMAIN
from homeassistant.components.modbus.sensor import (
CONF_COUNT,
CONF_DATA_TYPE,
CONF_PRECISION,
CONF_REGISTER,
CONF_REGISTER_TYPE,
CONF_REGISTERS,
CONF_REVERSE_ORDER,
CONF_SCALE,
DATA_TYPE_FLOAT,
DATA_TYPE_INT,
DATA_TYPE_UINT,
REGISTER_TYPE_HOLDING,
REGISTER_TYPE_INPUT,
)
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from tests.common import MockModule, mock_integration, async_fire_time_changed
@pytest.fixture()
def mock_hub(hass):
"""Mock hub."""
mock_integration(hass, MockModule(MODBUS_DOMAIN))
hub = mock.MagicMock()
hub.name = "hub"
hass.data[MODBUS_DOMAIN] = {DEFAULT_HUB: hub}
return hub
common_register_config = {CONF_NAME: "test-config", CONF_REGISTER: 1234}
class ReadResult:
"""Storage class for register read results."""
def __init__(self, register_words):
"""Init."""
self.registers = register_words
async def run_test(hass, mock_hub, register_config, register_words, expected):
"""Run test for given config and check that sensor outputs expected result."""
# Full sensor configuration
sensor_name = "modbus_test_sensor"
scan_interval = 5
config = {
SENSOR_DOMAIN: {
CONF_PLATFORM: "modbus",
CONF_SCAN_INTERVAL: scan_interval,
CONF_REGISTERS: [
dict(**{CONF_NAME: sensor_name, CONF_REGISTER: 1234}, **register_config)
],
}
}
# Setup inputs for the sensor
read_result = ReadResult(register_words)
if register_config.get(CONF_REGISTER_TYPE) == REGISTER_TYPE_INPUT:
mock_hub.read_input_registers.return_value = read_result
else:
mock_hub.read_holding_registers.return_value = read_result
# Initialize sensor
now = dt_util.utcnow()
with mock.patch("homeassistant.helpers.event.dt_util.utcnow", return_value=now):
assert await async_setup_component(hass, SENSOR_DOMAIN, config)
# Trigger update call with time_changed event
now += timedelta(seconds=scan_interval + 1)
with mock.patch("homeassistant.helpers.event.dt_util.utcnow", return_value=now):
async_fire_time_changed(hass, now)
await hass.async_block_till_done()
# Check state
entity_id = f"{SENSOR_DOMAIN}.{sensor_name}"
state = hass.states.get(entity_id).state
assert state == expected
async def test_simple_word_register(hass, mock_hub):
"""Test conversion of single word register."""
register_config = {
CONF_COUNT: 1,
CONF_DATA_TYPE: DATA_TYPE_INT,
CONF_SCALE: 1,
CONF_OFFSET: 0,
CONF_PRECISION: 0,
}
await run_test(hass, mock_hub, register_config, register_words=[0], expected="0")
async def test_optional_conf_keys(hass, mock_hub):
"""Test handling of optional configuration keys."""
register_config = {}
await run_test(
hass, mock_hub, register_config, register_words=[0x8000], expected="-32768"
)
async def test_offset(hass, mock_hub):
"""Test offset calculation."""
register_config = {
CONF_COUNT: 1,
CONF_DATA_TYPE: DATA_TYPE_INT,
CONF_SCALE: 1,
CONF_OFFSET: 13,
CONF_PRECISION: 0,
}
await run_test(hass, mock_hub, register_config, register_words=[7], expected="20")
async def test_scale_and_offset(hass, mock_hub):
"""Test handling of scale and offset."""
register_config = {
CONF_COUNT: 1,
CONF_DATA_TYPE: DATA_TYPE_INT,
CONF_SCALE: 3,
CONF_OFFSET: 13,
CONF_PRECISION: 0,
}
await run_test(hass, mock_hub, register_config, register_words=[7], expected="34")
async def test_ints_can_have_precision(hass, mock_hub):
"""Test precision can be specified event if using integer values only."""
register_config = {
CONF_COUNT: 1,
CONF_DATA_TYPE: DATA_TYPE_UINT,
CONF_SCALE: 3,
CONF_OFFSET: 13,
CONF_PRECISION: 4,
}
await run_test(
hass, mock_hub, register_config, register_words=[7], expected="34.0000"
)
async def test_floats_get_rounded_correctly(hass, mock_hub):
"""Test that floating point values get rounded correctly."""
register_config = {
CONF_COUNT: 1,
CONF_DATA_TYPE: DATA_TYPE_INT,
CONF_SCALE: 1.5,
CONF_OFFSET: 0,
CONF_PRECISION: 0,
}
await run_test(hass, mock_hub, register_config, register_words=[1], expected="2")
async def test_parameters_as_strings(hass, mock_hub):
"""Test that scale, offset and precision can be given as strings."""
register_config = {
CONF_COUNT: 1,
CONF_DATA_TYPE: DATA_TYPE_INT,
CONF_SCALE: "1.5",
CONF_OFFSET: "5",
CONF_PRECISION: "1",
}
await run_test(hass, mock_hub, register_config, register_words=[9], expected="18.5")
async def test_floating_point_scale(hass, mock_hub):
"""Test use of floating point scale."""
register_config = {
CONF_COUNT: 1,
CONF_DATA_TYPE: DATA_TYPE_INT,
CONF_SCALE: 2.4,
CONF_OFFSET: 0,
CONF_PRECISION: 2,
}
await run_test(hass, mock_hub, register_config, register_words=[1], expected="2.40")
async def test_floating_point_offset(hass, mock_hub):
"""Test use of floating point scale."""
register_config = {
CONF_COUNT: 1,
CONF_DATA_TYPE: DATA_TYPE_INT,
CONF_SCALE: 1,
CONF_OFFSET: -10.3,
CONF_PRECISION: 1,
}
await run_test(hass, mock_hub, register_config, register_words=[2], expected="-8.3")
async def test_signed_two_word_register(hass, mock_hub):
"""Test reading of signed register with two words."""
register_config = {
CONF_COUNT: 2,
CONF_DATA_TYPE: DATA_TYPE_INT,
CONF_SCALE: 1,
CONF_OFFSET: 0,
CONF_PRECISION: 0,
}
await run_test(
hass,
mock_hub,
register_config,
register_words=[0x89AB, 0xCDEF],
expected="-1985229329",
)
async def test_unsigned_two_word_register(hass, mock_hub):
"""Test reading of unsigned register with two words."""
register_config = {
CONF_COUNT: 2,
CONF_DATA_TYPE: DATA_TYPE_UINT,
CONF_SCALE: 1,
CONF_OFFSET: 0,
CONF_PRECISION: 0,
}
await run_test(
hass,
mock_hub,
register_config,
register_words=[0x89AB, 0xCDEF],
expected=str(0x89ABCDEF),
)
async def test_reversed(hass, mock_hub):
"""Test handling of reversed register words."""
register_config = {
CONF_COUNT: 2,
CONF_DATA_TYPE: DATA_TYPE_UINT,
CONF_REVERSE_ORDER: True,
}
await run_test(
hass,
mock_hub,
register_config,
register_words=[0x89AB, 0xCDEF],
expected=str(0xCDEF89AB),
)
async def test_four_word_register(hass, mock_hub):
"""Test reading of 64-bit register."""
register_config = {
CONF_COUNT: 4,
CONF_DATA_TYPE: DATA_TYPE_UINT,
CONF_SCALE: 1,
CONF_OFFSET: 0,
CONF_PRECISION: 0,
}
await run_test(
hass,
mock_hub,
register_config,
register_words=[0x89AB, 0xCDEF, 0x0123, 0x4567],
expected="9920249030613615975",
)
async def test_four_word_register_precision_is_intact_with_int_params(hass, mock_hub):
"""Test that precision is not lost when doing integer arithmetic for 64-bit register."""
register_config = {
CONF_COUNT: 4,
CONF_DATA_TYPE: DATA_TYPE_UINT,
CONF_SCALE: 2,
CONF_OFFSET: 3,
CONF_PRECISION: 0,
}
await run_test(
hass,
mock_hub,
register_config,
register_words=[0x0123, 0x4567, 0x89AB, 0xCDEF],
expected="163971058432973793",
)
async def test_four_word_register_precision_is_lost_with_float_params(hass, mock_hub):
"""Test that precision is affected when floating point conversion is done."""
register_config = {
CONF_COUNT: 4,
CONF_DATA_TYPE: DATA_TYPE_UINT,
CONF_SCALE: 2.0,
CONF_OFFSET: 3.0,
CONF_PRECISION: 0,
}
await run_test(
hass,
mock_hub,
register_config,
register_words=[0x0123, 0x4567, 0x89AB, 0xCDEF],
expected="163971058432973792",
)
async def test_two_word_input_register(hass, mock_hub):
"""Test reaging of input register."""
register_config = {
CONF_COUNT: 2,
CONF_REGISTER_TYPE: REGISTER_TYPE_INPUT,
CONF_DATA_TYPE: DATA_TYPE_UINT,
CONF_SCALE: 1,
CONF_OFFSET: 0,
CONF_PRECISION: 0,
}
await run_test(
hass,
mock_hub,
register_config,
register_words=[0x89AB, 0xCDEF],
expected=str(0x89ABCDEF),
)
async def test_two_word_holding_register(hass, mock_hub):
"""Test reaging of holding register."""
register_config = {
CONF_COUNT: 2,
CONF_REGISTER_TYPE: REGISTER_TYPE_HOLDING,
CONF_DATA_TYPE: DATA_TYPE_UINT,
CONF_SCALE: 1,
CONF_OFFSET: 0,
CONF_PRECISION: 0,
}
await run_test(
hass,
mock_hub,
register_config,
register_words=[0x89AB, 0xCDEF],
expected=str(0x89ABCDEF),
)
async def test_float_data_type(hass, mock_hub):
"""Test floating point register data type."""
register_config = {
CONF_COUNT: 2,
CONF_REGISTER_TYPE: REGISTER_TYPE_HOLDING,
CONF_DATA_TYPE: DATA_TYPE_FLOAT,
CONF_SCALE: 1,
CONF_OFFSET: 0,
CONF_PRECISION: 5,
}
await run_test(
hass,
mock_hub,
register_config,
register_words=[16286, 1617],
expected="1.23457",
)