From d4a95b3735f495e59f33774ddb29d052c77e0722 Mon Sep 17 00:00:00 2001 From: Jan Bouwhuis Date: Sat, 25 May 2024 23:24:38 +0200 Subject: [PATCH] Refactor mqtt callbacks for light basic, json and template schema (#118113) --- .../components/mqtt/light/schema_basic.py | 473 +++++++++--------- .../components/mqtt/light/schema_json.py | 211 ++++---- .../components/mqtt/light/schema_template.py | 186 +++---- 3 files changed, 429 insertions(+), 441 deletions(-) diff --git a/homeassistant/components/mqtt/light/schema_basic.py b/homeassistant/components/mqtt/light/schema_basic.py index 904e45b3d2f..650ca1eff6a 100644 --- a/homeassistant/components/mqtt/light/schema_basic.py +++ b/homeassistant/components/mqtt/light/schema_basic.py @@ -3,6 +3,7 @@ from __future__ import annotations from collections.abc import Callable +from functools import partial import logging from typing import Any, cast @@ -53,8 +54,7 @@ from ..const import ( CONF_STATE_VALUE_TEMPLATE, PAYLOAD_NONE, ) -from ..debug_info import log_messages -from ..mixins import MqttEntity, write_state_on_attr_change +from ..mixins import MqttEntity from ..models import ( MessageCallbackType, MqttCommandTemplate, @@ -378,263 +378,248 @@ class MqttLight(MqttEntity, LightEntity, RestoreEntity): attr: bool = getattr(self, f"_optimistic_{attribute}") return attr + @callback + def _state_received(self, msg: ReceiveMessage) -> None: + """Handle new MQTT messages.""" + payload = self._value_templates[CONF_STATE_VALUE_TEMPLATE]( + msg.payload, PayloadSentinel.NONE + ) + if not payload: + _LOGGER.debug("Ignoring empty state message from '%s'", msg.topic) + return + + if payload == self._payload["on"]: + self._attr_is_on = True + elif payload == self._payload["off"]: + self._attr_is_on = False + elif payload == PAYLOAD_NONE: + self._attr_is_on = None + + @callback + def _brightness_received(self, msg: ReceiveMessage) -> None: + """Handle new MQTT messages for the brightness.""" + payload = self._value_templates[CONF_BRIGHTNESS_VALUE_TEMPLATE]( + msg.payload, PayloadSentinel.DEFAULT + ) + if payload is PayloadSentinel.DEFAULT or not payload: + _LOGGER.debug("Ignoring empty brightness message from '%s'", msg.topic) + return + + device_value = float(payload) + if device_value == 0: + _LOGGER.debug("Ignoring zero brightness from '%s'", msg.topic) + return + + percent_bright = device_value / self._config[CONF_BRIGHTNESS_SCALE] + self._attr_brightness = min(round(percent_bright * 255), 255) + + @callback + def _rgbx_received( + self, + msg: ReceiveMessage, + template: str, + color_mode: ColorMode, + convert_color: Callable[..., tuple[int, ...]], + ) -> tuple[int, ...] | None: + """Process MQTT messages for RGBW and RGBWW.""" + payload = self._value_templates[template](msg.payload, PayloadSentinel.DEFAULT) + if payload is PayloadSentinel.DEFAULT or not payload: + _LOGGER.debug("Ignoring empty %s message from '%s'", color_mode, msg.topic) + return None + color = tuple(int(val) for val in str(payload).split(",")) + if self._optimistic_color_mode: + self._attr_color_mode = color_mode + if self._topic[CONF_BRIGHTNESS_STATE_TOPIC] is None: + rgb = convert_color(*color) + brightness = max(rgb) + if brightness == 0: + _LOGGER.debug( + "Ignoring %s message with zero rgb brightness from '%s'", + color_mode, + msg.topic, + ) + return None + self._attr_brightness = brightness + # Normalize the color to 100% brightness + color = tuple( + min(round(channel / brightness * 255), 255) for channel in color + ) + return color + + @callback + def _rgb_received(self, msg: ReceiveMessage) -> None: + """Handle new MQTT messages for RGB.""" + rgb = self._rgbx_received( + msg, CONF_RGB_VALUE_TEMPLATE, ColorMode.RGB, lambda *x: x + ) + if rgb is None: + return + self._attr_rgb_color = cast(tuple[int, int, int], rgb) + + @callback + def _rgbw_received(self, msg: ReceiveMessage) -> None: + """Handle new MQTT messages for RGBW.""" + rgbw = self._rgbx_received( + msg, + CONF_RGBW_VALUE_TEMPLATE, + ColorMode.RGBW, + color_util.color_rgbw_to_rgb, + ) + if rgbw is None: + return + self._attr_rgbw_color = cast(tuple[int, int, int, int], rgbw) + + @callback + def _rgbww_received(self, msg: ReceiveMessage) -> None: + """Handle new MQTT messages for RGBWW.""" + + @callback + def _converter( + r: int, g: int, b: int, cw: int, ww: int + ) -> tuple[int, int, int]: + min_kelvin = color_util.color_temperature_mired_to_kelvin(self.max_mireds) + max_kelvin = color_util.color_temperature_mired_to_kelvin(self.min_mireds) + return color_util.color_rgbww_to_rgb( + r, g, b, cw, ww, min_kelvin, max_kelvin + ) + + rgbww = self._rgbx_received( + msg, + CONF_RGBWW_VALUE_TEMPLATE, + ColorMode.RGBWW, + _converter, + ) + if rgbww is None: + return + self._attr_rgbww_color = cast(tuple[int, int, int, int, int], rgbww) + + @callback + def _color_mode_received(self, msg: ReceiveMessage) -> None: + """Handle new MQTT messages for color mode.""" + payload = self._value_templates[CONF_COLOR_MODE_VALUE_TEMPLATE]( + msg.payload, PayloadSentinel.DEFAULT + ) + if payload is PayloadSentinel.DEFAULT or not payload: + _LOGGER.debug("Ignoring empty color mode message from '%s'", msg.topic) + return + + self._attr_color_mode = ColorMode(str(payload)) + + @callback + def _color_temp_received(self, msg: ReceiveMessage) -> None: + """Handle new MQTT messages for color temperature.""" + payload = self._value_templates[CONF_COLOR_TEMP_VALUE_TEMPLATE]( + msg.payload, PayloadSentinel.DEFAULT + ) + if payload is PayloadSentinel.DEFAULT or not payload: + _LOGGER.debug("Ignoring empty color temp message from '%s'", msg.topic) + return + + if self._optimistic_color_mode: + self._attr_color_mode = ColorMode.COLOR_TEMP + self._attr_color_temp = int(payload) + + @callback + def _effect_received(self, msg: ReceiveMessage) -> None: + """Handle new MQTT messages for effect.""" + payload = self._value_templates[CONF_EFFECT_VALUE_TEMPLATE]( + msg.payload, PayloadSentinel.DEFAULT + ) + if payload is PayloadSentinel.DEFAULT or not payload: + _LOGGER.debug("Ignoring empty effect message from '%s'", msg.topic) + return + + self._attr_effect = str(payload) + + @callback + def _hs_received(self, msg: ReceiveMessage) -> None: + """Handle new MQTT messages for hs color.""" + payload = self._value_templates[CONF_HS_VALUE_TEMPLATE]( + msg.payload, PayloadSentinel.DEFAULT + ) + if payload is PayloadSentinel.DEFAULT or not payload: + _LOGGER.debug("Ignoring empty hs message from '%s'", msg.topic) + return + try: + hs_color = tuple(float(val) for val in str(payload).split(",", 2)) + if self._optimistic_color_mode: + self._attr_color_mode = ColorMode.HS + self._attr_hs_color = cast(tuple[float, float], hs_color) + except ValueError: + _LOGGER.warning("Failed to parse hs state update: '%s'", payload) + + @callback + def _xy_received(self, msg: ReceiveMessage) -> None: + """Handle new MQTT messages for xy color.""" + payload = self._value_templates[CONF_XY_VALUE_TEMPLATE]( + msg.payload, PayloadSentinel.DEFAULT + ) + if payload is PayloadSentinel.DEFAULT or not payload: + _LOGGER.debug("Ignoring empty xy-color message from '%s'", msg.topic) + return + + xy_color = tuple(float(val) for val in str(payload).split(",", 2)) + if self._optimistic_color_mode: + self._attr_color_mode = ColorMode.XY + self._attr_xy_color = cast(tuple[float, float], xy_color) + def _prepare_subscribe_topics(self) -> None: # noqa: C901 """(Re)Subscribe to topics.""" topics: dict[str, dict[str, Any]] = {} - def add_topic(topic: str, msg_callback: MessageCallbackType) -> None: + def add_topic( + topic: str, msg_callback: MessageCallbackType, tracked_attributes: set[str] + ) -> None: """Add a topic.""" if self._topic[topic] is not None: topics[topic] = { "topic": self._topic[topic], - "msg_callback": msg_callback, + "msg_callback": partial( + self._message_callback, msg_callback, tracked_attributes + ), + "entity_id": self.entity_id, "qos": self._config[CONF_QOS], "encoding": self._config[CONF_ENCODING] or None, } - @callback - @log_messages(self.hass, self.entity_id) - @write_state_on_attr_change(self, {"_attr_is_on"}) - def state_received(msg: ReceiveMessage) -> None: - """Handle new MQTT messages.""" - payload = self._value_templates[CONF_STATE_VALUE_TEMPLATE]( - msg.payload, PayloadSentinel.NONE - ) - if not payload: - _LOGGER.debug("Ignoring empty state message from '%s'", msg.topic) - return - - if payload == self._payload["on"]: - self._attr_is_on = True - elif payload == self._payload["off"]: - self._attr_is_on = False - elif payload == PAYLOAD_NONE: - self._attr_is_on = None - - if self._topic[CONF_STATE_TOPIC] is not None: - topics[CONF_STATE_TOPIC] = { - "topic": self._topic[CONF_STATE_TOPIC], - "msg_callback": state_received, - "qos": self._config[CONF_QOS], - "encoding": self._config[CONF_ENCODING] or None, - } - - @callback - @log_messages(self.hass, self.entity_id) - @write_state_on_attr_change(self, {"_attr_brightness"}) - def brightness_received(msg: ReceiveMessage) -> None: - """Handle new MQTT messages for the brightness.""" - payload = self._value_templates[CONF_BRIGHTNESS_VALUE_TEMPLATE]( - msg.payload, PayloadSentinel.DEFAULT - ) - if payload is PayloadSentinel.DEFAULT or not payload: - _LOGGER.debug("Ignoring empty brightness message from '%s'", msg.topic) - return - - device_value = float(payload) - if device_value == 0: - _LOGGER.debug("Ignoring zero brightness from '%s'", msg.topic) - return - - percent_bright = device_value / self._config[CONF_BRIGHTNESS_SCALE] - self._attr_brightness = min(round(percent_bright * 255), 255) - - add_topic(CONF_BRIGHTNESS_STATE_TOPIC, brightness_received) - - @callback - def _rgbx_received( - msg: ReceiveMessage, - template: str, - color_mode: ColorMode, - convert_color: Callable[..., tuple[int, ...]], - ) -> tuple[int, ...] | None: - """Handle new MQTT messages for RGBW and RGBWW.""" - payload = self._value_templates[template]( - msg.payload, PayloadSentinel.DEFAULT - ) - if payload is PayloadSentinel.DEFAULT or not payload: - _LOGGER.debug( - "Ignoring empty %s message from '%s'", color_mode, msg.topic - ) - return None - color = tuple(int(val) for val in str(payload).split(",")) - if self._optimistic_color_mode: - self._attr_color_mode = color_mode - if self._topic[CONF_BRIGHTNESS_STATE_TOPIC] is None: - rgb = convert_color(*color) - brightness = max(rgb) - if brightness == 0: - _LOGGER.debug( - "Ignoring %s message with zero rgb brightness from '%s'", - color_mode, - msg.topic, - ) - return None - self._attr_brightness = brightness - # Normalize the color to 100% brightness - color = tuple( - min(round(channel / brightness * 255), 255) for channel in color - ) - return color - - @callback - @log_messages(self.hass, self.entity_id) - @write_state_on_attr_change( - self, {"_attr_brightness", "_attr_color_mode", "_attr_rgb_color"} + add_topic(CONF_STATE_TOPIC, self._state_received, {"_attr_is_on"}) + add_topic( + CONF_BRIGHTNESS_STATE_TOPIC, self._brightness_received, {"_attr_brightness"} ) - def rgb_received(msg: ReceiveMessage) -> None: - """Handle new MQTT messages for RGB.""" - rgb = _rgbx_received( - msg, CONF_RGB_VALUE_TEMPLATE, ColorMode.RGB, lambda *x: x - ) - if rgb is None: - return - self._attr_rgb_color = cast(tuple[int, int, int], rgb) - - add_topic(CONF_RGB_STATE_TOPIC, rgb_received) - - @callback - @log_messages(self.hass, self.entity_id) - @write_state_on_attr_change( - self, {"_attr_brightness", "_attr_color_mode", "_attr_rgbw_color"} + add_topic( + CONF_RGB_STATE_TOPIC, + self._rgb_received, + {"_attr_brightness", "_attr_color_mode", "_attr_rgb_color"}, ) - def rgbw_received(msg: ReceiveMessage) -> None: - """Handle new MQTT messages for RGBW.""" - rgbw = _rgbx_received( - msg, - CONF_RGBW_VALUE_TEMPLATE, - ColorMode.RGBW, - color_util.color_rgbw_to_rgb, - ) - if rgbw is None: - return - self._attr_rgbw_color = cast(tuple[int, int, int, int], rgbw) - - add_topic(CONF_RGBW_STATE_TOPIC, rgbw_received) - - @callback - @log_messages(self.hass, self.entity_id) - @write_state_on_attr_change( - self, {"_attr_brightness", "_attr_color_mode", "_attr_rgbww_color"} + add_topic( + CONF_RGBW_STATE_TOPIC, + self._rgbw_received, + {"_attr_brightness", "_attr_color_mode", "_attr_rgbw_color"}, + ) + add_topic( + CONF_RGBWW_STATE_TOPIC, + self._rgbww_received, + {"_attr_brightness", "_attr_color_mode", "_attr_rgbww_color"}, + ) + add_topic( + CONF_COLOR_MODE_STATE_TOPIC, self._color_mode_received, {"_attr_color_mode"} + ) + add_topic( + CONF_COLOR_TEMP_STATE_TOPIC, + self._color_temp_received, + {"_attr_color_mode", "_attr_color_temp"}, + ) + add_topic(CONF_EFFECT_STATE_TOPIC, self._effect_received, {"_attr_effect"}) + add_topic( + CONF_HS_STATE_TOPIC, + self._hs_received, + {"_attr_color_mode", "_attr_hs_color"}, + ) + add_topic( + CONF_XY_STATE_TOPIC, + self._xy_received, + {"_attr_color_mode", "_attr_xy_color"}, ) - def rgbww_received(msg: ReceiveMessage) -> None: - """Handle new MQTT messages for RGBWW.""" - - @callback - def _converter( - r: int, g: int, b: int, cw: int, ww: int - ) -> tuple[int, int, int]: - min_kelvin = color_util.color_temperature_mired_to_kelvin( - self.max_mireds - ) - max_kelvin = color_util.color_temperature_mired_to_kelvin( - self.min_mireds - ) - return color_util.color_rgbww_to_rgb( - r, g, b, cw, ww, min_kelvin, max_kelvin - ) - - rgbww = _rgbx_received( - msg, - CONF_RGBWW_VALUE_TEMPLATE, - ColorMode.RGBWW, - _converter, - ) - if rgbww is None: - return - self._attr_rgbww_color = cast(tuple[int, int, int, int, int], rgbww) - - add_topic(CONF_RGBWW_STATE_TOPIC, rgbww_received) - - @callback - @log_messages(self.hass, self.entity_id) - @write_state_on_attr_change(self, {"_attr_color_mode"}) - def color_mode_received(msg: ReceiveMessage) -> None: - """Handle new MQTT messages for color mode.""" - payload = self._value_templates[CONF_COLOR_MODE_VALUE_TEMPLATE]( - msg.payload, PayloadSentinel.DEFAULT - ) - if payload is PayloadSentinel.DEFAULT or not payload: - _LOGGER.debug("Ignoring empty color mode message from '%s'", msg.topic) - return - - self._attr_color_mode = ColorMode(str(payload)) - - add_topic(CONF_COLOR_MODE_STATE_TOPIC, color_mode_received) - - @callback - @log_messages(self.hass, self.entity_id) - @write_state_on_attr_change(self, {"_attr_color_mode", "_attr_color_temp"}) - def color_temp_received(msg: ReceiveMessage) -> None: - """Handle new MQTT messages for color temperature.""" - payload = self._value_templates[CONF_COLOR_TEMP_VALUE_TEMPLATE]( - msg.payload, PayloadSentinel.DEFAULT - ) - if payload is PayloadSentinel.DEFAULT or not payload: - _LOGGER.debug("Ignoring empty color temp message from '%s'", msg.topic) - return - - if self._optimistic_color_mode: - self._attr_color_mode = ColorMode.COLOR_TEMP - self._attr_color_temp = int(payload) - - add_topic(CONF_COLOR_TEMP_STATE_TOPIC, color_temp_received) - - @callback - @log_messages(self.hass, self.entity_id) - @write_state_on_attr_change(self, {"_attr_effect"}) - def effect_received(msg: ReceiveMessage) -> None: - """Handle new MQTT messages for effect.""" - payload = self._value_templates[CONF_EFFECT_VALUE_TEMPLATE]( - msg.payload, PayloadSentinel.DEFAULT - ) - if payload is PayloadSentinel.DEFAULT or not payload: - _LOGGER.debug("Ignoring empty effect message from '%s'", msg.topic) - return - - self._attr_effect = str(payload) - - add_topic(CONF_EFFECT_STATE_TOPIC, effect_received) - - @callback - @log_messages(self.hass, self.entity_id) - @write_state_on_attr_change(self, {"_attr_color_mode", "_attr_hs_color"}) - def hs_received(msg: ReceiveMessage) -> None: - """Handle new MQTT messages for hs color.""" - payload = self._value_templates[CONF_HS_VALUE_TEMPLATE]( - msg.payload, PayloadSentinel.DEFAULT - ) - if payload is PayloadSentinel.DEFAULT or not payload: - _LOGGER.debug("Ignoring empty hs message from '%s'", msg.topic) - return - try: - hs_color = tuple(float(val) for val in str(payload).split(",", 2)) - if self._optimistic_color_mode: - self._attr_color_mode = ColorMode.HS - self._attr_hs_color = cast(tuple[float, float], hs_color) - except ValueError: - _LOGGER.warning("Failed to parse hs state update: '%s'", payload) - - add_topic(CONF_HS_STATE_TOPIC, hs_received) - - @callback - @log_messages(self.hass, self.entity_id) - @write_state_on_attr_change(self, {"_attr_color_mode", "_attr_xy_color"}) - def xy_received(msg: ReceiveMessage) -> None: - """Handle new MQTT messages for xy color.""" - payload = self._value_templates[CONF_XY_VALUE_TEMPLATE]( - msg.payload, PayloadSentinel.DEFAULT - ) - if payload is PayloadSentinel.DEFAULT or not payload: - _LOGGER.debug("Ignoring empty xy-color message from '%s'", msg.topic) - return - - xy_color = tuple(float(val) for val in str(payload).split(",", 2)) - if self._optimistic_color_mode: - self._attr_color_mode = ColorMode.XY - self._attr_xy_color = cast(tuple[float, float], xy_color) - - add_topic(CONF_XY_STATE_TOPIC, xy_received) self._sub_state = subscription.async_prepare_subscribe_topics( self.hass, self._sub_state, topics diff --git a/homeassistant/components/mqtt/light/schema_json.py b/homeassistant/components/mqtt/light/schema_json.py index 52fbf3429b6..14e477d0c35 100644 --- a/homeassistant/components/mqtt/light/schema_json.py +++ b/homeassistant/components/mqtt/light/schema_json.py @@ -4,6 +4,7 @@ from __future__ import annotations from collections.abc import Callable from contextlib import suppress +from functools import partial import logging from typing import TYPE_CHECKING, Any, cast @@ -66,8 +67,7 @@ from ..const import ( CONF_STATE_TOPIC, DOMAIN as MQTT_DOMAIN, ) -from ..debug_info import log_messages -from ..mixins import MqttEntity, write_state_on_attr_change +from ..mixins import MqttEntity from ..models import ReceiveMessage from ..schemas import MQTT_ENTITY_COMMON_SCHEMA from ..util import valid_subscribe_topic @@ -414,114 +414,117 @@ class MqttLightJson(MqttEntity, LightEntity, RestoreEntity): self.entity_id, ) + @callback + def _state_received(self, msg: ReceiveMessage) -> None: + """Handle new MQTT messages.""" + values = json_loads_object(msg.payload) + + if values["state"] == "ON": + self._attr_is_on = True + elif values["state"] == "OFF": + self._attr_is_on = False + elif values["state"] is None: + self._attr_is_on = None + + if ( + self._deprecated_color_handling + and color_supported(self.supported_color_modes) + and "color" in values + ): + # Deprecated color handling + if values["color"] is None: + self._attr_hs_color = None + else: + self._update_color(values) + + if not self._deprecated_color_handling and "color_mode" in values: + self._update_color(values) + + if brightness_supported(self.supported_color_modes): + try: + if brightness := values["brightness"]: + if TYPE_CHECKING: + assert isinstance(brightness, float) + self._attr_brightness = color_util.value_to_brightness( + (1, self._config[CONF_BRIGHTNESS_SCALE]), brightness + ) + else: + _LOGGER.debug( + "Ignoring zero brightness value for entity %s", + self.entity_id, + ) + + except KeyError: + pass + except (TypeError, ValueError): + _LOGGER.warning( + "Invalid brightness value '%s' received for entity %s", + values["brightness"], + self.entity_id, + ) + + if ( + self._deprecated_color_handling + and self.supported_color_modes + and ColorMode.COLOR_TEMP in self.supported_color_modes + ): + # Deprecated color handling + try: + if values["color_temp"] is None: + self._attr_color_temp = None + else: + self._attr_color_temp = int(values["color_temp"]) # type: ignore[arg-type] + except KeyError: + pass + except ValueError: + _LOGGER.warning( + "Invalid color temp value '%s' received for entity %s", + values["color_temp"], + self.entity_id, + ) + # Allow to switch back to color_temp + if "color" not in values: + self._attr_hs_color = None + + if self.supported_features and LightEntityFeature.EFFECT: + with suppress(KeyError): + self._attr_effect = cast(str, values["effect"]) + def _prepare_subscribe_topics(self) -> None: """(Re)Subscribe to topics.""" - @callback - @log_messages(self.hass, self.entity_id) - @write_state_on_attr_change( - self, + # + if self._topic[CONF_STATE_TOPIC] is None: + return + + self._sub_state = subscription.async_prepare_subscribe_topics( + self.hass, + self._sub_state, { - "_attr_brightness", - "_attr_color_temp", - "_attr_effect", - "_attr_hs_color", - "_attr_is_on", - "_attr_rgb_color", - "_attr_rgbw_color", - "_attr_rgbww_color", - "_attr_xy_color", - "color_mode", + CONF_STATE_TOPIC: { + "topic": self._topic[CONF_STATE_TOPIC], + "msg_callback": partial( + self._message_callback, + self._state_received, + { + "_attr_brightness", + "_attr_color_temp", + "_attr_effect", + "_attr_hs_color", + "_attr_is_on", + "_attr_rgb_color", + "_attr_rgbw_color", + "_attr_rgbww_color", + "_attr_xy_color", + "color_mode", + }, + ), + "entity_id": self.entity_id, + "qos": self._config[CONF_QOS], + "encoding": self._config[CONF_ENCODING] or None, + } }, ) - def state_received(msg: ReceiveMessage) -> None: - """Handle new MQTT messages.""" - values = json_loads_object(msg.payload) - - if values["state"] == "ON": - self._attr_is_on = True - elif values["state"] == "OFF": - self._attr_is_on = False - elif values["state"] is None: - self._attr_is_on = None - - if ( - self._deprecated_color_handling - and color_supported(self.supported_color_modes) - and "color" in values - ): - # Deprecated color handling - if values["color"] is None: - self._attr_hs_color = None - else: - self._update_color(values) - - if not self._deprecated_color_handling and "color_mode" in values: - self._update_color(values) - - if brightness_supported(self.supported_color_modes): - try: - if brightness := values["brightness"]: - if TYPE_CHECKING: - assert isinstance(brightness, float) - self._attr_brightness = color_util.value_to_brightness( - (1, self._config[CONF_BRIGHTNESS_SCALE]), brightness - ) - else: - _LOGGER.debug( - "Ignoring zero brightness value for entity %s", - self.entity_id, - ) - - except KeyError: - pass - except (TypeError, ValueError): - _LOGGER.warning( - "Invalid brightness value '%s' received for entity %s", - values["brightness"], - self.entity_id, - ) - - if ( - self._deprecated_color_handling - and self.supported_color_modes - and ColorMode.COLOR_TEMP in self.supported_color_modes - ): - # Deprecated color handling - try: - if values["color_temp"] is None: - self._attr_color_temp = None - else: - self._attr_color_temp = int(values["color_temp"]) # type: ignore[arg-type] - except KeyError: - pass - except ValueError: - _LOGGER.warning( - "Invalid color temp value '%s' received for entity %s", - values["color_temp"], - self.entity_id, - ) - # Allow to switch back to color_temp - if "color" not in values: - self._attr_hs_color = None - - if self.supported_features and LightEntityFeature.EFFECT: - with suppress(KeyError): - self._attr_effect = cast(str, values["effect"]) - - if self._topic[CONF_STATE_TOPIC] is not None: - self._sub_state = subscription.async_prepare_subscribe_topics( - self.hass, - self._sub_state, - { - "state_topic": { - "topic": self._topic[CONF_STATE_TOPIC], - "msg_callback": state_received, - "qos": self._config[CONF_QOS], - "encoding": self._config[CONF_ENCODING] or None, - } - }, - ) async def _subscribe_topics(self) -> None: """(Re)Subscribe to topics.""" diff --git a/homeassistant/components/mqtt/light/schema_template.py b/homeassistant/components/mqtt/light/schema_template.py index 651b691e28e..647bf6df401 100644 --- a/homeassistant/components/mqtt/light/schema_template.py +++ b/homeassistant/components/mqtt/light/schema_template.py @@ -3,6 +3,7 @@ from __future__ import annotations from collections.abc import Callable +from functools import partial import logging from typing import Any @@ -44,8 +45,7 @@ from ..const import ( CONF_STATE_TOPIC, PAYLOAD_NONE, ) -from ..debug_info import log_messages -from ..mixins import MqttEntity, write_state_on_attr_change +from ..mixins import MqttEntity from ..models import ( MqttCommandTemplate, MqttValueTemplate, @@ -188,103 +188,103 @@ class MqttLightTemplate(MqttEntity, LightEntity, RestoreEntity): # Support for ct + hs, prioritize hs self._attr_color_mode = ColorMode.HS if self.hs_color else ColorMode.COLOR_TEMP + @callback + def _state_received(self, msg: ReceiveMessage) -> None: + """Handle new MQTT messages.""" + state = self._value_templates[CONF_STATE_TEMPLATE](msg.payload) + if state == STATE_ON: + self._attr_is_on = True + elif state == STATE_OFF: + self._attr_is_on = False + elif state == PAYLOAD_NONE: + self._attr_is_on = None + else: + _LOGGER.warning("Invalid state value received") + + if CONF_BRIGHTNESS_TEMPLATE in self._config: + try: + if brightness := int( + self._value_templates[CONF_BRIGHTNESS_TEMPLATE](msg.payload) + ): + self._attr_brightness = brightness + else: + _LOGGER.debug( + "Ignoring zero brightness value for entity %s", + self.entity_id, + ) + + except ValueError: + _LOGGER.warning("Invalid brightness value received from %s", msg.topic) + + if CONF_COLOR_TEMP_TEMPLATE in self._config: + try: + color_temp = self._value_templates[CONF_COLOR_TEMP_TEMPLATE]( + msg.payload + ) + self._attr_color_temp = ( + int(color_temp) if color_temp != "None" else None + ) + except ValueError: + _LOGGER.warning("Invalid color temperature value received") + + if ( + CONF_RED_TEMPLATE in self._config + and CONF_GREEN_TEMPLATE in self._config + and CONF_BLUE_TEMPLATE in self._config + ): + try: + red = self._value_templates[CONF_RED_TEMPLATE](msg.payload) + green = self._value_templates[CONF_GREEN_TEMPLATE](msg.payload) + blue = self._value_templates[CONF_BLUE_TEMPLATE](msg.payload) + if red == "None" and green == "None" and blue == "None": + self._attr_hs_color = None + else: + self._attr_hs_color = color_util.color_RGB_to_hs( + int(red), int(green), int(blue) + ) + self._update_color_mode() + except ValueError: + _LOGGER.warning("Invalid color value received") + + if CONF_EFFECT_TEMPLATE in self._config: + effect = str(self._value_templates[CONF_EFFECT_TEMPLATE](msg.payload)) + if ( + effect_list := self._config[CONF_EFFECT_LIST] + ) and effect in effect_list: + self._attr_effect = effect + else: + _LOGGER.warning("Unsupported effect value received") + def _prepare_subscribe_topics(self) -> None: """(Re)Subscribe to topics.""" - @callback - @log_messages(self.hass, self.entity_id) - @write_state_on_attr_change( - self, + if self._topics[CONF_STATE_TOPIC] is None: + return + + self._sub_state = subscription.async_prepare_subscribe_topics( + self.hass, + self._sub_state, { - "_attr_brightness", - "_attr_color_mode", - "_attr_color_temp", - "_attr_effect", - "_attr_hs_color", - "_attr_is_on", + "state_topic": { + "topic": self._topics[CONF_STATE_TOPIC], + "msg_callback": partial( + self._message_callback, + self._state_received, + { + "_attr_brightness", + "_attr_color_mode", + "_attr_color_temp", + "_attr_effect", + "_attr_hs_color", + "_attr_is_on", + }, + ), + "entity_id": self.entity_id, + "qos": self._config[CONF_QOS], + "encoding": self._config[CONF_ENCODING] or None, + } }, ) - def state_received(msg: ReceiveMessage) -> None: - """Handle new MQTT messages.""" - state = self._value_templates[CONF_STATE_TEMPLATE](msg.payload) - if state == STATE_ON: - self._attr_is_on = True - elif state == STATE_OFF: - self._attr_is_on = False - elif state == PAYLOAD_NONE: - self._attr_is_on = None - else: - _LOGGER.warning("Invalid state value received") - - if CONF_BRIGHTNESS_TEMPLATE in self._config: - try: - if brightness := int( - self._value_templates[CONF_BRIGHTNESS_TEMPLATE](msg.payload) - ): - self._attr_brightness = brightness - else: - _LOGGER.debug( - "Ignoring zero brightness value for entity %s", - self.entity_id, - ) - - except ValueError: - _LOGGER.warning( - "Invalid brightness value received from %s", msg.topic - ) - - if CONF_COLOR_TEMP_TEMPLATE in self._config: - try: - color_temp = self._value_templates[CONF_COLOR_TEMP_TEMPLATE]( - msg.payload - ) - self._attr_color_temp = ( - int(color_temp) if color_temp != "None" else None - ) - except ValueError: - _LOGGER.warning("Invalid color temperature value received") - - if ( - CONF_RED_TEMPLATE in self._config - and CONF_GREEN_TEMPLATE in self._config - and CONF_BLUE_TEMPLATE in self._config - ): - try: - red = self._value_templates[CONF_RED_TEMPLATE](msg.payload) - green = self._value_templates[CONF_GREEN_TEMPLATE](msg.payload) - blue = self._value_templates[CONF_BLUE_TEMPLATE](msg.payload) - if red == "None" and green == "None" and blue == "None": - self._attr_hs_color = None - else: - self._attr_hs_color = color_util.color_RGB_to_hs( - int(red), int(green), int(blue) - ) - self._update_color_mode() - except ValueError: - _LOGGER.warning("Invalid color value received") - - if CONF_EFFECT_TEMPLATE in self._config: - effect = str(self._value_templates[CONF_EFFECT_TEMPLATE](msg.payload)) - if ( - effect_list := self._config[CONF_EFFECT_LIST] - ) and effect in effect_list: - self._attr_effect = effect - else: - _LOGGER.warning("Unsupported effect value received") - - if self._topics[CONF_STATE_TOPIC] is not None: - self._sub_state = subscription.async_prepare_subscribe_topics( - self.hass, - self._sub_state, - { - "state_topic": { - "topic": self._topics[CONF_STATE_TOPIC], - "msg_callback": state_received, - "qos": self._config[CONF_QOS], - "encoding": self._config[CONF_ENCODING] or None, - } - }, - ) async def _subscribe_topics(self) -> None: """(Re)Subscribe to topics."""