diff --git a/homeassistant/components/group.py b/homeassistant/components/group.py index a851726e69d..3d157f32eea 100644 --- a/homeassistant/components/group.py +++ b/homeassistant/components/group.py @@ -9,7 +9,8 @@ https://home-assistant.io/components/group/ import homeassistant.core as ha from homeassistant.const import ( ATTR_ENTITY_ID, CONF_ICON, CONF_NAME, STATE_CLOSED, STATE_HOME, - STATE_NOT_HOME, STATE_OFF, STATE_ON, STATE_OPEN, STATE_UNKNOWN) + STATE_NOT_HOME, STATE_OFF, STATE_ON, STATE_OPEN, STATE_UNKNOWN, + ATTR_ASSUMED_STATE, ) from homeassistant.helpers.entity import ( Entity, generate_entity_id, split_entity_id) from homeassistant.helpers.event import track_state_change @@ -144,6 +145,7 @@ class Group(Entity): self.tracking = [] self.group_on = None self.group_off = None + self._assumed_state = False if entity_ids is not None: self.update_tracked_entity_ids(entity_ids) @@ -182,6 +184,11 @@ class Group(Entity): data[ATTR_VIEW] = True return data + @property + def assumed_state(self): + """Return True if unable to access real state of entity.""" + return self._assumed_state + def update_tracked_entity_ids(self, entity_ids): """ Update the tracked entity IDs. """ self.stop() @@ -207,47 +214,77 @@ class Group(Entity): def update(self): """ Query all the tracked states and determine current group state. """ self._state = STATE_UNKNOWN + self._update_group_state() + + def _state_changed_listener(self, entity_id, old_state, new_state): + """ Listener to receive state changes of tracked entities. """ + self._update_group_state(new_state) + self.update_ha_state() + + @property + def _tracking_states(self): + """States that the group is tracking.""" + states = [] for entity_id in self.tracking: state = self.hass.states.get(entity_id) if state is not None: - self._process_tracked_state(state) + states.append(state) - def _state_changed_listener(self, entity_id, old_state, new_state): - """ Listener to receive state changes of tracked entities. """ - self._process_tracked_state(new_state) - self.update_ha_state() + return states - def _process_tracked_state(self, tr_state): - """ Updates group state based on a new state of a tracked entity. """ + def _update_group_state(self, tr_state=None): + """Update group state. + + Optionally you can provide the only state changed since last update + allowing this method to take shortcuts. + """ + # pylint: disable=too-many-branches + # To store current states of group entities. Might not be needed. + states = None + gr_state, gr_on, gr_off = self._state, self.group_on, self.group_off # We have not determined type of group yet - if self.group_on is None: - self.group_on, self.group_off = _get_group_on_off(tr_state.state) + if gr_on is None: + if tr_state is None: + states = self._tracking_states - if self.group_on is not None: - # New state of the group is going to be based on the first - # state that we can recognize - self._state = tr_state.state + for state in states: + gr_on, gr_off = \ + _get_group_on_off(state.state) + if gr_on is not None: + break + else: + gr_on, gr_off = _get_group_on_off(tr_state.state) + if gr_on is not None: + self.group_on, self.group_off = gr_on, gr_off + + # We cannot determine state of the group + if gr_on is None: return - # There is already a group state - cur_gr_state = self._state - group_on, group_off = self.group_on, self.group_off + if tr_state is None or (gr_state == gr_on and + tr_state.state == gr_off): + if states is None: + states = self._tracking_states - # if cur_gr_state = OFF and tr_state = ON: set ON - # if cur_gr_state = ON and tr_state = OFF: research - # else: ignore + if any(state.state == gr_on for state in states): + self._state = gr_on + else: + self._state = gr_off - if cur_gr_state == group_off and tr_state.state == group_on: - self._state = group_on + elif tr_state.state in (gr_on, gr_off): + self._state = tr_state.state - elif cur_gr_state == group_on and tr_state.state == group_off: + if tr_state is None or self._assumed_state and \ + not tr_state.attributes.get(ATTR_ASSUMED_STATE): + if states is None: + states = self._tracking_states - # Set to off if no other states are on - if not any(self.hass.states.is_state(ent_id, group_on) - for ent_id in self.tracking - if tr_state.entity_id != ent_id): - self._state = group_off + self._assumed_state = any(state.attributes.get(ATTR_ASSUMED_STATE) + for state in states) + + elif tr_state.attributes.get(ATTR_ASSUMED_STATE): + self._assumed_state = True diff --git a/tests/components/test_group.py b/tests/components/test_group.py index 8a3eeadcb14..8e5a5d8e0c8 100644 --- a/tests/components/test_group.py +++ b/tests/components/test_group.py @@ -8,7 +8,8 @@ Tests the group compoments. import unittest from homeassistant.const import ( - STATE_ON, STATE_OFF, STATE_HOME, STATE_UNKNOWN, ATTR_ICON, ATTR_HIDDEN) + STATE_ON, STATE_OFF, STATE_HOME, STATE_UNKNOWN, ATTR_ICON, ATTR_HIDDEN, + ATTR_ASSUMED_STATE, ) import homeassistant.components.group as group from tests.common import get_test_home_assistant @@ -21,19 +22,13 @@ class TestComponentsGroup(unittest.TestCase): """ Init needed objects. """ self.hass = get_test_home_assistant() - self.hass.states.set('light.Bowl', STATE_ON) - self.hass.states.set('light.Ceiling', STATE_OFF) - test_group = group.Group( - self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False) - - self.group_entity_id = test_group.entity_id - def tearDown(self): # pylint: disable=invalid-name """ Stop down stuff we started. """ self.hass.stop() def test_setup_group_with_mixed_groupable_states(self): """ Try to setup a group with mixed groupable states """ + self.hass.states.set('light.Bowl', STATE_ON) self.hass.states.set('device_tracker.Paulus', STATE_HOME) group.Group( self.hass, 'person_and_light', @@ -46,6 +41,8 @@ class TestComponentsGroup(unittest.TestCase): def test_setup_group_with_a_non_existing_state(self): """ Try to setup a group with a non existing state """ + self.hass.states.set('light.Bowl', STATE_ON) + grp = group.Group( self.hass, 'light_and_nothing', ['light.Bowl', 'non.existing']) @@ -70,11 +67,15 @@ class TestComponentsGroup(unittest.TestCase): def test_monitor_group(self): """ Test if the group keeps track of states. """ + self.hass.states.set('light.Bowl', STATE_ON) + self.hass.states.set('light.Ceiling', STATE_OFF) + test_group = group.Group( + self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False) # Test if group setup in our init mode is ok - self.assertIn(self.group_entity_id, self.hass.states.entity_ids()) + self.assertIn(test_group.entity_id, self.hass.states.entity_ids()) - group_state = self.hass.states.get(self.group_entity_id) + group_state = self.hass.states.get(test_group.entity_id) self.assertEqual(STATE_ON, group_state.state) self.assertTrue(group_state.attributes.get(group.ATTR_AUTO)) @@ -83,54 +84,73 @@ class TestComponentsGroup(unittest.TestCase): Test if the group turns off if the last device that was on turns off. """ self.hass.states.set('light.Bowl', STATE_OFF) + self.hass.states.set('light.Ceiling', STATE_OFF) + test_group = group.Group( + self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False) self.hass.pool.block_till_done() - group_state = self.hass.states.get(self.group_entity_id) + group_state = self.hass.states.get(test_group.entity_id) self.assertEqual(STATE_OFF, group_state.state) def test_group_turns_on_if_all_are_off_and_one_turns_on(self): """ Test if group turns on if all devices were turned off and one turns on. """ - # Make sure all are off. self.hass.states.set('light.Bowl', STATE_OFF) - self.hass.pool.block_till_done() + self.hass.states.set('light.Ceiling', STATE_OFF) + test_group = group.Group( + self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False) # Turn one on self.hass.states.set('light.Ceiling', STATE_ON) self.hass.pool.block_till_done() - group_state = self.hass.states.get(self.group_entity_id) + group_state = self.hass.states.get(test_group.entity_id) self.assertEqual(STATE_ON, group_state.state) def test_is_on(self): """ Test is_on method. """ - self.assertTrue(group.is_on(self.hass, self.group_entity_id)) + self.hass.states.set('light.Bowl', STATE_ON) + self.hass.states.set('light.Ceiling', STATE_OFF) + test_group = group.Group( + self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False) + + self.assertTrue(group.is_on(self.hass, test_group.entity_id)) self.hass.states.set('light.Bowl', STATE_OFF) self.hass.pool.block_till_done() - self.assertFalse(group.is_on(self.hass, self.group_entity_id)) + self.assertFalse(group.is_on(self.hass, test_group.entity_id)) # Try on non existing state self.assertFalse(group.is_on(self.hass, 'non.existing')) def test_expand_entity_ids(self): """ Test expand_entity_ids method. """ + self.hass.states.set('light.Bowl', STATE_ON) + self.hass.states.set('light.Ceiling', STATE_OFF) + test_group = group.Group( + self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False) + self.assertEqual(sorted(['light.ceiling', 'light.bowl']), sorted(group.expand_entity_ids( - self.hass, [self.group_entity_id]))) + self.hass, [test_group.entity_id]))) def test_expand_entity_ids_does_not_return_duplicates(self): """ Test that expand_entity_ids does not return duplicates. """ - self.assertEqual( - ['light.bowl', 'light.ceiling'], - sorted(group.expand_entity_ids( - self.hass, [self.group_entity_id, 'light.Ceiling']))) + self.hass.states.set('light.Bowl', STATE_ON) + self.hass.states.set('light.Ceiling', STATE_OFF) + test_group = group.Group( + self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False) self.assertEqual( ['light.bowl', 'light.ceiling'], sorted(group.expand_entity_ids( - self.hass, ['light.bowl', self.group_entity_id]))) + self.hass, [test_group.entity_id, 'light.Ceiling']))) + + self.assertEqual( + ['light.bowl', 'light.ceiling'], + sorted(group.expand_entity_ids( + self.hass, ['light.bowl', test_group.entity_id]))) def test_expand_entity_ids_ignores_non_strings(self): """ Test that non string elements in lists are ignored. """ @@ -138,9 +158,14 @@ class TestComponentsGroup(unittest.TestCase): def test_get_entity_ids(self): """ Test get_entity_ids method. """ + self.hass.states.set('light.Bowl', STATE_ON) + self.hass.states.set('light.Ceiling', STATE_OFF) + test_group = group.Group( + self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False) + self.assertEqual( ['light.bowl', 'light.ceiling'], - sorted(group.get_entity_ids(self.hass, self.group_entity_id))) + sorted(group.get_entity_ids(self.hass, test_group.entity_id))) def test_get_entity_ids_with_domain_filter(self): """ Test if get_entity_ids works with a domain_filter. """ @@ -190,13 +215,18 @@ class TestComponentsGroup(unittest.TestCase): def test_setup(self): """ Test setup method. """ + self.hass.states.set('light.Bowl', STATE_ON) + self.hass.states.set('light.Ceiling', STATE_OFF) + test_group = group.Group( + self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False) + self.assertTrue( group.setup( self.hass, { group.DOMAIN: { 'second_group': { - 'entities': 'light.Bowl, ' + self.group_entity_id, + 'entities': 'light.Bowl, ' + test_group.entity_id, 'icon': 'mdi:work', 'view': True, }, @@ -207,7 +237,7 @@ class TestComponentsGroup(unittest.TestCase): group_state = self.hass.states.get( group.ENTITY_ID_FORMAT.format('second_group')) self.assertEqual(STATE_ON, group_state.state) - self.assertEqual(set((self.group_entity_id, 'light.bowl')), + self.assertEqual(set((test_group.entity_id, 'light.bowl')), set(group_state.attributes['entity_id'])) self.assertIsNone(group_state.attributes.get(group.ATTR_AUTO)) self.assertEqual('mdi:work', @@ -242,3 +272,27 @@ class TestComponentsGroup(unittest.TestCase): ['light.test_1', 'light.test_2', 'switch.test_1', 'switch.test_2'], sorted(group.expand_entity_ids(self.hass, ['group.group_of_groups']))) + + def test_set_assumed_state_based_on_tracked(self): + self.hass.states.set('light.Bowl', STATE_ON) + self.hass.states.set('light.Ceiling', STATE_OFF) + test_group = group.Group( + self.hass, 'init_group', + ['light.Bowl', 'light.Ceiling', 'sensor.no_exist']) + + state = self.hass.states.get(test_group.entity_id) + self.assertIsNone(state.attributes.get(ATTR_ASSUMED_STATE)) + + self.hass.states.set('light.Bowl', STATE_ON, { + ATTR_ASSUMED_STATE: True + }) + self.hass.pool.block_till_done() + + state = self.hass.states.get(test_group.entity_id) + self.assertTrue(state.attributes.get(ATTR_ASSUMED_STATE)) + + self.hass.states.set('light.Bowl', STATE_ON) + self.hass.pool.block_till_done() + + state = self.hass.states.get(test_group.entity_id) + self.assertIsNone(state.attributes.get(ATTR_ASSUMED_STATE))