Report average of position and tilt_position for cover groups (#52713)

This commit is contained in:
Erik Montnemery 2021-08-25 16:12:29 +02:00 committed by GitHub
parent 35ccad7904
commit 20d8c4da90
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 140 additions and 102 deletions

View file

@ -48,6 +48,7 @@ from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.helpers.typing import ConfigType
from . import GroupEntity
from .util import attribute_equal, reduce_attribute
KEY_OPEN_CLOSE = "open_close"
KEY_STOP = "stop"
@ -266,49 +267,33 @@ class CoverGroup(GroupEntity, CoverEntity):
continue
if state.state == STATE_OPEN:
self._attr_is_closed = False
break
continue
if state.state == STATE_CLOSING:
self._attr_is_closing = True
break
continue
if state.state == STATE_OPENING:
self._attr_is_opening = True
break
continue
self._attr_current_cover_position = None
if self._covers[KEY_POSITION]:
position: int | None = -1
self._attr_current_cover_position = 0 if self.is_closed else 100
for entity_id in self._covers[KEY_POSITION]:
state = self.hass.states.get(entity_id)
if state is None:
continue
pos = state.attributes.get(ATTR_CURRENT_POSITION)
if position == -1:
position = pos
elif position != pos:
self._attr_assumed_state = True
break
else:
if position != -1:
self._attr_current_cover_position = position
position_covers = self._covers[KEY_POSITION]
all_position_states = [self.hass.states.get(x) for x in position_covers]
position_states: list[State] = list(filter(None, all_position_states))
self._attr_current_cover_position = reduce_attribute(
position_states, ATTR_CURRENT_POSITION
)
self._attr_assumed_state |= not attribute_equal(
position_states, ATTR_CURRENT_POSITION
)
self._attr_current_cover_tilt_position = None
if self._tilts[KEY_POSITION]:
position = -1
self._attr_current_cover_tilt_position = 100
for entity_id in self._tilts[KEY_POSITION]:
state = self.hass.states.get(entity_id)
if state is None:
continue
pos = state.attributes.get(ATTR_CURRENT_TILT_POSITION)
if position == -1:
position = pos
elif position != pos:
self._attr_assumed_state = True
break
else:
if position != -1:
self._attr_current_cover_tilt_position = position
tilt_covers = self._tilts[KEY_POSITION]
all_tilt_states = [self.hass.states.get(x) for x in tilt_covers]
tilt_states: list[State] = list(filter(None, all_tilt_states))
self._attr_current_cover_tilt_position = reduce_attribute(
tilt_states, ATTR_CURRENT_TILT_POSITION
)
self._attr_assumed_state |= not attribute_equal(
tilt_states, ATTR_CURRENT_TILT_POSITION
)
supported_features = 0
supported_features |= (

View file

@ -2,9 +2,8 @@
from __future__ import annotations
from collections import Counter
from collections.abc import Iterator
import itertools
from typing import Any, Callable, Set, cast
from typing import Any, Set, cast
import voluptuous as vol
@ -51,6 +50,7 @@ from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.helpers.typing import ConfigType
from . import GroupEntity
from .util import find_state_attributes, mean_tuple, reduce_attribute
DEFAULT_NAME = "Light Group"
@ -183,36 +183,36 @@ class LightGroup(GroupEntity, light.LightEntity):
self._attr_is_on = len(on_states) > 0
self._attr_available = any(state.state != STATE_UNAVAILABLE for state in states)
self._attr_brightness = _reduce_attribute(on_states, ATTR_BRIGHTNESS)
self._attr_brightness = reduce_attribute(on_states, ATTR_BRIGHTNESS)
self._attr_hs_color = _reduce_attribute(
on_states, ATTR_HS_COLOR, reduce=_mean_tuple
self._attr_hs_color = reduce_attribute(
on_states, ATTR_HS_COLOR, reduce=mean_tuple
)
self._attr_rgb_color = _reduce_attribute(
on_states, ATTR_RGB_COLOR, reduce=_mean_tuple
self._attr_rgb_color = reduce_attribute(
on_states, ATTR_RGB_COLOR, reduce=mean_tuple
)
self._attr_rgbw_color = _reduce_attribute(
on_states, ATTR_RGBW_COLOR, reduce=_mean_tuple
self._attr_rgbw_color = reduce_attribute(
on_states, ATTR_RGBW_COLOR, reduce=mean_tuple
)
self._attr_rgbww_color = _reduce_attribute(
on_states, ATTR_RGBWW_COLOR, reduce=_mean_tuple
self._attr_rgbww_color = reduce_attribute(
on_states, ATTR_RGBWW_COLOR, reduce=mean_tuple
)
self._attr_xy_color = _reduce_attribute(
on_states, ATTR_XY_COLOR, reduce=_mean_tuple
self._attr_xy_color = reduce_attribute(
on_states, ATTR_XY_COLOR, reduce=mean_tuple
)
self._white_value = _reduce_attribute(on_states, ATTR_WHITE_VALUE)
self._white_value = reduce_attribute(on_states, ATTR_WHITE_VALUE)
self._attr_color_temp = _reduce_attribute(on_states, ATTR_COLOR_TEMP)
self._attr_min_mireds = _reduce_attribute(
self._attr_color_temp = reduce_attribute(on_states, ATTR_COLOR_TEMP)
self._attr_min_mireds = reduce_attribute(
states, ATTR_MIN_MIREDS, default=154, reduce=min
)
self._attr_max_mireds = _reduce_attribute(
self._attr_max_mireds = reduce_attribute(
states, ATTR_MAX_MIREDS, default=500, reduce=max
)
self._attr_effect_list = None
all_effect_lists = list(_find_state_attributes(states, ATTR_EFFECT_LIST))
all_effect_lists = list(find_state_attributes(states, ATTR_EFFECT_LIST))
if all_effect_lists:
# Merge all effects from all effect_lists with a union merge.
self._attr_effect_list = list(set().union(*all_effect_lists))
@ -222,14 +222,14 @@ class LightGroup(GroupEntity, light.LightEntity):
self._attr_effect_list.insert(0, "None")
self._attr_effect = None
all_effects = list(_find_state_attributes(on_states, ATTR_EFFECT))
all_effects = list(find_state_attributes(on_states, ATTR_EFFECT))
if all_effects:
# Report the most common effect.
effects_count = Counter(itertools.chain(all_effects))
self._attr_effect = effects_count.most_common(1)[0][0]
self._attr_color_mode = None
all_color_modes = list(_find_state_attributes(on_states, ATTR_COLOR_MODE))
all_color_modes = list(find_state_attributes(on_states, ATTR_COLOR_MODE))
if all_color_modes:
# Report the most common color mode, select brightness and onoff last
color_mode_count = Counter(itertools.chain(all_color_modes))
@ -241,7 +241,7 @@ class LightGroup(GroupEntity, light.LightEntity):
self._attr_supported_color_modes = None
all_supported_color_modes = list(
_find_state_attributes(states, ATTR_SUPPORTED_COLOR_MODES)
find_state_attributes(states, ATTR_SUPPORTED_COLOR_MODES)
)
if all_supported_color_modes:
# Merge all color modes.
@ -250,49 +250,10 @@ class LightGroup(GroupEntity, light.LightEntity):
)
self._attr_supported_features = 0
for support in _find_state_attributes(states, ATTR_SUPPORTED_FEATURES):
for support in find_state_attributes(states, ATTR_SUPPORTED_FEATURES):
# Merge supported features by emulating support for every feature
# we find.
self._attr_supported_features |= support
# Bitwise-and the supported features with the GroupedLight's features
# so that we don't break in the future when a new feature is added.
self._attr_supported_features &= SUPPORT_GROUP_LIGHT
def _find_state_attributes(states: list[State], key: str) -> Iterator[Any]:
"""Find attributes with matching key from states."""
for state in states:
value = state.attributes.get(key)
if value is not None:
yield value
def _mean_int(*args: Any) -> int:
"""Return the mean of the supplied values."""
return int(sum(args) / len(args))
def _mean_tuple(*args: Any) -> tuple[float | Any, ...]:
"""Return the mean values along the columns of the supplied values."""
return tuple(sum(x) / len(x) for x in zip(*args))
def _reduce_attribute(
states: list[State],
key: str,
default: Any | None = None,
reduce: Callable[..., Any] = _mean_int,
) -> Any:
"""Find the first attribute matching key from states.
If none are found, return default.
"""
attrs = list(_find_state_attributes(states, key))
if not attrs:
return default
if len(attrs) == 1:
return attrs[0]
return reduce(*attrs)

View file

@ -0,0 +1,57 @@
"""Utility functions to combine state attributes from multiple entities."""
from __future__ import annotations
from collections.abc import Iterator
from itertools import groupby
from typing import Any, Callable
from homeassistant.core import State
def find_state_attributes(states: list[State], key: str) -> Iterator[Any]:
"""Find attributes with matching key from states."""
for state in states:
value = state.attributes.get(key)
if value is not None:
yield value
def mean_int(*args: Any) -> int:
"""Return the mean of the supplied values."""
return int(sum(args) / len(args))
def mean_tuple(*args: Any) -> tuple[float | Any, ...]:
"""Return the mean values along the columns of the supplied values."""
return tuple(sum(x) / len(x) for x in zip(*args))
def attribute_equal(states: list[State], key: str) -> bool:
"""Return True if all attributes found matching key from states are equal.
Note: Returns True if no matching attribute is found.
"""
attrs = find_state_attributes(states, key)
grp = groupby(attrs)
return bool(next(grp, True) and not next(grp, False))
def reduce_attribute(
states: list[State],
key: str,
default: Any | None = None,
reduce: Callable[..., Any] = mean_int,
) -> Any:
"""Find the first attribute matching key from states.
If none are found, return default.
"""
attrs = list(find_state_attributes(states, key))
if not attrs:
return default
if len(attrs) == 1:
return attrs[0]
return reduce(*attrs)

View file

@ -177,7 +177,7 @@ async def test_attributes(hass, setup_comp):
assert state.state == STATE_OPEN
assert state.attributes[ATTR_ASSUMED_STATE] is True
assert state.attributes[ATTR_SUPPORTED_FEATURES] == 244
assert state.attributes[ATTR_CURRENT_POSITION] == 100
assert state.attributes[ATTR_CURRENT_POSITION] == 85 # (70 + 100) / 2
assert state.attributes[ATTR_CURRENT_TILT_POSITION] == 60
hass.states.async_remove(DEMO_COVER)
@ -204,7 +204,7 @@ async def test_attributes(hass, setup_comp):
assert state.attributes[ATTR_ASSUMED_STATE] is True
assert state.attributes[ATTR_SUPPORTED_FEATURES] == 128
assert ATTR_CURRENT_POSITION not in state.attributes
assert state.attributes[ATTR_CURRENT_TILT_POSITION] == 100
assert state.attributes[ATTR_CURRENT_TILT_POSITION] == 80 # (60 + 100) / 2
hass.states.async_remove(DEMO_COVER_TILT)
hass.states.async_set(DEMO_TILT, STATE_CLOSED)
@ -367,8 +367,8 @@ async def test_stop_covers(hass, setup_comp):
await hass.async_block_till_done()
state = hass.states.get(COVER_GROUP)
assert state.state == STATE_OPEN
assert state.attributes[ATTR_CURRENT_POSITION] == 100
assert state.state == STATE_OPENING
assert state.attributes[ATTR_CURRENT_POSITION] == 50 # (20 + 80) / 2
assert hass.states.get(DEMO_COVER).state == STATE_OPEN
assert hass.states.get(DEMO_COVER_POS).attributes[ATTR_CURRENT_POSITION] == 20
@ -542,6 +542,7 @@ async def test_is_opening_closing(hass, setup_comp):
)
await hass.async_block_till_done()
# Both covers opening -> opening
assert hass.states.get(DEMO_COVER_POS).state == STATE_OPENING
assert hass.states.get(DEMO_COVER_TILT).state == STATE_OPENING
assert hass.states.get(COVER_GROUP).state == STATE_OPENING
@ -555,6 +556,7 @@ async def test_is_opening_closing(hass, setup_comp):
DOMAIN, SERVICE_CLOSE_COVER, {ATTR_ENTITY_ID: COVER_GROUP}, blocking=True
)
# Both covers closing -> closing
assert hass.states.get(DEMO_COVER_POS).state == STATE_CLOSING
assert hass.states.get(DEMO_COVER_TILT).state == STATE_CLOSING
assert hass.states.get(COVER_GROUP).state == STATE_CLOSING
@ -562,11 +564,44 @@ async def test_is_opening_closing(hass, setup_comp):
hass.states.async_set(DEMO_COVER_POS, STATE_OPENING, {ATTR_SUPPORTED_FEATURES: 11})
await hass.async_block_till_done()
# Closing + Opening -> Opening
assert hass.states.get(DEMO_COVER_TILT).state == STATE_CLOSING
assert hass.states.get(DEMO_COVER_POS).state == STATE_OPENING
assert hass.states.get(COVER_GROUP).state == STATE_OPENING
hass.states.async_set(DEMO_COVER_POS, STATE_CLOSING, {ATTR_SUPPORTED_FEATURES: 11})
await hass.async_block_till_done()
# Both covers closing -> closing
assert hass.states.get(DEMO_COVER_TILT).state == STATE_CLOSING
assert hass.states.get(DEMO_COVER_POS).state == STATE_CLOSING
assert hass.states.get(COVER_GROUP).state == STATE_CLOSING
# Closed + Closing -> Closing
hass.states.async_set(DEMO_COVER_POS, STATE_CLOSED, {ATTR_SUPPORTED_FEATURES: 11})
await hass.async_block_till_done()
assert hass.states.get(DEMO_COVER_TILT).state == STATE_CLOSING
assert hass.states.get(DEMO_COVER_POS).state == STATE_CLOSED
assert hass.states.get(COVER_GROUP).state == STATE_CLOSING
# Open + Closing -> Closing
hass.states.async_set(DEMO_COVER_POS, STATE_OPEN, {ATTR_SUPPORTED_FEATURES: 11})
await hass.async_block_till_done()
assert hass.states.get(DEMO_COVER_TILT).state == STATE_CLOSING
assert hass.states.get(DEMO_COVER_POS).state == STATE_OPEN
assert hass.states.get(COVER_GROUP).state == STATE_CLOSING
# Closed + Opening -> Closing
hass.states.async_set(DEMO_COVER_TILT, STATE_OPENING, {ATTR_SUPPORTED_FEATURES: 11})
hass.states.async_set(DEMO_COVER_POS, STATE_CLOSED, {ATTR_SUPPORTED_FEATURES: 11})
await hass.async_block_till_done()
assert hass.states.get(DEMO_COVER_TILT).state == STATE_OPENING
assert hass.states.get(DEMO_COVER_POS).state == STATE_CLOSED
assert hass.states.get(COVER_GROUP).state == STATE_OPENING
# Open + Opening -> Closing
hass.states.async_set(DEMO_COVER_POS, STATE_OPEN, {ATTR_SUPPORTED_FEATURES: 11})
await hass.async_block_till_done()
assert hass.states.get(DEMO_COVER_TILT).state == STATE_OPENING
assert hass.states.get(DEMO_COVER_POS).state == STATE_OPEN
assert hass.states.get(COVER_GROUP).state == STATE_OPENING