Allow any parameter of a light profile as an optional parameter (#44079)

* No code duplication for profile application

* Refactor color profile as a dataclass

* Typing

* Make color_x and color_y of a Light profile optional

* Update tests

* Make brightness field of a Light profile optional

* Transition can be of a float type

* Allow fractional transition times in light profiles

Make transition of a float type.
Allow transition to be optional with 5 column CSV files.

* Make pylint happy

* Fix dropped async_mock

* Simplify CSV row schema
This commit is contained in:
Alexei Chetroi 2021-01-13 06:11:20 -05:00 committed by GitHub
parent 10bc05df00
commit ec038bc6ea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 271 additions and 90 deletions

View file

@ -1,8 +1,10 @@
"""Provides functionality to interact with lights."""
import csv
import dataclasses
from datetime import timedelta
import logging
import os
from typing import Dict, List, Optional, Tuple, cast
import voluptuous as vol
@ -21,6 +23,7 @@ from homeassistant.helpers.config_validation import ( # noqa: F401
)
from homeassistant.helpers.entity import ToggleEntity
from homeassistant.helpers.entity_component import EntityComponent
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.loader import bind_hass
import homeassistant.util.color as color_util
@ -270,24 +273,74 @@ async def async_unload_entry(hass, entry):
return await hass.data[DOMAIN].async_unload_entry(entry)
class Profiles:
"""Representation of available color profiles."""
def _coerce_none(value: str) -> None:
"""Coerce an empty string as None."""
SCHEMA = vol.Schema(
if not isinstance(value, str):
raise vol.Invalid("Expected a string")
if value:
raise vol.Invalid("Not an empty string")
@dataclasses.dataclass
class Profile:
"""Representation of a profile."""
name: str
color_x: Optional[float] = dataclasses.field(repr=False)
color_y: Optional[float] = dataclasses.field(repr=False)
brightness: Optional[int]
transition: Optional[int] = None
hs_color: Optional[Tuple[float, float]] = dataclasses.field(init=False)
SCHEMA = vol.Schema( # pylint: disable=invalid-name
vol.Any(
vol.ExactSequence((str, cv.small_float, cv.small_float, cv.byte)),
vol.ExactSequence(
(str, cv.small_float, cv.small_float, cv.byte, cv.positive_int)
(
str,
vol.Any(cv.small_float, _coerce_none),
vol.Any(cv.small_float, _coerce_none),
vol.Any(cv.byte, _coerce_none),
)
),
vol.ExactSequence(
(
str,
vol.Any(cv.small_float, _coerce_none),
vol.Any(cv.small_float, _coerce_none),
vol.Any(cv.byte, _coerce_none),
vol.Any(VALID_TRANSITION, _coerce_none),
)
),
)
)
def __init__(self, hass):
def __post_init__(self) -> None:
"""Convert xy to hs color."""
if None in (self.color_x, self.color_y):
self.hs_color = None
return
self.hs_color = color_util.color_xy_to_hs(
cast(float, self.color_x), cast(float, self.color_y)
)
@classmethod
def from_csv_row(cls, csv_row: List[str]) -> "Profile":
"""Create profile from a CSV row tuple."""
return cls(*cls.SCHEMA(csv_row))
class Profiles:
"""Representation of available color profiles."""
def __init__(self, hass: HomeAssistantType):
"""Initialize profiles."""
self.hass = hass
self.data = None
self.data: Dict[str, Profile] = {}
def _load_profile_data(self):
def _load_profile_data(self) -> Dict[str, Profile]:
"""Load built-in profiles and custom profiles."""
profile_paths = [
os.path.join(os.path.dirname(__file__), LIGHT_PROFILES_FILE),
@ -306,56 +359,46 @@ class Profiles:
try:
for rec in reader:
(
profile,
color_x,
color_y,
brightness,
*transition,
) = Profiles.SCHEMA(rec)
profile = Profile.from_csv_row(rec)
profiles[profile.name] = profile
transition = transition[0] if transition else 0
profiles[profile] = color_util.color_xy_to_hs(
color_x, color_y
) + (
brightness,
transition,
)
except vol.MultipleInvalid as ex:
_LOGGER.error(
"Error parsing light profile from %s: %s", profile_path, ex
"Error parsing light profile row '%s' from %s: %s",
rec,
profile_path,
ex,
)
continue
return profiles
async def async_initialize(self):
async def async_initialize(self) -> None:
"""Load and cache profiles."""
self.data = await self.hass.async_add_executor_job(self._load_profile_data)
@callback
def apply_default(self, entity_id, params):
def apply_default(self, entity_id: str, params: Dict) -> None:
"""Return the default turn-on profile for the given light."""
name = f"{entity_id}.default"
if name in self.data:
self.apply_profile(name, params)
return
name = "group.all_lights.default"
if name in self.data:
self.apply_profile(name, params)
for _entity_id in (entity_id, "group.all_lights"):
name = f"{_entity_id}.default"
if name in self.data:
self.apply_profile(name, params)
return
@callback
def apply_profile(self, name, params):
def apply_profile(self, name: str, params: Dict) -> None:
"""Apply a profile."""
profile = self.data.get(name)
if profile is None:
return
params.setdefault(ATTR_HS_COLOR, profile[:2])
params.setdefault(ATTR_BRIGHTNESS, profile[2])
params.setdefault(ATTR_TRANSITION, profile[3])
if profile.hs_color is not None:
params.setdefault(ATTR_HS_COLOR, profile.hs_color)
if profile.brightness is not None:
params.setdefault(ATTR_BRIGHTNESS, profile.brightness)
if profile.transition is not None:
params.setdefault(ATTR_TRANSITION, profile.transition)
class LightEntity(ToggleEntity):

View file

@ -20,7 +20,6 @@ def mock_light_profiles():
with patch(
"homeassistant.components.light.Profiles",
SCHEMA=Profiles.SCHEMA,
side_effect=mock_profiles_class,
):
yield data

View file

@ -1,4 +1,6 @@
"""The tests for the Light component."""
from unittest.mock import MagicMock, mock_open, patch
import pytest
import voluptuous as vol
@ -16,7 +18,6 @@ from homeassistant.const import (
)
from homeassistant.exceptions import Unauthorized
from homeassistant.setup import async_setup_component
from homeassistant.util import color
from tests.common import async_mock_service
@ -280,14 +281,14 @@ async def test_services(hass, mock_light_profiles):
assert data == {}
# One of the light profiles
mock_light_profiles["relax"] = (35.932, 69.412, 144, 0)
prof_name, prof_h, prof_s, prof_bri, prof_t = "relax", 35.932, 69.412, 144, 0
profile = light.Profile("relax", 0.513, 0.413, 144, 0)
mock_light_profiles[profile.name] = profile
# Test light profiles
await hass.services.async_call(
light.DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: ent1.entity_id, light.ATTR_PROFILE: prof_name},
{ATTR_ENTITY_ID: ent1.entity_id, light.ATTR_PROFILE: profile.name},
blocking=True,
)
# Specify a profile and a brightness attribute to overwrite it
@ -296,7 +297,7 @@ async def test_services(hass, mock_light_profiles):
SERVICE_TURN_ON,
{
ATTR_ENTITY_ID: ent2.entity_id,
light.ATTR_PROFILE: prof_name,
light.ATTR_PROFILE: profile.name,
light.ATTR_BRIGHTNESS: 100,
light.ATTR_TRANSITION: 1,
},
@ -305,15 +306,15 @@ async def test_services(hass, mock_light_profiles):
_, data = ent1.last_call("turn_on")
assert data == {
light.ATTR_BRIGHTNESS: prof_bri,
light.ATTR_HS_COLOR: (prof_h, prof_s),
light.ATTR_TRANSITION: prof_t,
light.ATTR_BRIGHTNESS: profile.brightness,
light.ATTR_HS_COLOR: profile.hs_color,
light.ATTR_TRANSITION: profile.transition,
}
_, data = ent2.last_call("turn_on")
assert data == {
light.ATTR_BRIGHTNESS: 100,
light.ATTR_HS_COLOR: (prof_h, prof_s),
light.ATTR_HS_COLOR: profile.hs_color,
light.ATTR_TRANSITION: 1,
}
@ -323,7 +324,7 @@ async def test_services(hass, mock_light_profiles):
SERVICE_TOGGLE,
{
ATTR_ENTITY_ID: ent3.entity_id,
light.ATTR_PROFILE: prof_name,
light.ATTR_PROFILE: profile.name,
light.ATTR_BRIGHTNESS_PCT: 100,
},
blocking=True,
@ -332,8 +333,8 @@ async def test_services(hass, mock_light_profiles):
_, data = ent3.last_call("turn_on")
assert data == {
light.ATTR_BRIGHTNESS: 255,
light.ATTR_HS_COLOR: (prof_h, prof_s),
light.ATTR_TRANSITION: prof_t,
light.ATTR_HS_COLOR: profile.hs_color,
light.ATTR_TRANSITION: profile.transition,
}
await hass.services.async_call(
@ -392,7 +393,7 @@ async def test_services(hass, mock_light_profiles):
SERVICE_TURN_ON,
{
ATTR_ENTITY_ID: ent1.entity_id,
light.ATTR_PROFILE: prof_name,
light.ATTR_PROFILE: profile.name,
light.ATTR_BRIGHTNESS: "bright",
},
blocking=True,
@ -422,13 +423,92 @@ async def test_services(hass, mock_light_profiles):
assert data == {}
async def test_light_profiles(hass, mock_light_profiles):
@pytest.mark.parametrize(
"profile_name, last_call, expected_data",
(
(
"test",
"turn_on",
{
light.ATTR_HS_COLOR: (71.059, 100),
light.ATTR_BRIGHTNESS: 100,
light.ATTR_TRANSITION: 0,
},
),
(
"color_no_brightness_no_transition",
"turn_on",
{
light.ATTR_HS_COLOR: (71.059, 100),
},
),
(
"no color",
"turn_on",
{
light.ATTR_BRIGHTNESS: 110,
light.ATTR_TRANSITION: 0,
},
),
(
"test_off",
"turn_off",
{
light.ATTR_TRANSITION: 0,
},
),
(
"no brightness",
"turn_on",
{
light.ATTR_HS_COLOR: (71.059, 100),
},
),
(
"color_and_brightness",
"turn_on",
{
light.ATTR_HS_COLOR: (71.059, 100),
light.ATTR_BRIGHTNESS: 120,
},
),
(
"color_and_transition",
"turn_on",
{
light.ATTR_HS_COLOR: (71.059, 100),
light.ATTR_TRANSITION: 4.2,
},
),
(
"brightness_and_transition",
"turn_on",
{
light.ATTR_BRIGHTNESS: 130,
light.ATTR_TRANSITION: 5.3,
},
),
),
)
async def test_light_profiles(
hass, mock_light_profiles, profile_name, expected_data, last_call
):
"""Test light profiles."""
platform = getattr(hass.components, "test.light")
platform.init()
mock_light_profiles["test"] = color.color_xy_to_hs(0.4, 0.6) + (100, 0)
mock_light_profiles["test_off"] = 0, 0, 0, 0
profile_mock_data = {
"test": (0.4, 0.6, 100, 0),
"color_no_brightness_no_transition": (0.4, 0.6, None, None),
"no color": (None, None, 110, 0),
"test_off": (0, 0, 0, 0),
"no brightness": (0.4, 0.6, None),
"color_and_brightness": (0.4, 0.6, 120),
"color_and_transition": (0.4, 0.6, None, 4.2),
"brightness_and_transition": (None, None, 130, 5.3),
}
for name, data in profile_mock_data.items():
mock_light_profiles[name] = light.Profile(*(name, *data))
assert await async_setup_component(
hass, light.DOMAIN, {light.DOMAIN: {CONF_PLATFORM: "test"}}
@ -442,29 +522,17 @@ async def test_light_profiles(hass, mock_light_profiles):
SERVICE_TURN_ON,
{
ATTR_ENTITY_ID: ent1.entity_id,
light.ATTR_PROFILE: "test",
light.ATTR_PROFILE: profile_name,
},
blocking=True,
)
_, data = ent1.last_call("turn_on")
assert light.is_on(hass, ent1.entity_id)
assert data == {
light.ATTR_HS_COLOR: (71.059, 100),
light.ATTR_BRIGHTNESS: 100,
light.ATTR_TRANSITION: 0,
}
await hass.services.async_call(
light.DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: ent1.entity_id, light.ATTR_PROFILE: "test_off"},
blocking=True,
)
_, data = ent1.last_call("turn_off")
assert not light.is_on(hass, ent1.entity_id)
assert data == {light.ATTR_TRANSITION: 0}
_, data = ent1.last_call(last_call)
if last_call == "turn_on":
assert light.is_on(hass, ent1.entity_id)
else:
assert not light.is_on(hass, ent1.entity_id)
assert data == expected_data
async def test_default_profiles_group(hass, mock_light_profiles):
@ -477,10 +545,8 @@ async def test_default_profiles_group(hass, mock_light_profiles):
)
await hass.async_block_till_done()
mock_light_profiles["group.all_lights.default"] = color.color_xy_to_hs(0.4, 0.6) + (
99,
2,
)
profile = light.Profile("group.all_lights.default", 0.4, 0.6, 99, 2)
mock_light_profiles[profile.name] = profile
ent, _, _ = platform.ENTITIES
await hass.services.async_call(
@ -505,14 +571,10 @@ async def test_default_profiles_light(hass, mock_light_profiles):
)
await hass.async_block_till_done()
mock_light_profiles["group.all_lights.default"] = color.color_xy_to_hs(0.3, 0.5) + (
200,
0,
)
mock_light_profiles["light.ceiling_2.default"] = color.color_xy_to_hs(0.6, 0.6) + (
100,
3,
)
profile = light.Profile("group.all_lights.default", 0.3, 0.5, 200, 0)
mock_light_profiles[profile.name] = profile
profile = light.Profile("light.ceiling_2.default", 0.6, 0.6, 100, 3)
mock_light_profiles[profile.name] = profile
dev = next(filter(lambda x: x.entity_id == "light.ceiling_2", platform.ENTITIES))
await hass.services.async_call(
@ -693,8 +755,85 @@ async def test_profiles(hass):
profiles = orig_Profiles(hass)
await profiles.async_initialize()
assert profiles.data == {
"concentrate": (35.932, 69.412, 219, 0),
"energize": (43.333, 21.176, 203, 0),
"reading": (38.88, 49.02, 240, 0),
"relax": (35.932, 69.412, 144, 0),
"concentrate": light.Profile("concentrate", 0.5119, 0.4147, 219, None),
"energize": light.Profile("energize", 0.368, 0.3686, 203, None),
"reading": light.Profile("reading", 0.4448, 0.4066, 240, None),
"relax": light.Profile("relax", 0.5119, 0.4147, 144, None),
}
assert profiles.data["concentrate"].hs_color == (35.932, 69.412)
assert profiles.data["energize"].hs_color == (43.333, 21.176)
assert profiles.data["reading"].hs_color == (38.88, 49.02)
assert profiles.data["relax"].hs_color == (35.932, 69.412)
@patch("os.path.isfile", MagicMock(side_effect=(True, False)))
async def test_profile_load_optional_hs_color(hass):
"""Test profile loading with profiles containing no xy color."""
csv_file = """the first line is skipped
no_color,,,100,1
no_color_no_transition,,,110
color,0.5119,0.4147,120,2
color_no_transition,0.4448,0.4066,130
color_and_brightness,0.4448,0.4066,170,
only_brightness,,,140
only_transition,,,,150
transition_float,,,,1.6
invalid_profile_1,
invalid_color_2,,0.1,1,2
invalid_color_3,,0.1,1
invalid_color_4,0.1,,1,3
invalid_color_5,0.1,,1
invalid_brightness,0,0,256,4
invalid_brightness_2,0,0,256
invalid_no_brightness_no_color_no_transition,,,
"""
profiles = orig_Profiles(hass)
with patch("builtins.open", mock_open(read_data=csv_file)):
await profiles.async_initialize()
await hass.async_block_till_done()
assert profiles.data["no_color"].hs_color is None
assert profiles.data["no_color"].brightness == 100
assert profiles.data["no_color"].transition == 1
assert profiles.data["no_color_no_transition"].hs_color is None
assert profiles.data["no_color_no_transition"].brightness == 110
assert profiles.data["no_color_no_transition"].transition is None
assert profiles.data["color"].hs_color == (35.932, 69.412)
assert profiles.data["color"].brightness == 120
assert profiles.data["color"].transition == 2
assert profiles.data["color_no_transition"].hs_color == (38.88, 49.02)
assert profiles.data["color_no_transition"].brightness == 130
assert profiles.data["color_no_transition"].transition is None
assert profiles.data["color_and_brightness"].hs_color == (38.88, 49.02)
assert profiles.data["color_and_brightness"].brightness == 170
assert profiles.data["color_and_brightness"].transition is None
assert profiles.data["only_brightness"].hs_color is None
assert profiles.data["only_brightness"].brightness == 140
assert profiles.data["only_brightness"].transition is None
assert profiles.data["only_transition"].hs_color is None
assert profiles.data["only_transition"].brightness is None
assert profiles.data["only_transition"].transition == 150
assert profiles.data["transition_float"].hs_color is None
assert profiles.data["transition_float"].brightness is None
assert profiles.data["transition_float"].transition == 1.6
for invalid_profile_name in (
"invalid_profile_1",
"invalid_color_2",
"invalid_color_3",
"invalid_color_4",
"invalid_color_5",
"invalid_brightness",
"invalid_brightness_2",
"invalid_no_brightness_no_color_no_transition",
):
assert invalid_profile_name not in profiles.data