diff --git a/homeassistant/components/history.py b/homeassistant/components/history.py index 8ab91b08a3d..b5ac37b1451 100644 --- a/homeassistant/components/history.py +++ b/homeassistant/components/history.py @@ -118,6 +118,30 @@ def state_changes_during_period(hass, start_time, end_time=None, return states_to_json(hass, states, start_time, entity_ids) +def get_last_state_changes(hass, number_of_states, entity_id): + """Return the last number_of_states.""" + from homeassistant.components.recorder.models import States + + start_time = dt_util.utcnow() + + with session_scope(hass=hass) as session: + query = session.query(States).filter( + (States.last_changed == States.last_updated)) + + if entity_id is not None: + query = query.filter_by(entity_id=entity_id.lower()) + + entity_ids = [entity_id] if entity_id is not None else None + + states = execute( + query.order_by(States.last_updated.desc()).limit(number_of_states)) + + return states_to_json(hass, reversed(states), + start_time, + entity_ids, + include_start_time_state=False) + + def get_states(hass, utc_point_in_time, entity_ids=None, run=None, filters=None): """Return the states at a specific point in time.""" diff --git a/homeassistant/components/sensor/filter.py b/homeassistant/components/sensor/filter.py index 3faf51a5f47..27730a8f63e 100644 --- a/homeassistant/components/sensor/filter.py +++ b/homeassistant/components/sensor/filter.py @@ -8,6 +8,9 @@ import logging import statistics from collections import deque, Counter from numbers import Number +from functools import partial +from copy import copy +from datetime import timedelta import voluptuous as vol @@ -20,6 +23,7 @@ import homeassistant.helpers.config_validation as cv from homeassistant.util.decorator import Registry from homeassistant.helpers.entity import Entity from homeassistant.helpers.event import async_track_state_change +import homeassistant.components.history as history import homeassistant.util.dt as dt_util _LOGGER = logging.getLogger(__name__) @@ -40,6 +44,9 @@ CONF_TIME_SMA_TYPE = 'type' TIME_SMA_LAST = 'last' +WINDOW_SIZE_UNIT_NUMBER_EVENTS = 1 +WINDOW_SIZE_UNIT_TIME = 2 + DEFAULT_WINDOW_SIZE = 1 DEFAULT_PRECISION = 2 DEFAULT_FILTER_RADIUS = 2.0 @@ -123,21 +130,22 @@ class SensorFilter(Entity): async def async_added_to_hass(self): """Register callbacks.""" @callback - def filter_sensor_state_listener(entity, old_state, new_state): + def filter_sensor_state_listener(entity, old_state, new_state, + update_ha=True): """Handle device state changes.""" if new_state.state in [STATE_UNKNOWN, STATE_UNAVAILABLE]: return - temp_state = new_state.state + temp_state = new_state try: for filt in self._filters: - filtered_state = filt.filter_state(temp_state) + filtered_state = filt.filter_state(copy(temp_state)) _LOGGER.debug("%s(%s=%s) -> %s", filt.name, self._entity, - temp_state, + temp_state.state, "skip" if filt.skip_processing else - filtered_state) + filtered_state.state) if filt.skip_processing: return temp_state = filtered_state @@ -146,7 +154,7 @@ class SensorFilter(Entity): self._state) return - self._state = temp_state + self._state = temp_state.state if self._icon is None: self._icon = new_state.attributes.get( @@ -156,7 +164,50 @@ class SensorFilter(Entity): self._unit_of_measurement = new_state.attributes.get( ATTR_UNIT_OF_MEASUREMENT) - self.async_schedule_update_ha_state() + if update_ha: + self.async_schedule_update_ha_state() + + if 'recorder' in self.hass.config.components: + history_list = [] + largest_window_items = 0 + largest_window_time = timedelta(0) + + # Determine the largest window_size by type + for filt in self._filters: + if filt.window_unit == WINDOW_SIZE_UNIT_NUMBER_EVENTS\ + and largest_window_items < filt.window_size: + largest_window_items = filt.window_size + elif filt.window_unit == WINDOW_SIZE_UNIT_TIME\ + and largest_window_time < filt.window_size: + largest_window_time = filt.window_size + + # Retrieve the largest window_size of each type + if largest_window_items > 0: + filter_history = await self.hass.async_add_job(partial( + history.get_last_state_changes, self.hass, + largest_window_items, entity_id=self._entity)) + history_list.extend( + [state for state in filter_history[self._entity]]) + if largest_window_time > timedelta(seconds=0): + start = dt_util.utcnow() - largest_window_time + filter_history = await self.hass.async_add_job(partial( + history.state_changes_during_period, self.hass, + start, entity_id=self._entity)) + history_list.extend( + [state for state in filter_history[self._entity] + if state not in history_list]) + + # Sort the window states + history_list = sorted(history_list, key=lambda s: s.last_updated) + _LOGGER.debug("Loading from history: %s", + [(s.state, s.last_updated) for s in history_list]) + + # Replay history through the filter chain + prev_state = None + for state in history_list: + filter_sensor_state_listener( + self._entity, prev_state, state, False) + prev_state = state async_track_state_change( self.hass, self._entity, filter_sensor_state_listener) @@ -195,6 +246,31 @@ class SensorFilter(Entity): return state_attr +class FilterState(object): + """State abstraction for filter usage.""" + + def __init__(self, state): + """Initialize with HA State object.""" + self.timestamp = state.last_updated + try: + self.state = float(state.state) + except ValueError: + self.state = state.state + + def set_precision(self, precision): + """Set precision of Number based states.""" + if isinstance(self.state, Number): + self.state = round(float(self.state), precision) + + def __str__(self): + """Return state as the string representation of FilterState.""" + return str(self.state) + + def __repr__(self): + """Return timestamp and state as the representation of FilterState.""" + return "{} : {}".format(self.timestamp, self.state) + + class Filter(object): """Filter skeleton. @@ -207,11 +283,22 @@ class Filter(object): def __init__(self, name, window_size=1, precision=None, entity=None): """Initialize common attributes.""" - self.states = deque(maxlen=window_size) + if isinstance(window_size, int): + self.states = deque(maxlen=window_size) + self.window_unit = WINDOW_SIZE_UNIT_NUMBER_EVENTS + else: + self.states = deque(maxlen=0) + self.window_unit = WINDOW_SIZE_UNIT_TIME self.precision = precision self._name = name self._entity = entity self._skip_processing = False + self._window_size = window_size + + @property + def window_size(self): + """Return window size.""" + return self._window_size @property def name(self): @@ -229,11 +316,11 @@ class Filter(object): def filter_state(self, new_state): """Implement a common interface for filters.""" - filtered = self._filter_state(new_state) - if isinstance(filtered, Number): - filtered = round(float(filtered), self.precision) - self.states.append(filtered) - return filtered + filtered = self._filter_state(FilterState(new_state)) + filtered.set_precision(self.precision) + self.states.append(copy(filtered)) + new_state.state = filtered.state + return new_state @FILTERS.register(FILTER_NAME_OUTLIER) @@ -254,11 +341,10 @@ class OutlierFilter(Filter): def _filter_state(self, new_state): """Implement the outlier filter.""" - new_state = float(new_state) - if (self.states and - abs(new_state - statistics.median(self.states)) - > self._radius): + abs(new_state.state - + statistics.median([s.state for s in self.states])) > + self._radius): self._stats_internal['erasures'] += 1 @@ -284,16 +370,15 @@ class LowPassFilter(Filter): def _filter_state(self, new_state): """Implement the low pass filter.""" - new_state = float(new_state) - if not self.states: return new_state new_weight = 1.0 / self._time_constant prev_weight = 1.0 - new_weight - filtered = prev_weight * self.states[-1] + new_weight * new_state + new_state.state = prev_weight * self.states[-1].state +\ + new_weight * new_state.state - return filtered + return new_state @FILTERS.register(FILTER_NAME_TIME_SMA) @@ -308,35 +393,36 @@ class TimeSMAFilter(Filter): def __init__(self, window_size, precision, entity, type): """Initialize Filter.""" - super().__init__(FILTER_NAME_TIME_SMA, 0, precision, entity) - self._time_window = int(window_size.total_seconds()) + super().__init__(FILTER_NAME_TIME_SMA, window_size, precision, entity) + self._time_window = window_size self.last_leak = None self.queue = deque() - def _leak(self, now): + def _leak(self, left_boundary): """Remove timeouted elements.""" while self.queue: - timestamp, _ = self.queue[0] - if timestamp + self._time_window <= now: + if self.queue[0].timestamp + self._time_window <= left_boundary: self.last_leak = self.queue.popleft() else: return def _filter_state(self, new_state): - now = int(dt_util.utcnow().timestamp()) + """Implement the Simple Moving Average filter.""" + self._leak(new_state.timestamp) + self.queue.append(copy(new_state)) - self._leak(now) - self.queue.append((now, float(new_state))) moving_sum = 0 - start = now - self._time_window - _, prev_val = self.last_leak or (0, float(new_state)) + start = new_state.timestamp - self._time_window + prev_state = self.last_leak or self.queue[0] + for state in self.queue: + moving_sum += (state.timestamp-start).total_seconds()\ + * prev_state.state + start = state.timestamp + prev_state = state - for timestamp, val in self.queue: - moving_sum += (timestamp-start)*prev_val - start, prev_val = timestamp, val - moving_sum += (now-start)*prev_val + new_state.state = moving_sum / self._time_window.total_seconds() - return moving_sum/self._time_window + return new_state @FILTERS.register(FILTER_NAME_THROTTLE) diff --git a/tests/components/sensor/test_filter.py b/tests/components/sensor/test_filter.py index 0d4082731ab..8b8e7607b07 100644 --- a/tests/components/sensor/test_filter.py +++ b/tests/components/sensor/test_filter.py @@ -7,7 +7,9 @@ from homeassistant.components.sensor.filter import ( LowPassFilter, OutlierFilter, ThrottleFilter, TimeSMAFilter) import homeassistant.util.dt as dt_util from homeassistant.setup import setup_component -from tests.common import get_test_home_assistant, assert_setup_component +import homeassistant.core as ha +from tests.common import (get_test_home_assistant, assert_setup_component, + init_recorder_component) class TestFilterSensor(unittest.TestCase): @@ -16,12 +18,24 @@ class TestFilterSensor(unittest.TestCase): def setup_method(self, method): """Setup things to be run when tests are started.""" self.hass = get_test_home_assistant() - self.values = [20, 19, 18, 21, 22, 0] + raw_values = [20, 19, 18, 21, 22, 0] + self.values = [] + + timestamp = dt_util.utcnow() + for val in raw_values: + self.values.append(ha.State('sensor.test_monitored', + val, last_updated=timestamp)) + timestamp += timedelta(minutes=1) def teardown_method(self, method): """Stop everything that was started.""" self.hass.stop() + def init_recorder(self): + """Initialize the recorder.""" + init_recorder_component(self.hass) + self.hass.start() + def test_setup_fail(self): """Test if filter doesn't exist.""" config = { @@ -36,31 +50,52 @@ class TestFilterSensor(unittest.TestCase): def test_chain(self): """Test if filter chaining works.""" + self.init_recorder() config = { + 'history': { + }, 'sensor': { 'platform': 'filter', 'name': 'test', 'entity_id': 'sensor.test_monitored', + 'history_period': '00:05', 'filters': [{ 'filter': 'outlier', + 'window_size': 10, 'radius': 4.0 }, { 'filter': 'lowpass', - 'window_size': 4, 'time_constant': 10, 'precision': 2 }] } } - with assert_setup_component(1): - assert setup_component(self.hass, 'sensor', config) + t_0 = dt_util.utcnow() - timedelta(minutes=1) + t_1 = dt_util.utcnow() - timedelta(minutes=2) + t_2 = dt_util.utcnow() - timedelta(minutes=3) - for value in self.values: - self.hass.states.set(config['sensor']['entity_id'], value) - self.hass.block_till_done() + fake_states = { + 'sensor.test_monitored': [ + ha.State('sensor.test_monitored', 18.0, last_changed=t_0), + ha.State('sensor.test_monitored', 19.0, last_changed=t_1), + ha.State('sensor.test_monitored', 18.2, last_changed=t_2), + ] + } - state = self.hass.states.get('sensor.test') - self.assertEqual('20.25', state.state) + with patch('homeassistant.components.history.' + 'state_changes_during_period', return_value=fake_states): + with patch('homeassistant.components.history.' + 'get_last_state_changes', return_value=fake_states): + with assert_setup_component(1, 'sensor'): + assert setup_component(self.hass, 'sensor', config) + + for value in self.values: + self.hass.states.set( + config['sensor']['entity_id'], value.state) + self.hass.block_till_done() + + state = self.hass.states.get('sensor.test') + self.assertEqual('19.25', state.state) def test_outlier(self): """Test if outlier filter works.""" @@ -70,7 +105,7 @@ class TestFilterSensor(unittest.TestCase): radius=4.0) for state in self.values: filtered = filt.filter_state(state) - self.assertEqual(22, filtered) + self.assertEqual(22, filtered.state) def test_lowpass(self): """Test if lowpass filter works.""" @@ -80,7 +115,7 @@ class TestFilterSensor(unittest.TestCase): time_constant=10) for state in self.values: filtered = filt.filter_state(state) - self.assertEqual(18.05, filtered) + self.assertEqual(18.05, filtered.state) def test_throttle(self): """Test if lowpass filter works.""" @@ -92,7 +127,7 @@ class TestFilterSensor(unittest.TestCase): new_state = filt.filter_state(state) if not filt.skip_processing: filtered.append(new_state) - self.assertEqual([20, 21], filtered) + self.assertEqual([20, 21], [f.state for f in filtered]) def test_time_sma(self): """Test if time_sma filter works.""" @@ -100,9 +135,6 @@ class TestFilterSensor(unittest.TestCase): precision=2, entity=None, type='last') - past = dt_util.utcnow() - timedelta(minutes=5) for state in self.values: - with patch('homeassistant.util.dt.utcnow', return_value=past): - filtered = filt.filter_state(state) - past += timedelta(minutes=1) - self.assertEqual(21.5, filtered) + filtered = filt.filter_state(state) + self.assertEqual(21.5, filtered.state) diff --git a/tests/components/test_history.py b/tests/components/test_history.py index bea2af396cb..5d909492380 100644 --- a/tests/components/test_history.py +++ b/tests/components/test_history.py @@ -131,6 +131,39 @@ class TestComponentHistory(unittest.TestCase): self.assertEqual(states, hist[entity_id]) + def test_get_last_state_changes(self): + """Test number of state changes.""" + self.init_recorder() + entity_id = 'sensor.test' + + def set_state(state): + """Set the state.""" + self.hass.states.set(entity_id, state) + self.wait_recording_done() + return self.hass.states.get(entity_id) + + start = dt_util.utcnow() - timedelta(minutes=2) + point = start + timedelta(minutes=1) + point2 = point + timedelta(minutes=1) + + with patch('homeassistant.components.recorder.dt_util.utcnow', + return_value=start): + set_state('1') + + states = [] + with patch('homeassistant.components.recorder.dt_util.utcnow', + return_value=point): + states.append(set_state('2')) + + with patch('homeassistant.components.recorder.dt_util.utcnow', + return_value=point2): + states.append(set_state('3')) + + hist = history.get_last_state_changes( + self.hass, 2, entity_id) + + self.assertEqual(states, hist[entity_id]) + def test_get_significant_states(self): """Test that only significant states are returned.