Fix race in MQTT sensor when last_reset_topic is configured (#55463)
This commit is contained in:
parent
9b3346bc80
commit
18c03e2f8d
2 changed files with 117 additions and 29 deletions
|
@ -53,18 +53,48 @@ MQTT_SENSOR_ATTRIBUTES_BLOCKED = frozenset(
|
||||||
|
|
||||||
DEFAULT_NAME = "MQTT Sensor"
|
DEFAULT_NAME = "MQTT Sensor"
|
||||||
DEFAULT_FORCE_UPDATE = False
|
DEFAULT_FORCE_UPDATE = False
|
||||||
PLATFORM_SCHEMA = mqtt.MQTT_RO_PLATFORM_SCHEMA.extend(
|
|
||||||
{
|
|
||||||
vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,
|
def validate_options(conf):
|
||||||
vol.Optional(CONF_EXPIRE_AFTER): cv.positive_int,
|
"""Validate options.
|
||||||
vol.Optional(CONF_FORCE_UPDATE, default=DEFAULT_FORCE_UPDATE): cv.boolean,
|
|
||||||
vol.Optional(CONF_LAST_RESET_TOPIC): mqtt.valid_subscribe_topic,
|
If last reset topic is present it must be same as the state topic.
|
||||||
vol.Optional(CONF_LAST_RESET_VALUE_TEMPLATE): cv.template,
|
"""
|
||||||
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
|
if (
|
||||||
vol.Optional(CONF_STATE_CLASS): STATE_CLASSES_SCHEMA,
|
CONF_LAST_RESET_TOPIC in conf
|
||||||
vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
|
and CONF_STATE_TOPIC in conf
|
||||||
}
|
and conf[CONF_LAST_RESET_TOPIC] != conf[CONF_STATE_TOPIC]
|
||||||
).extend(MQTT_ENTITY_COMMON_SCHEMA.schema)
|
):
|
||||||
|
_LOGGER.warning(
|
||||||
|
"'%s' must be same as '%s'", CONF_LAST_RESET_TOPIC, CONF_STATE_TOPIC
|
||||||
|
)
|
||||||
|
|
||||||
|
if CONF_LAST_RESET_TOPIC in conf and CONF_LAST_RESET_VALUE_TEMPLATE not in conf:
|
||||||
|
_LOGGER.warning(
|
||||||
|
"'%s' must be set if '%s' is set",
|
||||||
|
CONF_LAST_RESET_VALUE_TEMPLATE,
|
||||||
|
CONF_LAST_RESET_TOPIC,
|
||||||
|
)
|
||||||
|
|
||||||
|
return conf
|
||||||
|
|
||||||
|
|
||||||
|
PLATFORM_SCHEMA = vol.All(
|
||||||
|
cv.deprecated(CONF_LAST_RESET_TOPIC),
|
||||||
|
mqtt.MQTT_RO_PLATFORM_SCHEMA.extend(
|
||||||
|
{
|
||||||
|
vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,
|
||||||
|
vol.Optional(CONF_EXPIRE_AFTER): cv.positive_int,
|
||||||
|
vol.Optional(CONF_FORCE_UPDATE, default=DEFAULT_FORCE_UPDATE): cv.boolean,
|
||||||
|
vol.Optional(CONF_LAST_RESET_TOPIC): mqtt.valid_subscribe_topic,
|
||||||
|
vol.Optional(CONF_LAST_RESET_VALUE_TEMPLATE): cv.template,
|
||||||
|
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
|
||||||
|
vol.Optional(CONF_STATE_CLASS): STATE_CLASSES_SCHEMA,
|
||||||
|
vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
|
||||||
|
}
|
||||||
|
).extend(MQTT_ENTITY_COMMON_SCHEMA.schema),
|
||||||
|
validate_options,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def async_setup_platform(
|
async def async_setup_platform(
|
||||||
|
@ -127,10 +157,7 @@ class MqttSensor(MqttEntity, SensorEntity):
|
||||||
"""(Re)Subscribe to topics."""
|
"""(Re)Subscribe to topics."""
|
||||||
topics = {}
|
topics = {}
|
||||||
|
|
||||||
@callback
|
def _update_state(msg):
|
||||||
@log_messages(self.hass, self.entity_id)
|
|
||||||
def message_received(msg):
|
|
||||||
"""Handle new MQTT messages."""
|
|
||||||
payload = msg.payload
|
payload = msg.payload
|
||||||
# auto-expire enabled?
|
# auto-expire enabled?
|
||||||
expire_after = self._config.get(CONF_EXPIRE_AFTER)
|
expire_after = self._config.get(CONF_EXPIRE_AFTER)
|
||||||
|
@ -159,18 +186,8 @@ class MqttSensor(MqttEntity, SensorEntity):
|
||||||
variables=variables,
|
variables=variables,
|
||||||
)
|
)
|
||||||
self._state = payload
|
self._state = payload
|
||||||
self.async_write_ha_state()
|
|
||||||
|
|
||||||
topics["state_topic"] = {
|
def _update_last_reset(msg):
|
||||||
"topic": self._config[CONF_STATE_TOPIC],
|
|
||||||
"msg_callback": message_received,
|
|
||||||
"qos": self._config[CONF_QOS],
|
|
||||||
}
|
|
||||||
|
|
||||||
@callback
|
|
||||||
@log_messages(self.hass, self.entity_id)
|
|
||||||
def last_reset_message_received(msg):
|
|
||||||
"""Handle new last_reset messages."""
|
|
||||||
payload = msg.payload
|
payload = msg.payload
|
||||||
|
|
||||||
template = self._config.get(CONF_LAST_RESET_VALUE_TEMPLATE)
|
template = self._config.get(CONF_LAST_RESET_VALUE_TEMPLATE)
|
||||||
|
@ -193,9 +210,36 @@ class MqttSensor(MqttEntity, SensorEntity):
|
||||||
_LOGGER.warning(
|
_LOGGER.warning(
|
||||||
"Invalid last_reset message '%s' from '%s'", msg.payload, msg.topic
|
"Invalid last_reset message '%s' from '%s'", msg.payload, msg.topic
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@callback
|
||||||
|
@log_messages(self.hass, self.entity_id)
|
||||||
|
def message_received(msg):
|
||||||
|
"""Handle new MQTT messages."""
|
||||||
|
_update_state(msg)
|
||||||
|
if CONF_LAST_RESET_VALUE_TEMPLATE in self._config and (
|
||||||
|
CONF_LAST_RESET_TOPIC not in self._config
|
||||||
|
or self._config[CONF_LAST_RESET_TOPIC] == self._config[CONF_STATE_TOPIC]
|
||||||
|
):
|
||||||
|
_update_last_reset(msg)
|
||||||
self.async_write_ha_state()
|
self.async_write_ha_state()
|
||||||
|
|
||||||
if CONF_LAST_RESET_TOPIC in self._config:
|
topics["state_topic"] = {
|
||||||
|
"topic": self._config[CONF_STATE_TOPIC],
|
||||||
|
"msg_callback": message_received,
|
||||||
|
"qos": self._config[CONF_QOS],
|
||||||
|
}
|
||||||
|
|
||||||
|
@callback
|
||||||
|
@log_messages(self.hass, self.entity_id)
|
||||||
|
def last_reset_message_received(msg):
|
||||||
|
"""Handle new last_reset messages."""
|
||||||
|
_update_last_reset(msg)
|
||||||
|
self.async_write_ha_state()
|
||||||
|
|
||||||
|
if (
|
||||||
|
CONF_LAST_RESET_TOPIC in self._config
|
||||||
|
and self._config[CONF_LAST_RESET_TOPIC] != self._config[CONF_STATE_TOPIC]
|
||||||
|
):
|
||||||
topics["last_reset_topic"] = {
|
topics["last_reset_topic"] = {
|
||||||
"topic": self._config[CONF_LAST_RESET_TOPIC],
|
"topic": self._config[CONF_LAST_RESET_TOPIC],
|
||||||
"msg_callback": last_reset_message_received,
|
"msg_callback": last_reset_message_received,
|
||||||
|
|
|
@ -208,7 +208,7 @@ async def test_setting_sensor_value_via_mqtt_json_message(hass, mqtt_mock):
|
||||||
assert state.state == "100"
|
assert state.state == "100"
|
||||||
|
|
||||||
|
|
||||||
async def test_setting_sensor_last_reset_via_mqtt_message(hass, mqtt_mock):
|
async def test_setting_sensor_last_reset_via_mqtt_message(hass, mqtt_mock, caplog):
|
||||||
"""Test the setting of the last_reset property via MQTT."""
|
"""Test the setting of the last_reset property via MQTT."""
|
||||||
assert await async_setup_component(
|
assert await async_setup_component(
|
||||||
hass,
|
hass,
|
||||||
|
@ -228,6 +228,11 @@ async def test_setting_sensor_last_reset_via_mqtt_message(hass, mqtt_mock):
|
||||||
async_fire_mqtt_message(hass, "last-reset-topic", "2020-01-02 08:11:00")
|
async_fire_mqtt_message(hass, "last-reset-topic", "2020-01-02 08:11:00")
|
||||||
state = hass.states.get("sensor.test")
|
state = hass.states.get("sensor.test")
|
||||||
assert state.attributes.get("last_reset") == "2020-01-02T08:11:00"
|
assert state.attributes.get("last_reset") == "2020-01-02T08:11:00"
|
||||||
|
assert "'last_reset_topic' must be same as 'state_topic'" in caplog.text
|
||||||
|
assert (
|
||||||
|
"'last_reset_value_template' must be set if 'last_reset_topic' is set"
|
||||||
|
in caplog.text
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("datestring", ["2020-21-02 08:11:00", "Hello there!"])
|
@pytest.mark.parametrize("datestring", ["2020-21-02 08:11:00", "Hello there!"])
|
||||||
|
@ -306,6 +311,45 @@ async def test_setting_sensor_last_reset_via_mqtt_json_message(hass, mqtt_mock):
|
||||||
assert state.attributes.get("last_reset") == "2020-01-02T08:11:00"
|
assert state.attributes.get("last_reset") == "2020-01-02T08:11:00"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("extra", [{}, {"last_reset_topic": "test-topic"}])
|
||||||
|
async def test_setting_sensor_last_reset_via_mqtt_json_message_2(
|
||||||
|
hass, mqtt_mock, caplog, extra
|
||||||
|
):
|
||||||
|
"""Test the setting of the value via MQTT with JSON payload."""
|
||||||
|
assert await async_setup_component(
|
||||||
|
hass,
|
||||||
|
sensor.DOMAIN,
|
||||||
|
{
|
||||||
|
sensor.DOMAIN: {
|
||||||
|
**{
|
||||||
|
"platform": "mqtt",
|
||||||
|
"name": "test",
|
||||||
|
"state_topic": "test-topic",
|
||||||
|
"unit_of_measurement": "kWh",
|
||||||
|
"value_template": "{{ value_json.value | float / 60000 }}",
|
||||||
|
"last_reset_value_template": "{{ utcnow().fromtimestamp(value_json.time / 1000, tz=utcnow().tzinfo) }}",
|
||||||
|
},
|
||||||
|
**extra,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
|
async_fire_mqtt_message(
|
||||||
|
hass,
|
||||||
|
"test-topic",
|
||||||
|
'{"type":"minute","time":1629385500000,"value":947.7706166666667}',
|
||||||
|
)
|
||||||
|
state = hass.states.get("sensor.test")
|
||||||
|
assert float(state.state) == pytest.approx(0.015796176944444445)
|
||||||
|
assert state.attributes.get("last_reset") == "2021-08-19T15:05:00+00:00"
|
||||||
|
assert "'last_reset_topic' must be same as 'state_topic'" not in caplog.text
|
||||||
|
assert (
|
||||||
|
"'last_reset_value_template' must be set if 'last_reset_topic' is set"
|
||||||
|
not in caplog.text
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def test_force_update_disabled(hass, mqtt_mock):
|
async def test_force_update_disabled(hass, mqtt_mock):
|
||||||
"""Test force update option."""
|
"""Test force update option."""
|
||||||
assert await async_setup_component(
|
assert await async_setup_component(
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue