Initialise filter_sensor with historical values (#13075)
* Initialise filter with historical values Added get_last_state_changes() * fix test * Major changes to accommodate history + time_SMA # Conflicts: # homeassistant/components/sensor/filter.py * hail the hound! * lint fixed * less debug * ups * get state from the proper entity * sensible default * No defaults in get_last_state_changes * list_reverseiterator instead of list * prev_state to state * Initialise filter with historical values Added get_last_state_changes() * fix test * Major changes to accommodate history + time_SMA # Conflicts: # homeassistant/components/sensor/filter.py * hail the hound! * lint fixed * less debug * ups * get state from the proper entity * sensible default * No defaults in get_last_state_changes * list_reverseiterator instead of list * prev_state to state * update * added window_unit * replace isinstance with window_unit
This commit is contained in:
parent
fdf93d1829
commit
286476f0d6
4 changed files with 229 additions and 54 deletions
|
@ -118,6 +118,30 @@ def state_changes_during_period(hass, start_time, end_time=None,
|
|||
return states_to_json(hass, states, start_time, entity_ids)
|
||||
|
||||
|
||||
def get_last_state_changes(hass, number_of_states, entity_id):
|
||||
"""Return the last number_of_states."""
|
||||
from homeassistant.components.recorder.models import States
|
||||
|
||||
start_time = dt_util.utcnow()
|
||||
|
||||
with session_scope(hass=hass) as session:
|
||||
query = session.query(States).filter(
|
||||
(States.last_changed == States.last_updated))
|
||||
|
||||
if entity_id is not None:
|
||||
query = query.filter_by(entity_id=entity_id.lower())
|
||||
|
||||
entity_ids = [entity_id] if entity_id is not None else None
|
||||
|
||||
states = execute(
|
||||
query.order_by(States.last_updated.desc()).limit(number_of_states))
|
||||
|
||||
return states_to_json(hass, reversed(states),
|
||||
start_time,
|
||||
entity_ids,
|
||||
include_start_time_state=False)
|
||||
|
||||
|
||||
def get_states(hass, utc_point_in_time, entity_ids=None, run=None,
|
||||
filters=None):
|
||||
"""Return the states at a specific point in time."""
|
||||
|
|
|
@ -8,6 +8,9 @@ import logging
|
|||
import statistics
|
||||
from collections import deque, Counter
|
||||
from numbers import Number
|
||||
from functools import partial
|
||||
from copy import copy
|
||||
from datetime import timedelta
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
|
@ -20,6 +23,7 @@ import homeassistant.helpers.config_validation as cv
|
|||
from homeassistant.util.decorator import Registry
|
||||
from homeassistant.helpers.entity import Entity
|
||||
from homeassistant.helpers.event import async_track_state_change
|
||||
import homeassistant.components.history as history
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
@ -40,6 +44,9 @@ CONF_TIME_SMA_TYPE = 'type'
|
|||
|
||||
TIME_SMA_LAST = 'last'
|
||||
|
||||
WINDOW_SIZE_UNIT_NUMBER_EVENTS = 1
|
||||
WINDOW_SIZE_UNIT_TIME = 2
|
||||
|
||||
DEFAULT_WINDOW_SIZE = 1
|
||||
DEFAULT_PRECISION = 2
|
||||
DEFAULT_FILTER_RADIUS = 2.0
|
||||
|
@ -123,21 +130,22 @@ class SensorFilter(Entity):
|
|||
async def async_added_to_hass(self):
|
||||
"""Register callbacks."""
|
||||
@callback
|
||||
def filter_sensor_state_listener(entity, old_state, new_state):
|
||||
def filter_sensor_state_listener(entity, old_state, new_state,
|
||||
update_ha=True):
|
||||
"""Handle device state changes."""
|
||||
if new_state.state in [STATE_UNKNOWN, STATE_UNAVAILABLE]:
|
||||
return
|
||||
|
||||
temp_state = new_state.state
|
||||
temp_state = new_state
|
||||
|
||||
try:
|
||||
for filt in self._filters:
|
||||
filtered_state = filt.filter_state(temp_state)
|
||||
filtered_state = filt.filter_state(copy(temp_state))
|
||||
_LOGGER.debug("%s(%s=%s) -> %s", filt.name,
|
||||
self._entity,
|
||||
temp_state,
|
||||
temp_state.state,
|
||||
"skip" if filt.skip_processing else
|
||||
filtered_state)
|
||||
filtered_state.state)
|
||||
if filt.skip_processing:
|
||||
return
|
||||
temp_state = filtered_state
|
||||
|
@ -146,7 +154,7 @@ class SensorFilter(Entity):
|
|||
self._state)
|
||||
return
|
||||
|
||||
self._state = temp_state
|
||||
self._state = temp_state.state
|
||||
|
||||
if self._icon is None:
|
||||
self._icon = new_state.attributes.get(
|
||||
|
@ -156,7 +164,50 @@ class SensorFilter(Entity):
|
|||
self._unit_of_measurement = new_state.attributes.get(
|
||||
ATTR_UNIT_OF_MEASUREMENT)
|
||||
|
||||
self.async_schedule_update_ha_state()
|
||||
if update_ha:
|
||||
self.async_schedule_update_ha_state()
|
||||
|
||||
if 'recorder' in self.hass.config.components:
|
||||
history_list = []
|
||||
largest_window_items = 0
|
||||
largest_window_time = timedelta(0)
|
||||
|
||||
# Determine the largest window_size by type
|
||||
for filt in self._filters:
|
||||
if filt.window_unit == WINDOW_SIZE_UNIT_NUMBER_EVENTS\
|
||||
and largest_window_items < filt.window_size:
|
||||
largest_window_items = filt.window_size
|
||||
elif filt.window_unit == WINDOW_SIZE_UNIT_TIME\
|
||||
and largest_window_time < filt.window_size:
|
||||
largest_window_time = filt.window_size
|
||||
|
||||
# Retrieve the largest window_size of each type
|
||||
if largest_window_items > 0:
|
||||
filter_history = await self.hass.async_add_job(partial(
|
||||
history.get_last_state_changes, self.hass,
|
||||
largest_window_items, entity_id=self._entity))
|
||||
history_list.extend(
|
||||
[state for state in filter_history[self._entity]])
|
||||
if largest_window_time > timedelta(seconds=0):
|
||||
start = dt_util.utcnow() - largest_window_time
|
||||
filter_history = await self.hass.async_add_job(partial(
|
||||
history.state_changes_during_period, self.hass,
|
||||
start, entity_id=self._entity))
|
||||
history_list.extend(
|
||||
[state for state in filter_history[self._entity]
|
||||
if state not in history_list])
|
||||
|
||||
# Sort the window states
|
||||
history_list = sorted(history_list, key=lambda s: s.last_updated)
|
||||
_LOGGER.debug("Loading from history: %s",
|
||||
[(s.state, s.last_updated) for s in history_list])
|
||||
|
||||
# Replay history through the filter chain
|
||||
prev_state = None
|
||||
for state in history_list:
|
||||
filter_sensor_state_listener(
|
||||
self._entity, prev_state, state, False)
|
||||
prev_state = state
|
||||
|
||||
async_track_state_change(
|
||||
self.hass, self._entity, filter_sensor_state_listener)
|
||||
|
@ -195,6 +246,31 @@ class SensorFilter(Entity):
|
|||
return state_attr
|
||||
|
||||
|
||||
class FilterState(object):
|
||||
"""State abstraction for filter usage."""
|
||||
|
||||
def __init__(self, state):
|
||||
"""Initialize with HA State object."""
|
||||
self.timestamp = state.last_updated
|
||||
try:
|
||||
self.state = float(state.state)
|
||||
except ValueError:
|
||||
self.state = state.state
|
||||
|
||||
def set_precision(self, precision):
|
||||
"""Set precision of Number based states."""
|
||||
if isinstance(self.state, Number):
|
||||
self.state = round(float(self.state), precision)
|
||||
|
||||
def __str__(self):
|
||||
"""Return state as the string representation of FilterState."""
|
||||
return str(self.state)
|
||||
|
||||
def __repr__(self):
|
||||
"""Return timestamp and state as the representation of FilterState."""
|
||||
return "{} : {}".format(self.timestamp, self.state)
|
||||
|
||||
|
||||
class Filter(object):
|
||||
"""Filter skeleton.
|
||||
|
||||
|
@ -207,11 +283,22 @@ class Filter(object):
|
|||
|
||||
def __init__(self, name, window_size=1, precision=None, entity=None):
|
||||
"""Initialize common attributes."""
|
||||
self.states = deque(maxlen=window_size)
|
||||
if isinstance(window_size, int):
|
||||
self.states = deque(maxlen=window_size)
|
||||
self.window_unit = WINDOW_SIZE_UNIT_NUMBER_EVENTS
|
||||
else:
|
||||
self.states = deque(maxlen=0)
|
||||
self.window_unit = WINDOW_SIZE_UNIT_TIME
|
||||
self.precision = precision
|
||||
self._name = name
|
||||
self._entity = entity
|
||||
self._skip_processing = False
|
||||
self._window_size = window_size
|
||||
|
||||
@property
|
||||
def window_size(self):
|
||||
"""Return window size."""
|
||||
return self._window_size
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
|
@ -229,11 +316,11 @@ class Filter(object):
|
|||
|
||||
def filter_state(self, new_state):
|
||||
"""Implement a common interface for filters."""
|
||||
filtered = self._filter_state(new_state)
|
||||
if isinstance(filtered, Number):
|
||||
filtered = round(float(filtered), self.precision)
|
||||
self.states.append(filtered)
|
||||
return filtered
|
||||
filtered = self._filter_state(FilterState(new_state))
|
||||
filtered.set_precision(self.precision)
|
||||
self.states.append(copy(filtered))
|
||||
new_state.state = filtered.state
|
||||
return new_state
|
||||
|
||||
|
||||
@FILTERS.register(FILTER_NAME_OUTLIER)
|
||||
|
@ -254,11 +341,10 @@ class OutlierFilter(Filter):
|
|||
|
||||
def _filter_state(self, new_state):
|
||||
"""Implement the outlier filter."""
|
||||
new_state = float(new_state)
|
||||
|
||||
if (self.states and
|
||||
abs(new_state - statistics.median(self.states))
|
||||
> self._radius):
|
||||
abs(new_state.state -
|
||||
statistics.median([s.state for s in self.states])) >
|
||||
self._radius):
|
||||
|
||||
self._stats_internal['erasures'] += 1
|
||||
|
||||
|
@ -284,16 +370,15 @@ class LowPassFilter(Filter):
|
|||
|
||||
def _filter_state(self, new_state):
|
||||
"""Implement the low pass filter."""
|
||||
new_state = float(new_state)
|
||||
|
||||
if not self.states:
|
||||
return new_state
|
||||
|
||||
new_weight = 1.0 / self._time_constant
|
||||
prev_weight = 1.0 - new_weight
|
||||
filtered = prev_weight * self.states[-1] + new_weight * new_state
|
||||
new_state.state = prev_weight * self.states[-1].state +\
|
||||
new_weight * new_state.state
|
||||
|
||||
return filtered
|
||||
return new_state
|
||||
|
||||
|
||||
@FILTERS.register(FILTER_NAME_TIME_SMA)
|
||||
|
@ -308,35 +393,36 @@ class TimeSMAFilter(Filter):
|
|||
|
||||
def __init__(self, window_size, precision, entity, type):
|
||||
"""Initialize Filter."""
|
||||
super().__init__(FILTER_NAME_TIME_SMA, 0, precision, entity)
|
||||
self._time_window = int(window_size.total_seconds())
|
||||
super().__init__(FILTER_NAME_TIME_SMA, window_size, precision, entity)
|
||||
self._time_window = window_size
|
||||
self.last_leak = None
|
||||
self.queue = deque()
|
||||
|
||||
def _leak(self, now):
|
||||
def _leak(self, left_boundary):
|
||||
"""Remove timeouted elements."""
|
||||
while self.queue:
|
||||
timestamp, _ = self.queue[0]
|
||||
if timestamp + self._time_window <= now:
|
||||
if self.queue[0].timestamp + self._time_window <= left_boundary:
|
||||
self.last_leak = self.queue.popleft()
|
||||
else:
|
||||
return
|
||||
|
||||
def _filter_state(self, new_state):
|
||||
now = int(dt_util.utcnow().timestamp())
|
||||
"""Implement the Simple Moving Average filter."""
|
||||
self._leak(new_state.timestamp)
|
||||
self.queue.append(copy(new_state))
|
||||
|
||||
self._leak(now)
|
||||
self.queue.append((now, float(new_state)))
|
||||
moving_sum = 0
|
||||
start = now - self._time_window
|
||||
_, prev_val = self.last_leak or (0, float(new_state))
|
||||
start = new_state.timestamp - self._time_window
|
||||
prev_state = self.last_leak or self.queue[0]
|
||||
for state in self.queue:
|
||||
moving_sum += (state.timestamp-start).total_seconds()\
|
||||
* prev_state.state
|
||||
start = state.timestamp
|
||||
prev_state = state
|
||||
|
||||
for timestamp, val in self.queue:
|
||||
moving_sum += (timestamp-start)*prev_val
|
||||
start, prev_val = timestamp, val
|
||||
moving_sum += (now-start)*prev_val
|
||||
new_state.state = moving_sum / self._time_window.total_seconds()
|
||||
|
||||
return moving_sum/self._time_window
|
||||
return new_state
|
||||
|
||||
|
||||
@FILTERS.register(FILTER_NAME_THROTTLE)
|
||||
|
|
|
@ -7,7 +7,9 @@ from homeassistant.components.sensor.filter import (
|
|||
LowPassFilter, OutlierFilter, ThrottleFilter, TimeSMAFilter)
|
||||
import homeassistant.util.dt as dt_util
|
||||
from homeassistant.setup import setup_component
|
||||
from tests.common import get_test_home_assistant, assert_setup_component
|
||||
import homeassistant.core as ha
|
||||
from tests.common import (get_test_home_assistant, assert_setup_component,
|
||||
init_recorder_component)
|
||||
|
||||
|
||||
class TestFilterSensor(unittest.TestCase):
|
||||
|
@ -16,12 +18,24 @@ class TestFilterSensor(unittest.TestCase):
|
|||
def setup_method(self, method):
|
||||
"""Setup things to be run when tests are started."""
|
||||
self.hass = get_test_home_assistant()
|
||||
self.values = [20, 19, 18, 21, 22, 0]
|
||||
raw_values = [20, 19, 18, 21, 22, 0]
|
||||
self.values = []
|
||||
|
||||
timestamp = dt_util.utcnow()
|
||||
for val in raw_values:
|
||||
self.values.append(ha.State('sensor.test_monitored',
|
||||
val, last_updated=timestamp))
|
||||
timestamp += timedelta(minutes=1)
|
||||
|
||||
def teardown_method(self, method):
|
||||
"""Stop everything that was started."""
|
||||
self.hass.stop()
|
||||
|
||||
def init_recorder(self):
|
||||
"""Initialize the recorder."""
|
||||
init_recorder_component(self.hass)
|
||||
self.hass.start()
|
||||
|
||||
def test_setup_fail(self):
|
||||
"""Test if filter doesn't exist."""
|
||||
config = {
|
||||
|
@ -36,31 +50,52 @@ class TestFilterSensor(unittest.TestCase):
|
|||
|
||||
def test_chain(self):
|
||||
"""Test if filter chaining works."""
|
||||
self.init_recorder()
|
||||
config = {
|
||||
'history': {
|
||||
},
|
||||
'sensor': {
|
||||
'platform': 'filter',
|
||||
'name': 'test',
|
||||
'entity_id': 'sensor.test_monitored',
|
||||
'history_period': '00:05',
|
||||
'filters': [{
|
||||
'filter': 'outlier',
|
||||
'window_size': 10,
|
||||
'radius': 4.0
|
||||
}, {
|
||||
'filter': 'lowpass',
|
||||
'window_size': 4,
|
||||
'time_constant': 10,
|
||||
'precision': 2
|
||||
}]
|
||||
}
|
||||
}
|
||||
with assert_setup_component(1):
|
||||
assert setup_component(self.hass, 'sensor', config)
|
||||
t_0 = dt_util.utcnow() - timedelta(minutes=1)
|
||||
t_1 = dt_util.utcnow() - timedelta(minutes=2)
|
||||
t_2 = dt_util.utcnow() - timedelta(minutes=3)
|
||||
|
||||
for value in self.values:
|
||||
self.hass.states.set(config['sensor']['entity_id'], value)
|
||||
self.hass.block_till_done()
|
||||
fake_states = {
|
||||
'sensor.test_monitored': [
|
||||
ha.State('sensor.test_monitored', 18.0, last_changed=t_0),
|
||||
ha.State('sensor.test_monitored', 19.0, last_changed=t_1),
|
||||
ha.State('sensor.test_monitored', 18.2, last_changed=t_2),
|
||||
]
|
||||
}
|
||||
|
||||
state = self.hass.states.get('sensor.test')
|
||||
self.assertEqual('20.25', state.state)
|
||||
with patch('homeassistant.components.history.'
|
||||
'state_changes_during_period', return_value=fake_states):
|
||||
with patch('homeassistant.components.history.'
|
||||
'get_last_state_changes', return_value=fake_states):
|
||||
with assert_setup_component(1, 'sensor'):
|
||||
assert setup_component(self.hass, 'sensor', config)
|
||||
|
||||
for value in self.values:
|
||||
self.hass.states.set(
|
||||
config['sensor']['entity_id'], value.state)
|
||||
self.hass.block_till_done()
|
||||
|
||||
state = self.hass.states.get('sensor.test')
|
||||
self.assertEqual('19.25', state.state)
|
||||
|
||||
def test_outlier(self):
|
||||
"""Test if outlier filter works."""
|
||||
|
@ -70,7 +105,7 @@ class TestFilterSensor(unittest.TestCase):
|
|||
radius=4.0)
|
||||
for state in self.values:
|
||||
filtered = filt.filter_state(state)
|
||||
self.assertEqual(22, filtered)
|
||||
self.assertEqual(22, filtered.state)
|
||||
|
||||
def test_lowpass(self):
|
||||
"""Test if lowpass filter works."""
|
||||
|
@ -80,7 +115,7 @@ class TestFilterSensor(unittest.TestCase):
|
|||
time_constant=10)
|
||||
for state in self.values:
|
||||
filtered = filt.filter_state(state)
|
||||
self.assertEqual(18.05, filtered)
|
||||
self.assertEqual(18.05, filtered.state)
|
||||
|
||||
def test_throttle(self):
|
||||
"""Test if lowpass filter works."""
|
||||
|
@ -92,7 +127,7 @@ class TestFilterSensor(unittest.TestCase):
|
|||
new_state = filt.filter_state(state)
|
||||
if not filt.skip_processing:
|
||||
filtered.append(new_state)
|
||||
self.assertEqual([20, 21], filtered)
|
||||
self.assertEqual([20, 21], [f.state for f in filtered])
|
||||
|
||||
def test_time_sma(self):
|
||||
"""Test if time_sma filter works."""
|
||||
|
@ -100,9 +135,6 @@ class TestFilterSensor(unittest.TestCase):
|
|||
precision=2,
|
||||
entity=None,
|
||||
type='last')
|
||||
past = dt_util.utcnow() - timedelta(minutes=5)
|
||||
for state in self.values:
|
||||
with patch('homeassistant.util.dt.utcnow', return_value=past):
|
||||
filtered = filt.filter_state(state)
|
||||
past += timedelta(minutes=1)
|
||||
self.assertEqual(21.5, filtered)
|
||||
filtered = filt.filter_state(state)
|
||||
self.assertEqual(21.5, filtered.state)
|
||||
|
|
|
@ -131,6 +131,39 @@ class TestComponentHistory(unittest.TestCase):
|
|||
|
||||
self.assertEqual(states, hist[entity_id])
|
||||
|
||||
def test_get_last_state_changes(self):
|
||||
"""Test number of state changes."""
|
||||
self.init_recorder()
|
||||
entity_id = 'sensor.test'
|
||||
|
||||
def set_state(state):
|
||||
"""Set the state."""
|
||||
self.hass.states.set(entity_id, state)
|
||||
self.wait_recording_done()
|
||||
return self.hass.states.get(entity_id)
|
||||
|
||||
start = dt_util.utcnow() - timedelta(minutes=2)
|
||||
point = start + timedelta(minutes=1)
|
||||
point2 = point + timedelta(minutes=1)
|
||||
|
||||
with patch('homeassistant.components.recorder.dt_util.utcnow',
|
||||
return_value=start):
|
||||
set_state('1')
|
||||
|
||||
states = []
|
||||
with patch('homeassistant.components.recorder.dt_util.utcnow',
|
||||
return_value=point):
|
||||
states.append(set_state('2'))
|
||||
|
||||
with patch('homeassistant.components.recorder.dt_util.utcnow',
|
||||
return_value=point2):
|
||||
states.append(set_state('3'))
|
||||
|
||||
hist = history.get_last_state_changes(
|
||||
self.hass, 2, entity_id)
|
||||
|
||||
self.assertEqual(states, hist[entity_id])
|
||||
|
||||
def test_get_significant_states(self):
|
||||
"""Test that only significant states are returned.
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue