From 042ce1cb927306610c90dd2bce7200b48d15b3a3 Mon Sep 17 00:00:00 2001 From: Erik Montnemery Date: Wed, 14 Oct 2020 17:47:13 +0200 Subject: [PATCH] Add Tasmota light (#41485) --- .../components/tasmota/binary_sensor.py | 2 +- homeassistant/components/tasmota/discovery.py | 5 +- homeassistant/components/tasmota/light.py | 233 ++++++ homeassistant/components/tasmota/mixins.py | 4 +- tests/components/tasmota/test_light.py | 747 ++++++++++++++++++ tests/components/tasmota/test_switch.py | 34 + 6 files changed, 1019 insertions(+), 6 deletions(-) create mode 100644 homeassistant/components/tasmota/light.py create mode 100644 tests/components/tasmota/test_light.py diff --git a/homeassistant/components/tasmota/binary_sensor.py b/homeassistant/components/tasmota/binary_sensor.py index d1154756557..4e32ed25194 100644 --- a/homeassistant/components/tasmota/binary_sensor.py +++ b/homeassistant/components/tasmota/binary_sensor.py @@ -61,7 +61,7 @@ class TasmotaBinarySensor( @callback def state_updated(self, state, **kwargs): - """Handle new MQTT state messages.""" + """Handle state updates.""" self._state = state if self._delay_listener is not None: diff --git a/homeassistant/components/tasmota/discovery.py b/homeassistant/components/tasmota/discovery.py index c8b00910b34..8d4e2695b79 100644 --- a/homeassistant/components/tasmota/discovery.py +++ b/homeassistant/components/tasmota/discovery.py @@ -1,4 +1,4 @@ -"""Support for MQTT discovery.""" +"""Support for Tasmota device discovery.""" import asyncio import logging @@ -22,6 +22,7 @@ _LOGGER = logging.getLogger(__name__) SUPPORTED_PLATFORMS = [ "binary_sensor", + "light", "switch", ] @@ -46,7 +47,7 @@ def set_discovery_hash(hass, discovery_hash): async def async_start( hass: HomeAssistantType, discovery_topic, config_entry, tasmota_mqtt ) -> bool: - """Start MQTT Discovery.""" + """Start Tasmota device discovery.""" async def _load_platform(platform): """Load a Tasmota platform if not already done.""" diff --git a/homeassistant/components/tasmota/light.py b/homeassistant/components/tasmota/light.py new file mode 100644 index 00000000000..b0993e3cdfc --- /dev/null +++ b/homeassistant/components/tasmota/light.py @@ -0,0 +1,233 @@ +"""Support for Tasmota lights.""" +import logging + +from hatasmota.light import ( + LIGHT_TYPE_COLDWARM, + LIGHT_TYPE_NONE, + LIGHT_TYPE_RGB, + LIGHT_TYPE_RGBCW, + LIGHT_TYPE_RGBW, +) + +from homeassistant.components import light +from homeassistant.components.light import ( + ATTR_BRIGHTNESS, + ATTR_COLOR_TEMP, + ATTR_EFFECT, + ATTR_HS_COLOR, + ATTR_TRANSITION, + ATTR_WHITE_VALUE, + SUPPORT_BRIGHTNESS, + SUPPORT_COLOR, + SUPPORT_COLOR_TEMP, + SUPPORT_EFFECT, + SUPPORT_TRANSITION, + SUPPORT_WHITE_VALUE, + LightEntity, +) +from homeassistant.core import callback +from homeassistant.helpers.dispatcher import async_dispatcher_connect +import homeassistant.util.color as color_util + +from .const import DOMAIN as TASMOTA_DOMAIN +from .discovery import TASMOTA_DISCOVERY_ENTITY_NEW +from .mixins import TasmotaAvailability, TasmotaDiscoveryUpdate + +_LOGGER = logging.getLogger(__name__) + +DEFAULT_BRIGHTNESS_MAX = 255 +TASMOTA_BRIGHTNESS_MAX = 100 + + +async def async_setup_entry(hass, config_entry, async_add_entities): + """Set up Tasmota light dynamically through discovery.""" + + @callback + def async_discover(tasmota_entity, discovery_hash): + """Discover and add a Tasmota light.""" + async_add_entities( + [TasmotaLight(tasmota_entity=tasmota_entity, discovery_hash=discovery_hash)] + ) + + async_dispatcher_connect( + hass, + TASMOTA_DISCOVERY_ENTITY_NEW.format(light.DOMAIN, TASMOTA_DOMAIN), + async_discover, + ) + + +class TasmotaLight( + TasmotaAvailability, + TasmotaDiscoveryUpdate, + LightEntity, +): + """Representation of a Tasmota light.""" + + def __init__(self, **kwds): + """Initialize Tasmota light.""" + self._state = False + self._supported_features = 0 + + self._brightness = None + self._color_temp = None + self._effect = None + self._hs = None + self._white_value = None + self._flash_times = None + + super().__init__( + discovery_update=self.discovery_update, + **kwds, + ) + + self._setup_from_entity() + + async def discovery_update(self, update): + """Handle updated discovery message.""" + self._setup_from_entity() + await super().discovery_update(update) + + def _setup_from_entity(self): + """(Re)Setup the entity.""" + supported_features = 0 + light_type = self._tasmota_entity.light_type + + if light_type != LIGHT_TYPE_NONE: + supported_features |= SUPPORT_BRIGHTNESS + supported_features |= SUPPORT_TRANSITION + + if light_type in [LIGHT_TYPE_COLDWARM, LIGHT_TYPE_RGBCW]: + supported_features |= SUPPORT_COLOR_TEMP + + if light_type in [LIGHT_TYPE_RGB, LIGHT_TYPE_RGBW, LIGHT_TYPE_RGBCW]: + supported_features |= SUPPORT_COLOR + supported_features |= SUPPORT_EFFECT + + if light_type in [LIGHT_TYPE_RGBW, LIGHT_TYPE_RGBCW]: + supported_features |= SUPPORT_WHITE_VALUE + + self._supported_features = supported_features + + @callback + def state_updated(self, state, **kwargs): + """Handle state updates.""" + self._state = state + attributes = kwargs.get("attributes") + if attributes: + if "brightness" in attributes: + brightness = float(attributes["brightness"]) + percent_bright = brightness / TASMOTA_BRIGHTNESS_MAX + self._brightness = percent_bright * 255 + if "color" in attributes: + color = attributes["color"] + self._hs = color_util.color_RGB_to_hs(*color) + if "color_temp" in attributes: + self._color_temp = attributes["color_temp"] + if "effect" in attributes: + self._effect = attributes["effect"] + if "white_value" in attributes: + white_value = float(attributes["white_value"]) + percent_white = white_value / TASMOTA_BRIGHTNESS_MAX + self._white_value = percent_white * 255 + self.async_write_ha_state() + + @property + def brightness(self): + """Return the brightness of this light between 0..255.""" + return self._brightness + + @property + def color_temp(self): + """Return the color temperature in mired.""" + return self._color_temp + + @property + def min_mireds(self): + """Return the coldest color_temp that this light supports.""" + return self._tasmota_entity.min_mireds + + @property + def max_mireds(self): + """Return the warmest color_temp that this light supports.""" + return self._tasmota_entity.max_mireds + + @property + def effect(self): + """Return the current effect.""" + return self._effect + + @property + def effect_list(self): + """Return the list of supported effects.""" + return self._tasmota_entity.effect_list + + @property + def hs_color(self): + """Return the hs color value.""" + return self._hs + + @property + def white_value(self): + """Return the white property.""" + return self._white_value + + @property + def is_on(self): + """Return true if device is on.""" + return self._state + + @property + def supported_features(self): + """Flag supported features.""" + return self._supported_features + + async def async_turn_on(self, **kwargs): + """Turn the entity on.""" + supported_features = self._supported_features + + attributes = {} + + if ATTR_HS_COLOR in kwargs and supported_features & SUPPORT_COLOR: + hs_color = kwargs[ATTR_HS_COLOR] + attributes["color"] = {} + + rgb = color_util.color_hsv_to_RGB(hs_color[0], hs_color[1], 100) + attributes["color"] = [rgb[0], rgb[1], rgb[2]] + + if ATTR_TRANSITION in kwargs: + attributes["transition"] = kwargs[ATTR_TRANSITION] + + if ATTR_BRIGHTNESS in kwargs and supported_features & SUPPORT_BRIGHTNESS: + brightness_normalized = kwargs[ATTR_BRIGHTNESS] / DEFAULT_BRIGHTNESS_MAX + device_brightness = min( + round(brightness_normalized * TASMOTA_BRIGHTNESS_MAX), + TASMOTA_BRIGHTNESS_MAX, + ) + # Make sure the brightness is not rounded down to 0 + device_brightness = max(device_brightness, 1) + attributes["brightness"] = device_brightness + + if ATTR_COLOR_TEMP in kwargs and supported_features & SUPPORT_COLOR_TEMP: + attributes["color_temp"] = int(kwargs[ATTR_COLOR_TEMP]) + + if ATTR_EFFECT in kwargs: + attributes["effect"] = kwargs[ATTR_EFFECT] + + if ATTR_WHITE_VALUE in kwargs: + white_value_normalized = kwargs[ATTR_WHITE_VALUE] / DEFAULT_BRIGHTNESS_MAX + device_white_value = min( + round(white_value_normalized * TASMOTA_BRIGHTNESS_MAX), + TASMOTA_BRIGHTNESS_MAX, + ) + attributes["white_value"] = device_white_value + + self._tasmota_entity.set_state(True, attributes) + + async def async_turn_off(self, **kwargs): + """Turn the entity off.""" + attributes = {"state": "OFF"} + + if ATTR_TRANSITION in kwargs: + attributes["transition"] = kwargs[ATTR_TRANSITION] + + self._tasmota_entity.set_state(False, attributes) diff --git a/homeassistant/components/tasmota/mixins.py b/homeassistant/components/tasmota/mixins.py index 3e25babf658..e3ca2a1949d 100644 --- a/homeassistant/components/tasmota/mixins.py +++ b/homeassistant/components/tasmota/mixins.py @@ -16,8 +16,6 @@ from .discovery import ( set_discovery_hash, ) -DATA_MQTT = "mqtt" - _LOGGER = logging.getLogger(__name__) @@ -52,7 +50,7 @@ class TasmotaEntity(Entity): @callback def state_updated(self, state, **kwargs): - """Handle new MQTT state messages.""" + """Handle state updates.""" self._state = state self.async_write_ha_state() diff --git a/tests/components/tasmota/test_light.py b/tests/components/tasmota/test_light.py new file mode 100644 index 00000000000..5636cd92d49 --- /dev/null +++ b/tests/components/tasmota/test_light.py @@ -0,0 +1,747 @@ +"""The tests for the Tasmota light platform.""" +import copy +import json + +from homeassistant.components import light +from homeassistant.components.light import ( + SUPPORT_BRIGHTNESS, + SUPPORT_COLOR, + SUPPORT_COLOR_TEMP, + SUPPORT_EFFECT, + SUPPORT_TRANSITION, + SUPPORT_WHITE_VALUE, +) +from homeassistant.components.tasmota.const import DEFAULT_PREFIX +from homeassistant.const import ATTR_ASSUMED_STATE, STATE_OFF, STATE_ON + +from .test_common import ( + DEFAULT_CONFIG, + help_test_availability, + help_test_availability_discovery_update, + help_test_availability_poll_state, + help_test_availability_when_connection_lost, + help_test_discovery_device_remove, + help_test_discovery_removal, + help_test_discovery_update_unchanged, + help_test_entity_id_update_discovery_update, + help_test_entity_id_update_subscriptions, +) + +from tests.async_mock import patch +from tests.common import async_fire_mqtt_message +from tests.components.light import common + + +async def test_attributes_on_off(hass, mqtt_mock, setup_tasmota): + """Test state update via MQTT.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 1 + config["so"]["30"] = 1 # Enforce Home Assistant auto-discovery as light + mac = config["mac"] + + async_fire_mqtt_message( + hass, + f"{DEFAULT_PREFIX}/{mac}/config", + json.dumps(config), + ) + await hass.async_block_till_done() + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/LWT", "Online") + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/STATE", '{"POWER":"ON"}') + + state = hass.states.get("light.test") + assert state.attributes.get("effect_list") is None + assert state.attributes.get("min_mireds") is None + assert state.attributes.get("max_mireds") is None + assert state.attributes.get("supported_features") == 0 + + +async def test_attributes_dimmer(hass, mqtt_mock, setup_tasmota): + """Test state update via MQTT.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["lt_st"] = 1 # 1 channel light (dimmer) + mac = config["mac"] + + async_fire_mqtt_message( + hass, + f"{DEFAULT_PREFIX}/{mac}/config", + json.dumps(config), + ) + await hass.async_block_till_done() + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/LWT", "Online") + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/STATE", '{"POWER":"ON"}') + + state = hass.states.get("light.test") + assert state.attributes.get("effect_list") is None + assert state.attributes.get("min_mireds") is None + assert state.attributes.get("max_mireds") is None + assert ( + state.attributes.get("supported_features") + == SUPPORT_BRIGHTNESS | SUPPORT_TRANSITION + ) + + +async def test_attributes_ct(hass, mqtt_mock, setup_tasmota): + """Test state update via MQTT.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["lt_st"] = 2 # 2 channel light (CW) + mac = config["mac"] + + async_fire_mqtt_message( + hass, + f"{DEFAULT_PREFIX}/{mac}/config", + json.dumps(config), + ) + await hass.async_block_till_done() + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/LWT", "Online") + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/STATE", '{"POWER":"ON"}') + + state = hass.states.get("light.test") + assert state.attributes.get("effect_list") is None + assert state.attributes.get("min_mireds") == 153 + assert state.attributes.get("max_mireds") == 500 + assert ( + state.attributes.get("supported_features") + == SUPPORT_BRIGHTNESS | SUPPORT_COLOR_TEMP | SUPPORT_TRANSITION + ) + + +async def test_attributes_ct_reduced(hass, mqtt_mock, setup_tasmota): + """Test state update via MQTT.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["lt_st"] = 2 # 2 channel light (CW) + config["so"]["82"] = 1 # Reduced CT range + mac = config["mac"] + + async_fire_mqtt_message( + hass, + f"{DEFAULT_PREFIX}/{mac}/config", + json.dumps(config), + ) + await hass.async_block_till_done() + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/LWT", "Online") + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/STATE", '{"POWER":"ON"}') + + state = hass.states.get("light.test") + assert state.attributes.get("effect_list") is None + assert state.attributes.get("min_mireds") == 200 + assert state.attributes.get("max_mireds") == 380 + assert ( + state.attributes.get("supported_features") + == SUPPORT_BRIGHTNESS | SUPPORT_COLOR_TEMP | SUPPORT_TRANSITION + ) + + +async def test_attributes_rgb(hass, mqtt_mock, setup_tasmota): + """Test state update via MQTT.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["lt_st"] = 3 # 3 channel light (RGB) + mac = config["mac"] + + async_fire_mqtt_message( + hass, + f"{DEFAULT_PREFIX}/{mac}/config", + json.dumps(config), + ) + await hass.async_block_till_done() + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/LWT", "Online") + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/STATE", '{"POWER":"ON"}') + + state = hass.states.get("light.test") + assert state.attributes.get("effect_list") == [ + "None", + "Wake up", + "Cycle up", + "Cycle down", + "Random", + ] + assert state.attributes.get("min_mireds") is None + assert state.attributes.get("max_mireds") is None + assert ( + state.attributes.get("supported_features") + == SUPPORT_BRIGHTNESS | SUPPORT_COLOR | SUPPORT_EFFECT | SUPPORT_TRANSITION + ) + + +async def test_attributes_rgbw(hass, mqtt_mock, setup_tasmota): + """Test state update via MQTT.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["lt_st"] = 4 # 5 channel light (RGBW) + mac = config["mac"] + + async_fire_mqtt_message( + hass, + f"{DEFAULT_PREFIX}/{mac}/config", + json.dumps(config), + ) + await hass.async_block_till_done() + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/LWT", "Online") + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/STATE", '{"POWER":"ON"}') + + state = hass.states.get("light.test") + assert state.attributes.get("effect_list") == [ + "None", + "Wake up", + "Cycle up", + "Cycle down", + "Random", + ] + assert state.attributes.get("min_mireds") is None + assert state.attributes.get("max_mireds") is None + assert ( + state.attributes.get("supported_features") + == SUPPORT_BRIGHTNESS + | SUPPORT_COLOR + | SUPPORT_EFFECT + | SUPPORT_TRANSITION + | SUPPORT_WHITE_VALUE + ) + + +async def test_attributes_rgbww(hass, mqtt_mock, setup_tasmota): + """Test state update via MQTT.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["lt_st"] = 5 # 5 channel light (RGBCW) + mac = config["mac"] + + async_fire_mqtt_message( + hass, + f"{DEFAULT_PREFIX}/{mac}/config", + json.dumps(config), + ) + await hass.async_block_till_done() + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/LWT", "Online") + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/STATE", '{"POWER":"ON"}') + + state = hass.states.get("light.test") + assert state.attributes.get("effect_list") == [ + "None", + "Wake up", + "Cycle up", + "Cycle down", + "Random", + ] + assert state.attributes.get("min_mireds") == 153 + assert state.attributes.get("max_mireds") == 500 + assert ( + state.attributes.get("supported_features") + == SUPPORT_BRIGHTNESS + | SUPPORT_COLOR + | SUPPORT_COLOR_TEMP + | SUPPORT_EFFECT + | SUPPORT_TRANSITION + | SUPPORT_WHITE_VALUE + ) + + +async def test_attributes_rgbww_reduced(hass, mqtt_mock, setup_tasmota): + """Test state update via MQTT.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["lt_st"] = 5 # 5 channel light (RGBCW) + config["so"]["82"] = 1 # Reduced CT range + mac = config["mac"] + + async_fire_mqtt_message( + hass, + f"{DEFAULT_PREFIX}/{mac}/config", + json.dumps(config), + ) + await hass.async_block_till_done() + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/LWT", "Online") + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/STATE", '{"POWER":"ON"}') + + state = hass.states.get("light.test") + assert state.attributes.get("effect_list") == [ + "None", + "Wake up", + "Cycle up", + "Cycle down", + "Random", + ] + assert state.attributes.get("min_mireds") == 200 + assert state.attributes.get("max_mireds") == 380 + assert ( + state.attributes.get("supported_features") + == SUPPORT_BRIGHTNESS + | SUPPORT_COLOR + | SUPPORT_COLOR_TEMP + | SUPPORT_EFFECT + | SUPPORT_TRANSITION + | SUPPORT_WHITE_VALUE + ) + + +async def test_controlling_state_via_mqtt(hass, mqtt_mock, setup_tasmota): + """Test state update via MQTT.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["lt_st"] = 5 # 5 channel light (RGBCW) + mac = config["mac"] + + async_fire_mqtt_message( + hass, + f"{DEFAULT_PREFIX}/{mac}/config", + json.dumps(config), + ) + await hass.async_block_till_done() + + state = hass.states.get("light.test") + assert state.state == "unavailable" + assert not state.attributes.get(ATTR_ASSUMED_STATE) + + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/LWT", "Online") + state = hass.states.get("light.test") + assert state.state == STATE_OFF + assert not state.attributes.get(ATTR_ASSUMED_STATE) + + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/STATE", '{"POWER":"ON"}') + state = hass.states.get("light.test") + assert state.state == STATE_ON + + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/STATE", '{"POWER":"OFF"}') + state = hass.states.get("light.test") + assert state.state == STATE_OFF + + async_fire_mqtt_message( + hass, "tasmota_49A3BC/tele/STATE", '{"POWER":"ON","Dimmer":50}' + ) + state = hass.states.get("light.test") + assert state.state == STATE_ON + assert state.attributes.get("brightness") == 127.5 + + async_fire_mqtt_message( + hass, "tasmota_49A3BC/tele/STATE", '{"POWER":"ON","Color":"255,128,0"}' + ) + state = hass.states.get("light.test") + assert state.state == STATE_ON + assert state.attributes.get("rgb_color") == (255, 128, 0) + + async_fire_mqtt_message( + hass, "tasmota_49A3BC/tele/STATE", '{"POWER":"ON","White":50}' + ) + state = hass.states.get("light.test") + assert state.state == STATE_ON + assert state.attributes.get("white_value") == 127.5 + + async_fire_mqtt_message( + hass, "tasmota_49A3BC/tele/STATE", '{"POWER":"ON","CT":300}' + ) + state = hass.states.get("light.test") + assert state.state == STATE_ON + assert state.attributes.get("color_temp") == 300 + + async_fire_mqtt_message( + hass, "tasmota_49A3BC/tele/STATE", '{"POWER":"ON","Scheme":3}' + ) + state = hass.states.get("light.test") + assert state.state == STATE_ON + assert state.attributes.get("effect") == "Cycle down" + + +async def test_sending_mqtt_commands(hass, mqtt_mock, setup_tasmota): + """Test the sending MQTT commands.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["lt_st"] = 5 # 5 channel light (RGBCW) + mac = config["mac"] + + async_fire_mqtt_message( + hass, + f"{DEFAULT_PREFIX}/{mac}/config", + json.dumps(config), + ) + await hass.async_block_till_done() + + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/LWT", "Online") + state = hass.states.get("light.test") + assert state.state == STATE_OFF + await hass.async_block_till_done() + await hass.async_block_till_done() + mqtt_mock.async_publish.reset_mock() + + # Turn the light on and verify MQTT message is sent + await common.async_turn_on(hass, "light.test") + mqtt_mock.async_publish.assert_called_once_with( + "tasmota_49A3BC/cmnd/Backlog", "Fade 0;Power1 ON", 0, False + ) + mqtt_mock.async_publish.reset_mock() + + # Tasmota is not optimistic, the state should still be off + state = hass.states.get("light.test") + assert state.state == STATE_OFF + + # Turn the light off and verify MQTT message is sent + await common.async_turn_off(hass, "light.test") + mqtt_mock.async_publish.assert_called_once_with( + "tasmota_49A3BC/cmnd/Backlog", "Fade 0;Power1 OFF", 0, False + ) + mqtt_mock.async_publish.reset_mock() + + # Turn the light on and verify MQTT messages are sent + await common.async_turn_on(hass, "light.test", brightness=192) + mqtt_mock.async_publish.assert_called_once_with( + "tasmota_49A3BC/cmnd/Backlog", "Fade 0;Dimmer 75", 0, False + ) + mqtt_mock.async_publish.reset_mock() + + await common.async_turn_on(hass, "light.test", rgb_color=[255, 128, 0]) + mqtt_mock.async_publish.assert_called_once_with( + "tasmota_49A3BC/cmnd/Backlog", "Fade 0;Power1 ON;Color2 255,128,0", 0, False + ) + mqtt_mock.async_publish.reset_mock() + + await common.async_turn_on(hass, "light.test", color_temp=200) + mqtt_mock.async_publish.assert_called_once_with( + "tasmota_49A3BC/cmnd/Backlog", "Fade 0;Power1 ON;CT 200", 0, False + ) + mqtt_mock.async_publish.reset_mock() + + await common.async_turn_on(hass, "light.test", white_value=128) + mqtt_mock.async_publish.assert_called_once_with( + "tasmota_49A3BC/cmnd/Backlog", "Fade 0;Power1 ON;White 50", 0, False + ) + mqtt_mock.async_publish.reset_mock() + + await common.async_turn_on(hass, "light.test", effect="Random") + mqtt_mock.async_publish.assert_called_once_with( + "tasmota_49A3BC/cmnd/Backlog", "Fade 0;Power1 ON;Scheme 4", 0, False + ) + mqtt_mock.async_publish.reset_mock() + + +async def test_transition(hass, mqtt_mock, setup_tasmota): + """Test transition commands.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["lt_st"] = 5 # 5 channel light (RGBCW) + mac = config["mac"] + + async_fire_mqtt_message( + hass, + f"{DEFAULT_PREFIX}/{mac}/config", + json.dumps(config), + ) + await hass.async_block_till_done() + + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/LWT", "Online") + state = hass.states.get("light.test") + assert state.state == STATE_OFF + await hass.async_block_till_done() + await hass.async_block_till_done() + mqtt_mock.async_publish.reset_mock() + + # Dim the light from 0->100: Speed should be 4*2=8 + await common.async_turn_on(hass, "light.test", brightness=255, transition=4) + mqtt_mock.async_publish.assert_called_once_with( + "tasmota_49A3BC/cmnd/Backlog", "Fade 1;Speed 8;Dimmer 100", 0, False + ) + mqtt_mock.async_publish.reset_mock() + + # Dim the light from 0->50: Speed should be 4*2/2=4 + await common.async_turn_on(hass, "light.test", brightness=128, transition=4) + mqtt_mock.async_publish.assert_called_once_with( + "tasmota_49A3BC/cmnd/Backlog", "Fade 1;Speed 4;Dimmer 50", 0, False + ) + mqtt_mock.async_publish.reset_mock() + + # Fake state update from the light + async_fire_mqtt_message( + hass, "tasmota_49A3BC/tele/STATE", '{"POWER":"ON","Dimmer":50}' + ) + state = hass.states.get("light.test") + assert state.state == STATE_ON + assert state.attributes.get("brightness") == 127.5 + + # Dim the light from 50->0: Speed should be 6*2/2=6 + await common.async_turn_off(hass, "light.test", transition=6) + mqtt_mock.async_publish.assert_called_once_with( + "tasmota_49A3BC/cmnd/Backlog", "Fade 1;Speed 6;Power1 OFF", 0, False + ) + mqtt_mock.async_publish.reset_mock() + + +async def test_relay_as_light(hass, mqtt_mock, setup_tasmota): + """Test relay show up as light in light mode.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 1 + config["so"]["30"] = 1 # Enforce Home Assistant auto-discovery as light + mac = config["mac"] + + async_fire_mqtt_message( + hass, + f"{DEFAULT_PREFIX}/{mac}/config", + json.dumps(config), + ) + await hass.async_block_till_done() + + state = hass.states.get("switch.test") + assert state is None + state = hass.states.get("light.test") + assert state is not None + + +async def _test_split_light(hass, mqtt_mock, config, num_lights, num_switches): + """Test multi-channel light split to single-channel dimmers.""" + mac = config["mac"] + + async_fire_mqtt_message( + hass, + f"{DEFAULT_PREFIX}/{mac}/config", + json.dumps(config), + ) + await hass.async_block_till_done() + + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/LWT", "Online") + await hass.async_block_till_done() + await hass.async_block_till_done() + + assert len(hass.states.async_entity_ids("switch")) == num_switches + assert len(hass.states.async_entity_ids("light")) == num_lights + + lights = hass.states.async_entity_ids("light") + for idx, entity in enumerate(lights): + mqtt_mock.async_publish.reset_mock() + # Turn the light on and verify MQTT message is sent + await common.async_turn_on(hass, entity) + mqtt_mock.async_publish.assert_called_once_with( + "tasmota_49A3BC/cmnd/Backlog", + f"Fade 0;Power{idx+num_switches+1} ON", + 0, + False, + ) + + mqtt_mock.async_publish.reset_mock() + # Dim the light and verify MQTT message is sent + await common.async_turn_on(hass, entity, brightness=(idx + 1) * 25.5) + mqtt_mock.async_publish.assert_called_once_with( + "tasmota_49A3BC/cmnd/Backlog", + f"Fade 0;Channel{idx+num_switches+1} {(idx+1)*10}", + 0, + False, + ) + + +async def test_split_light(hass, mqtt_mock, setup_tasmota): + """Test multi-channel light split to single-channel dimmers.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["rl"][1] = 2 + config["rl"][2] = 2 + config["rl"][3] = 2 + config["rl"][4] = 2 + config["so"][68] = 1 # Multi-channel PWM instead of a single light + config["lt_st"] = 5 # 5 channel light (RGBCW) + + await _test_split_light(hass, mqtt_mock, config, 5, 0) + + +async def test_split_light2(hass, mqtt_mock, setup_tasmota): + """Test multi-channel light split to single-channel dimmers.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 1 + config["rl"][1] = 1 + config["rl"][2] = 2 + config["rl"][3] = 2 + config["rl"][4] = 2 + config["rl"][5] = 2 + config["rl"][6] = 2 + config["so"][68] = 1 # Multi-channel PWM instead of a single light + config["lt_st"] = 5 # 5 channel light (RGBCW) + + await _test_split_light(hass, mqtt_mock, config, 5, 2) + + +async def _test_unlinked_light(hass, mqtt_mock, config, num_switches): + """Test multi-channel light split to single-channel dimmers.""" + mac = config["mac"] + num_lights = 2 + + async_fire_mqtt_message( + hass, + f"{DEFAULT_PREFIX}/{mac}/config", + json.dumps(config), + ) + await hass.async_block_till_done() + + async_fire_mqtt_message(hass, "tasmota_49A3BC/tele/LWT", "Online") + await hass.async_block_till_done() + await hass.async_block_till_done() + + assert len(hass.states.async_entity_ids("switch")) == num_switches + assert len(hass.states.async_entity_ids("light")) == num_lights + + lights = hass.states.async_entity_ids("light") + for idx, entity in enumerate(lights): + mqtt_mock.async_publish.reset_mock() + # Turn the light on and verify MQTT message is sent + await common.async_turn_on(hass, entity) + mqtt_mock.async_publish.assert_called_once_with( + "tasmota_49A3BC/cmnd/Backlog", + f"Fade 0;Power{idx+num_switches+1} ON", + 0, + False, + ) + + mqtt_mock.async_publish.reset_mock() + # Dim the light and verify MQTT message is sent + await common.async_turn_on(hass, entity, brightness=(idx + 1) * 25.5) + mqtt_mock.async_publish.assert_called_once_with( + "tasmota_49A3BC/cmnd/Backlog", + f"Fade 0;Dimmer{idx+1} {(idx+1)*10}", + 0, + False, + ) + + +async def test_unlinked_light(hass, mqtt_mock, setup_tasmota): + """Test multi-channel light split to rgb+ww.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["rl"][1] = 2 + config["lk"] = 0 # RGB + white channels unlinked + config["lt_st"] = 5 # 5 channel light (RGBCW) + + await _test_unlinked_light(hass, mqtt_mock, config, 0) + + +async def test_unlinked_light2(hass, mqtt_mock, setup_tasmota): + """Test multi-channel light split to single-channel dimmers.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 1 + config["rl"][1] = 1 + config["rl"][2] = 2 + config["rl"][3] = 2 + config["lk"] = 0 # RGB + white channels unlinked + config["lt_st"] = 5 # 5 channel light (RGBCW) + + await _test_unlinked_light(hass, mqtt_mock, config, 2) + + +async def test_availability_when_connection_lost( + hass, mqtt_client_mock, mqtt_mock, setup_tasmota +): + """Test availability after MQTT disconnection.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["lt_st"] = 1 # 1 channel light (Dimmer) + await help_test_availability_when_connection_lost( + hass, mqtt_client_mock, mqtt_mock, light.DOMAIN, config + ) + + +async def test_availability(hass, mqtt_mock, setup_tasmota): + """Test availability.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["lt_st"] = 1 # 1 channel light (Dimmer) + await help_test_availability(hass, mqtt_mock, light.DOMAIN, config) + + +async def test_availability_discovery_update(hass, mqtt_mock, setup_tasmota): + """Test availability discovery update.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["lt_st"] = 1 # 1 channel light (Dimmer) + await help_test_availability_discovery_update(hass, mqtt_mock, light.DOMAIN, config) + + +async def test_availability_poll_state( + hass, mqtt_client_mock, mqtt_mock, setup_tasmota +): + """Test polling after MQTT connection (re)established.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["lt_st"] = 1 # 1 channel light (Dimmer) + poll_topic = "tasmota_49A3BC/cmnd/STATE" + await help_test_availability_poll_state( + hass, mqtt_client_mock, mqtt_mock, light.DOMAIN, config, poll_topic, "" + ) + + +async def test_discovery_removal_light(hass, mqtt_mock, caplog, setup_tasmota): + """Test removal of discovered light.""" + config1 = copy.deepcopy(DEFAULT_CONFIG) + config1["rl"][0] = 2 + config1["lt_st"] = 1 # 1 channel light (Dimmer) + config2 = copy.deepcopy(DEFAULT_CONFIG) + config2["rl"][0] = 0 + config2["lt_st"] = 0 + + await help_test_discovery_removal( + hass, mqtt_mock, caplog, light.DOMAIN, config1, config2 + ) + + +async def test_discovery_removal_relay_as_light(hass, mqtt_mock, caplog, setup_tasmota): + """Test removal of discovered relay as light.""" + config1 = copy.deepcopy(DEFAULT_CONFIG) + config1["rl"][0] = 1 + config1["so"]["30"] = 1 # Enforce Home Assistant auto-discovery as light + config2 = copy.deepcopy(DEFAULT_CONFIG) + config2["rl"][0] = 1 + config2["so"]["30"] = 0 # Disable Home Assistant auto-discovery as light + + await help_test_discovery_removal( + hass, mqtt_mock, caplog, light.DOMAIN, config1, config2 + ) + + +async def test_discovery_update_unchanged_light(hass, mqtt_mock, caplog, setup_tasmota): + """Test update of discovered light.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["lt_st"] = 1 # 1 channel light (Dimmer) + with patch( + "homeassistant.components.tasmota.light.TasmotaLight.discovery_update" + ) as discovery_update: + await help_test_discovery_update_unchanged( + hass, mqtt_mock, caplog, light.DOMAIN, config, discovery_update + ) + + +async def test_discovery_device_remove(hass, mqtt_mock, setup_tasmota): + """Test device registry remove.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["lt_st"] = 1 # 1 channel light (Dimmer) + unique_id = f"{DEFAULT_CONFIG['mac']}_light_light_0" + await help_test_discovery_device_remove( + hass, mqtt_mock, light.DOMAIN, unique_id, config + ) + + +async def test_discovery_device_remove_relay_as_light(hass, mqtt_mock, setup_tasmota): + """Test device registry remove.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 1 + config["so"]["30"] = 1 # Enforce Home Assistant auto-discovery as light + unique_id = f"{DEFAULT_CONFIG['mac']}_light_relay_0" + await help_test_discovery_device_remove( + hass, mqtt_mock, light.DOMAIN, unique_id, config + ) + + +async def test_entity_id_update_subscriptions(hass, mqtt_mock, setup_tasmota): + """Test MQTT subscriptions are managed when entity_id is updated.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["lt_st"] = 1 # 1 channel light (Dimmer) + await help_test_entity_id_update_subscriptions( + hass, mqtt_mock, light.DOMAIN, config + ) + + +async def test_entity_id_update_discovery_update(hass, mqtt_mock, setup_tasmota): + """Test MQTT discovery update when entity_id is updated.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 2 + config["lt_st"] = 1 # 1 channel light (Dimmer) + await help_test_entity_id_update_discovery_update( + hass, mqtt_mock, light.DOMAIN, config + ) diff --git a/tests/components/tasmota/test_switch.py b/tests/components/tasmota/test_switch.py index 47781771098..5757d161129 100644 --- a/tests/components/tasmota/test_switch.py +++ b/tests/components/tasmota/test_switch.py @@ -98,6 +98,26 @@ async def test_sending_mqtt_commands(hass, mqtt_mock, setup_tasmota): assert state.state == STATE_OFF +async def test_relay_as_light(hass, mqtt_mock, setup_tasmota): + """Test relay does not show up as switch in light mode.""" + config = copy.deepcopy(DEFAULT_CONFIG) + config["rl"][0] = 1 + config["so"]["30"] = 1 # Enforce Home Assistant auto-discovery as light + mac = config["mac"] + + async_fire_mqtt_message( + hass, + f"{DEFAULT_PREFIX}/{mac}/config", + json.dumps(config), + ) + await hass.async_block_till_done() + + state = hass.states.get("switch.test") + assert state is None + state = hass.states.get("light.test") + assert state is not None + + async def test_availability_when_connection_lost( hass, mqtt_client_mock, mqtt_mock, setup_tasmota ): @@ -149,6 +169,20 @@ async def test_discovery_removal_switch(hass, mqtt_mock, caplog, setup_tasmota): ) +async def test_discovery_removal_relay_as_light(hass, mqtt_mock, caplog, setup_tasmota): + """Test removal of discovered relay as light.""" + config1 = copy.deepcopy(DEFAULT_CONFIG) + config1["rl"][0] = 1 + config1["so"]["30"] = 0 # Disable Home Assistant auto-discovery as light + config2 = copy.deepcopy(DEFAULT_CONFIG) + config2["rl"][0] = 1 + config2["so"]["30"] = 1 # Enforce Home Assistant auto-discovery as light + + await help_test_discovery_removal( + hass, mqtt_mock, caplog, switch.DOMAIN, config1, config2 + ) + + async def test_discovery_update_unchanged_switch( hass, mqtt_mock, caplog, setup_tasmota ):