214 lines
6.4 KiB
214 lines
6.4 KiB
Provides functionality to group devices that can be turned on or off.
import logging
import homeassistant as ha
import homeassistant.util as util
from homeassistant.const import (
DOMAIN = "group"
ATTR_AUTO = "auto"
# List of ON/OFF state tuples for groupable states
_GROUPS = {}
def _get_group_on_off(state):
""" Determine the group on/off states based on a state. """
for states in _GROUP_TYPES:
if state in states:
return states
return None, None
def is_on(hass, entity_id):
""" Returns if the group state is in its ON-state. """
state = hass.states.get(entity_id)
if state:
group_on, _ = _get_group_on_off(state.state)
# If we found a group_type, compare to ON-state
return group_on is not None and state.state == group_on
return False
def expand_entity_ids(hass, entity_ids):
""" Returns the given list of entity ids and expands group ids into
the entity ids it represents if found. """
found_ids = []
for entity_id in entity_ids:
# If entity_id points at a group, expand it
domain, _ = util.split_entity_id(entity_id)
if domain == DOMAIN:
ent_id for ent_id
in get_entity_ids(hass, entity_id)
if ent_id not in found_ids)
if entity_id not in found_ids:
except AttributeError:
# Raised by util.split_entity_id if entity_id is not a string
return found_ids
def get_entity_ids(hass, entity_id, domain_filter=None):
""" Get the entity ids that make up this group. """
entity_ids = hass.states.get(entity_id).attributes[ATTR_ENTITY_ID]
if domain_filter:
return [ent_id for ent_id in entity_ids
if ent_id.startswith(domain_filter)]
return entity_ids
except (AttributeError, KeyError):
# AttributeError if state did not exist
# KeyError if key did not exist in attributes
return []
def setup(hass, config):
""" Sets up all groups found definded in the configuration. """
for name, entity_ids in config.get(DOMAIN, {}).items():
entity_ids = entity_ids.split(",")
setup_group(hass, name, entity_ids)
return True
def setup_group(hass, name, entity_ids, user_defined=True):
""" Sets up a group state that is the combined state of
several states. Supports ON/OFF and DEVICE_HOME/DEVICE_NOT_HOME. """
logger = logging.getLogger(__name__)
# In case an iterable is passed in
entity_ids = list(entity_ids)
if not entity_ids:
'Error setting up group %s: no entities passed in to track', name)
return False
# Loop over the given entities to:
# - determine which group type this is (on_off, device_home)
# - determine which states exist and have groupable states
# - determine the current state of the group
warnings = []
group_ids = []
group_on, group_off = None, None
group_state = False
for entity_id in entity_ids:
state = hass.states.get(entity_id)
# Try to determine group type if we didn't yet
if group_on is None and state:
group_on, group_off = _get_group_on_off(state.state)
if group_on is None:
# We did not find a matching group_type
"Entity {} has ungroupable state '{}'".format(
name, state.state))
# Check if entity exists
if not state:
warnings.append("Entity {} does not exist".format(entity_id))
# Check if entity is invalid state
elif state.state != group_off and state.state != group_on:
warnings.append("State of {} is {} (expected: {} or {})".format(
entity_id, state.state, group_off, group_on))
# We have a valid group state
# Keep track of the group state to init later on
group_state = group_state or state.state == group_on
# If none of the entities could be found during setup
if not group_ids:
logger.error('Unable to find any entities to track for group %s', name)
return False
elif warnings:
'Warnings during setting up group %s: %s',
name, ", ".join(warnings))
group_entity_id = ENTITY_ID_FORMAT.format(util.slugify(name))
state = group_on if group_state else group_off
state_attr = {ATTR_ENTITY_ID: entity_ids, ATTR_AUTO: not user_defined}
# pylint: disable=unused-argument
def update_group_state(entity_id, old_state, new_state):
""" Updates the group state based on a state change by
a tracked entity. """
cur_gr_state = hass.states.get(group_entity_id).state
# if cur_gr_state = OFF and new_state = ON: set ON
# if cur_gr_state = ON and new_state = OFF: research
# else: ignore
if cur_gr_state == group_off and new_state.state == group_on:
hass.states.set(group_entity_id, group_on, state_attr)
elif cur_gr_state == group_on and new_state.state == group_off:
# Check if any of the other states is still on
if not any([hass.states.is_state(ent_id, group_on)
for ent_id in group_ids
if entity_id != ent_id]):
hass.states.set(group_entity_id, group_off, state_attr)
_GROUPS[group_entity_id] = hass.states.track_change(
group_ids, update_group_state)
hass.states.set(group_entity_id, state, state_attr)
return True
def remove_group(hass, name):
""" Remove a group and its state listener from Home Assistant. """
group_entity_id = ENTITY_ID_FORMAT.format(util.slugify(name))
if hass.states.get(group_entity_id) is not None:
if group_entity_id in _GROUPS:
ha.EVENT_STATE_CHANGED, _GROUPS.pop(group_entity_id))