Extract helpers.event from core + misc cleanup

This commit is contained in:
Paulus Schoutsen 2015-07-26 10:45:49 +02:00
parent 0c56fde5a9
commit e0468f8b8e
5 changed files with 477 additions and 361 deletions

View file

@ -13,6 +13,7 @@ import threading
import enum
import re
import functools as ft
from collections import namedtuple
from homeassistant.const import (
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP,
@ -41,6 +42,9 @@ ENTITY_ID_PATTERN = re.compile(r"^(?P<domain>\w+)\.(?P<entity>\w+)$")
_LOGGER = logging.getLogger(__name__)
# Temporary addition to proxy deprecated methods
_MockHA = namedtuple("MockHomeAssistant", ['bus'])
class HomeAssistant(object):
""" Core class to route all communication to right components. """
@ -52,39 +56,12 @@ class HomeAssistant(object):
self.states = StateMachine(self.bus)
self.config = Config()
@property
def components(self):
""" DEPRECATED 3/21/2015. Use hass.config.components """
_LOGGER.warning(
'hass.components is deprecated. Use hass.config.components')
return self.config.components
@property
def local_api(self):
""" DEPRECATED 3/21/2015. Use hass.config.api """
_LOGGER.warning(
'hass.local_api is deprecated. Use hass.config.api')
return self.config.api
@property
def config_dir(self):
""" DEPRECATED 3/18/2015. Use hass.config.config_dir """
_LOGGER.warning(
'hass.config_dir is deprecated. Use hass.config.config_dir')
return self.config.config_dir
def get_config_path(self, path):
""" DEPRECATED 3/18/2015. Use hass.config.path """
_LOGGER.warning(
'hass.get_config_path is deprecated. Use hass.config.path')
return self.config.path(path)
def start(self):
""" Start home assistant. """
_LOGGER.info(
"Starting Home Assistant (%d threads)", self.pool.worker_count)
Timer(self)
create_timer(self)
self.bus.fire(EVENT_HOMEASSISTANT_START)
@ -105,98 +82,6 @@ class HomeAssistant(object):
self.stop()
def track_point_in_time(self, action, point_in_time):
"""
Adds a listener that fires once after a spefic point in time.
"""
utc_point_in_time = date_util.as_utc(point_in_time)
@ft.wraps(action)
def utc_converter(utc_now):
""" Converts passed in UTC now to local now. """
action(date_util.as_local(utc_now))
self.track_point_in_utc_time(utc_converter, utc_point_in_time)
def track_point_in_utc_time(self, action, point_in_time):
"""
Adds a listener that fires once after a specific point in UTC time.
"""
@ft.wraps(action)
def point_in_time_listener(event):
""" Listens for matching time_changed events. """
now = event.data[ATTR_NOW]
if now >= point_in_time and \
not hasattr(point_in_time_listener, 'run'):
# Set variable so that we will never run twice.
# Because the event bus might have to wait till a thread comes
# available to execute this listener it might occur that the
# listener gets lined up twice to be executed. This will make
# sure the second time it does nothing.
point_in_time_listener.run = True
self.bus.remove_listener(EVENT_TIME_CHANGED,
point_in_time_listener)
action(now)
self.bus.listen(EVENT_TIME_CHANGED, point_in_time_listener)
return point_in_time_listener
# pylint: disable=too-many-arguments
def track_utc_time_change(self, action,
year=None, month=None, day=None,
hour=None, minute=None, second=None):
""" Adds a listener that will fire if time matches a pattern. """
self.track_time_change(
action, year, month, day, hour, minute, second, utc=True)
# pylint: disable=too-many-arguments
def track_time_change(self, action,
year=None, month=None, day=None,
hour=None, minute=None, second=None, utc=False):
""" Adds a listener that will fire if UTC time matches a pattern. """
# We do not have to wrap the function with time pattern matching logic
# if no pattern given
if any((val is not None for val in
(year, month, day, hour, minute, second))):
pmp = _process_match_param
year, month, day = pmp(year), pmp(month), pmp(day)
hour, minute, second = pmp(hour), pmp(minute), pmp(second)
@ft.wraps(action)
def time_listener(event):
""" Listens for matching time_changed events. """
now = event.data[ATTR_NOW]
if not utc:
now = date_util.as_local(now)
mat = _matcher
if mat(now.year, year) and \
mat(now.month, month) and \
mat(now.day, day) and \
mat(now.hour, hour) and \
mat(now.minute, minute) and \
mat(now.second, second):
action(now)
else:
@ft.wraps(action)
def time_listener(event):
""" Fires every time event that comes in. """
action(event.data[ATTR_NOW])
self.bus.listen(EVENT_TIME_CHANGED, time_listener)
return time_listener
def stop(self):
""" Stops Home Assistant and shuts down all threads. """
_LOGGER.info("Stopping")
@ -208,76 +93,45 @@ class HomeAssistant(object):
self.pool.stop()
def get_entity_ids(self, domain_filter=None):
"""
Returns known entity ids.
THIS METHOD IS DEPRECATED. Use hass.states.entity_ids
"""
def track_point_in_time(self, action, point_in_time):
"""Deprecated method to track point in time."""
_LOGGER.warning(
"hass.get_entiy_ids is deprecated. Use hass.states.entity_ids")
'hass.track_point_in_time is deprecated. '
'Please use homeassistant.helpers.event.track_point_in_time')
import homeassistant.helpers.event as helper
helper.track_point_in_time(self, action, point_in_time)
return self.states.entity_ids(domain_filter)
def listen_once_event(self, event_type, listener):
""" Listen once for event of a specific type.
To listen to all events specify the constant ``MATCH_ALL``
as event_type.
Note: at the moment it is impossible to remove a one time listener.
THIS METHOD IS DEPRECATED. Please use hass.events.listen_once.
"""
def track_point_in_utc_time(self, action, point_in_time):
"""Deprecated method to track point in UTC time."""
_LOGGER.warning(
"hass.listen_once_event is deprecated. Use hass.bus.listen_once")
'hass.track_point_in_utc_time is deprecated. '
'Please use homeassistant.helpers.event.track_point_in_utc_time')
import homeassistant.helpers.event as helper
helper.track_point_in_utc_time(self, action, point_in_time)
self.bus.listen_once(event_type, listener)
def track_utc_time_change(self, action,
year=None, month=None, day=None,
hour=None, minute=None, second=None):
"""Deprecated method to track UTC time change."""
# pylint: disable=too-many-arguments
_LOGGER.warning(
'hass.track_utc_time_change is deprecated. '
'Please use homeassistant.helpers.event.track_utc_time_change')
import homeassistant.helpers.event as helper
helper.track_utc_time_change(self, action, year, month, day, hour,
minute, second)
def track_state_change(self, entity_ids, action,
from_state=None, to_state=None):
"""
Track specific state changes.
entity_ids, from_state and to_state can be string or list.
Use list to match multiple.
THIS METHOD IS DEPRECATED. Use hass.states.track_change
"""
_LOGGER.warning((
"hass.track_state_change is deprecated. "
"Use hass.states.track_change"))
self.states.track_change(entity_ids, action, from_state, to_state)
def call_service(self, domain, service, service_data=None):
"""
Fires event to call specified service.
THIS METHOD IS DEPRECATED. Use hass.services.call
"""
_LOGGER.warning((
"hass.services.call is deprecated. "
"Use hass.services.call"))
self.services.call(domain, service, service_data)
def _process_match_param(parameter):
""" Wraps parameter in a list if it is not one and returns it. """
if parameter is None or parameter == MATCH_ALL:
return MATCH_ALL
elif isinstance(parameter, str) or not hasattr(parameter, '__iter__'):
return (parameter,)
else:
return tuple(parameter)
def _matcher(subject, pattern):
""" Returns True if subject matches the pattern.
Pattern is either a list of allowed subjects or a `MATCH_ALL`.
"""
return MATCH_ALL == pattern or subject in pattern
def track_time_change(self, action,
year=None, month=None, day=None,
hour=None, minute=None, second=None, utc=False):
"""Deprecated method to track time change."""
# pylint: disable=too-many-arguments
_LOGGER.warning(
'hass.track_time_change is deprecated. '
'Please use homeassistant.helpers.event.track_time_change')
import homeassistant.helpers.event as helper
helper.track_time_change(self, action, year, month, day, hour,
minute, second)
class JobPriority(util.OrderedEnum):
@ -305,33 +159,6 @@ class JobPriority(util.OrderedEnum):
return JobPriority.EVENT_DEFAULT
def create_worker_pool():
""" Creates a worker pool to be used. """
def job_handler(job):
""" Called whenever a job is available to do. """
try:
func, arg = job
func(arg)
except Exception: # pylint: disable=broad-except
# Catch any exception our service/event_listener might throw
# We do not want to crash our ThreadPool
_LOGGER.exception("BusHandler:Exception doing job")
def busy_callback(worker_count, current_jobs, pending_jobs_count):
""" Callback to be called when the pool queue gets too big. """
_LOGGER.warning(
"WorkerPool:All %d threads are busy and %d jobs pending",
worker_count, pending_jobs_count)
for start, job in current_jobs:
_LOGGER.warning("WorkerPool:Current job from %s: %s",
date_util.datetime_to_local_str(start), job)
return util.ThreadPool(job_handler, MIN_WORKER_THREAD, busy_callback)
class EventOrigin(enum.Enum):
""" Distinguish between origin of event. """
# pylint: disable=no-init,too-few-public-methods
@ -446,12 +273,13 @@ class EventBus(object):
To listen to all events specify the constant ``MATCH_ALL``
as event_type.
Note: at the moment it is impossible to remove a one time listener.
Returns registered listener that can be used with remove_listener.
"""
@ft.wraps(listener)
def onetime_listener(event):
""" Removes listener from eventbus and then fires listener. """
if not hasattr(onetime_listener, 'run'):
if hasattr(onetime_listener, 'run'):
return
# Set variable so that we will never run twice.
# Because the event bus might have to wait till a thread comes
# available to execute this listener it might occur that the
@ -465,6 +293,8 @@ class EventBus(object):
self.listen(event_type, onetime_listener)
return onetime_listener
def remove_listener(self, event_type, listener):
""" Removes a listener of a specific event_type. """
with self._lock:
@ -596,17 +426,18 @@ class StateMachine(object):
def entity_ids(self, domain_filter=None):
""" List of entity ids that are being tracked. """
if domain_filter is not None:
if domain_filter is None:
return list(self._states.keys())
domain_filter = domain_filter.lower()
return [state.entity_id for key, state
in self._states.items()
if util.split_entity_id(key)[0] == domain_filter]
else:
return list(self._states.keys())
def all(self):
""" Returns a list of all states. """
with self._lock:
return [state.copy() for state in self._states.values()]
def get(self, entity_id):
@ -616,16 +447,6 @@ class StateMachine(object):
# Make a copy so people won't mutate the state
return state.copy() if state else None
def get_since(self, point_in_time):
"""
Returns all states that have been changed since point_in_time.
"""
point_in_time = date_util.strip_microseconds(point_in_time)
with self._lock:
return [state for state in self._states.values()
if state.last_updated >= point_in_time]
def is_state(self, entity_id, state):
""" Returns True if entity exists and is specified state. """
entity_id = entity_id.lower()
@ -661,8 +482,10 @@ class StateMachine(object):
same_state = is_existing and old_state.state == new_state
same_attr = is_existing and old_state.attributes == attributes
if same_state and same_attr:
return
# If state did not exist or is different, set it
if not (same_state and same_attr):
last_changed = old_state.last_changed if same_state else None
state = State(entity_id, new_state, attributes, last_changed)
@ -677,43 +500,14 @@ class StateMachine(object):
def track_change(self, entity_ids, action, from_state=None, to_state=None):
"""
Track specific state changes.
entity_ids, from_state and to_state can be string or list.
Use list to match multiple.
Returns the listener that listens on the bus for EVENT_STATE_CHANGED.
Pass the return value into hass.bus.remove_listener to remove it.
DEPRECATED
"""
from_state = _process_match_param(from_state)
to_state = _process_match_param(to_state)
# Ensure it is a lowercase list with entity ids we want to match on
if isinstance(entity_ids, str):
entity_ids = (entity_ids.lower(),)
else:
entity_ids = tuple(entity_id.lower() for entity_id in entity_ids)
@ft.wraps(action)
def state_listener(event):
""" The listener that listens for specific state changes. """
if event.data['entity_id'] not in entity_ids:
return
if 'old_state' in event.data:
old_state = event.data['old_state'].state
else:
old_state = None
if _matcher(old_state, from_state) and \
_matcher(event.data['new_state'].state, to_state):
action(event.data['entity_id'],
event.data.get('old_state'),
event.data['new_state'])
self._bus.listen(EVENT_STATE_CHANGED, state_listener)
return state_listener
_LOGGER.warning(
'hass.states.track_change is deprecated. '
'Use homeassistant.helpers.event.track_state_change instead.')
import homeassistant.helpers.event as helper
helper.track_state_change(_MockHA(self._bus), entity_ids, action,
from_state, to_state)
# pylint: disable=too-few-public-methods
@ -826,15 +620,16 @@ class ServiceRegistry(object):
domain = service_data.pop(ATTR_DOMAIN, None)
service = service_data.pop(ATTR_SERVICE, None)
with self._lock:
if domain in self._services and service in self._services[domain]:
if not self.has_service(domain, service):
return
service_handler = self._services[domain][service]
service_call = ServiceCall(domain, service, service_data)
# Add a job to the pool that calls _execute_service
self._pool.add_job(JobPriority.EVENT_SERVICE,
(self._execute_service,
(self._services[domain][service],
service_call)))
(service_handler, service_call)))
def _execute_service(self, service_and_call):
""" Executes a service and fires a SERVICE_EXECUTED event. """
@ -843,9 +638,8 @@ class ServiceRegistry(object):
service(call)
self._bus.fire(
EVENT_SERVICE_EXECUTED, {
ATTR_SERVICE_CALL_ID: call.data[ATTR_SERVICE_CALL_ID]
})
EVENT_SERVICE_EXECUTED,
{ATTR_SERVICE_CALL_ID: call.data[ATTR_SERVICE_CALL_ID]})
def _generate_unique_id(self):
""" Generates a unique service call id. """
@ -853,70 +647,6 @@ class ServiceRegistry(object):
return "{}-{}".format(id(self), self._cur_id)
class Timer(threading.Thread):
""" Timer will sent out an event every TIMER_INTERVAL seconds. """
def __init__(self, hass, interval=None):
threading.Thread.__init__(self)
self.daemon = True
self.hass = hass
self.interval = interval or TIMER_INTERVAL
self._stop_event = threading.Event()
# We want to be able to fire every time a minute starts (seconds=0).
# We want this so other modules can use that to make sure they fire
# every minute.
assert 60 % self.interval == 0, "60 % TIMER_INTERVAL should be 0!"
hass.bus.listen_once(EVENT_HOMEASSISTANT_START,
lambda event: self.start())
def run(self):
""" Start the timer. """
self.hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP,
lambda event: self._stop_event.set())
_LOGGER.info("Timer:starting")
last_fired_on_second = -1
calc_now = date_util.utcnow
interval = self.interval
while not self._stop_event.isSet():
now = calc_now()
# First check checks if we are not on a second matching the
# timer interval. Second check checks if we did not already fire
# this interval.
if now.second % interval or \
now.second == last_fired_on_second:
# Sleep till it is the next time that we have to fire an event.
# Aim for halfway through the second that fits TIMER_INTERVAL.
# If TIMER_INTERVAL is 10 fire at .5, 10.5, 20.5, etc seconds.
# This will yield the best results because time.sleep() is not
# 100% accurate because of non-realtime OS's
slp_seconds = interval - now.second % interval + \
.5 - now.microsecond/1000000.0
time.sleep(slp_seconds)
now = calc_now()
last_fired_on_second = now.second
# Event might have been set while sleeping
if not self._stop_event.isSet():
try:
self.hass.bus.fire(EVENT_TIME_CHANGED, {ATTR_NOW: now})
except HomeAssistantError:
# HA raises error if firing event after it has shut down
break
class Config(object):
""" Configuration settings for Home Assistant. """
@ -986,3 +716,93 @@ class InvalidEntityFormatError(HomeAssistantError):
class NoEntitySpecifiedError(HomeAssistantError):
""" When no entity is specified. """
pass
def create_timer(hass, interval=TIMER_INTERVAL):
""" Creates a timer. Timer will start on HOMEASSISTANT_START. """
# We want to be able to fire every time a minute starts (seconds=0).
# We want this so other modules can use that to make sure they fire
# every minute.
assert 60 % interval == 0, "60 % TIMER_INTERVAL should be 0!"
def timer():
"""Send an EVENT_TIME_CHANGED on interval."""
stop_event = threading.Event()
def stop_timer(event):
"""Stop the timer."""
stop_event.set()
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, stop_timer)
_LOGGER.info("Timer:starting")
last_fired_on_second = -1
calc_now = date_util.utcnow
while not stop_event.isSet():
now = calc_now()
# First check checks if we are not on a second matching the
# timer interval. Second check checks if we did not already fire
# this interval.
if now.second % interval or \
now.second == last_fired_on_second:
# Sleep till it is the next time that we have to fire an event.
# Aim for halfway through the second that fits TIMER_INTERVAL.
# If TIMER_INTERVAL is 10 fire at .5, 10.5, 20.5, etc seconds.
# This will yield the best results because time.sleep() is not
# 100% accurate because of non-realtime OS's
slp_seconds = interval - now.second % interval + \
.5 - now.microsecond/1000000.0
time.sleep(slp_seconds)
now = calc_now()
last_fired_on_second = now.second
# Event might have been set while sleeping
if not stop_event.isSet():
try:
hass.bus.fire(EVENT_TIME_CHANGED, {ATTR_NOW: now})
except HomeAssistantError:
# HA raises error if firing event after it has shut down
break
def start_timer(event):
"""Start the timer."""
thread = threading.Thread(target=timer)
thread.daemon = True
thread.start()
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_timer)
def create_worker_pool():
""" Creates a worker pool to be used. """
def job_handler(job):
""" Called whenever a job is available to do. """
try:
func, arg = job
func(arg)
except Exception: # pylint: disable=broad-except
# Catch any exception our service/event_listener might throw
# We do not want to crash our ThreadPool
_LOGGER.exception("BusHandler:Exception doing job")
def busy_callback(worker_count, current_jobs, pending_jobs_count):
""" Callback to be called when the pool queue gets too big. """
_LOGGER.warning(
"WorkerPool:All %d threads are busy and %d jobs pending",
worker_count, pending_jobs_count)
for start, job in current_jobs:
_LOGGER.warning("WorkerPool:Current job from %s: %s",
date_util.datetime_to_local_str(start), job)
return util.ThreadPool(job_handler, MIN_WORKER_THREAD, busy_callback)

View file

@ -0,0 +1,161 @@
"""
Helpers for listening to events
"""
import functools as ft
from ..util import dt as dt_util
from ..const import (
ATTR_NOW, EVENT_STATE_CHANGED, EVENT_TIME_CHANGED, MATCH_ALL)
def track_state_change(hass, entity_ids, action, from_state=None,
to_state=None):
"""
Track specific state changes.
entity_ids, from_state and to_state can be string or list.
Use list to match multiple.
Returns the listener that listens on the bus for EVENT_STATE_CHANGED.
Pass the return value into hass.bus.remove_listener to remove it.
"""
from_state = _process_match_param(from_state)
to_state = _process_match_param(to_state)
# Ensure it is a lowercase list with entity ids we want to match on
if isinstance(entity_ids, str):
entity_ids = (entity_ids.lower(),)
else:
entity_ids = tuple(entity_id.lower() for entity_id in entity_ids)
@ft.wraps(action)
def state_change_listener(event):
""" The listener that listens for specific state changes. """
if event.data['entity_id'] not in entity_ids:
return
if 'old_state' in event.data:
old_state = event.data['old_state'].state
else:
old_state = None
if _matcher(old_state, from_state) and \
_matcher(event.data['new_state'].state, to_state):
action(event.data['entity_id'],
event.data.get('old_state'),
event.data['new_state'])
hass.bus.listen(EVENT_STATE_CHANGED, state_change_listener)
return state_change_listener
def track_point_in_time(hass, action, point_in_time):
"""
Adds a listener that fires once after a spefic point in time.
"""
utc_point_in_time = dt_util.as_utc(point_in_time)
@ft.wraps(action)
def utc_converter(utc_now):
""" Converts passed in UTC now to local now. """
action(dt_util.as_local(utc_now))
return track_point_in_utc_time(hass, utc_converter, utc_point_in_time)
def track_point_in_utc_time(hass, action, point_in_time):
"""
Adds a listener that fires once after a specific point in UTC time.
"""
@ft.wraps(action)
def point_in_time_listener(event):
""" Listens for matching time_changed events. """
now = event.data[ATTR_NOW]
if now >= point_in_time and \
not hasattr(point_in_time_listener, 'run'):
# Set variable so that we will never run twice.
# Because the event bus might have to wait till a thread comes
# available to execute this listener it might occur that the
# listener gets lined up twice to be executed. This will make
# sure the second time it does nothing.
point_in_time_listener.run = True
hass.bus.remove_listener(EVENT_TIME_CHANGED,
point_in_time_listener)
action(now)
hass.bus.listen(EVENT_TIME_CHANGED, point_in_time_listener)
return point_in_time_listener
# pylint: disable=too-many-arguments
def track_utc_time_change(hass, action, year=None, month=None, day=None,
hour=None, minute=None, second=None, local=False):
""" Adds a listener that will fire if time matches a pattern. """
# We do not have to wrap the function with time pattern matching logic
# if no pattern given
if all(val is None for val in (year, month, day, hour, minute, second)):
@ft.wraps(action)
def time_change_listener(event):
""" Fires every time event that comes in. """
action(event.data[ATTR_NOW])
hass.bus.listen(EVENT_TIME_CHANGED, time_change_listener)
return time_change_listener
pmp = _process_match_param
year, month, day = pmp(year), pmp(month), pmp(day)
hour, minute, second = pmp(hour), pmp(minute), pmp(second)
@ft.wraps(action)
def pattern_time_change_listener(event):
""" Listens for matching time_changed events. """
now = event.data[ATTR_NOW]
if local:
now = dt_util.as_local(now)
mat = _matcher
if mat(now.year, year) and \
mat(now.month, month) and \
mat(now.day, day) and \
mat(now.hour, hour) and \
mat(now.minute, minute) and \
mat(now.second, second):
action(now)
hass.bus.listen(EVENT_TIME_CHANGED, pattern_time_change_listener)
return pattern_time_change_listener
# pylint: disable=too-many-arguments
def track_time_change(hass, action, year=None, month=None, day=None,
hour=None, minute=None, second=None):
""" Adds a listener that will fire if UTC time matches a pattern. """
track_utc_time_change(hass, action, year, month, day, hour, minute, second,
local=True)
def _process_match_param(parameter):
""" Wraps parameter in a tuple if it is not one and returns it. """
if parameter is None or parameter == MATCH_ALL:
return MATCH_ALL
elif isinstance(parameter, str) or not hasattr(parameter, '__iter__'):
return (parameter,)
else:
return tuple(parameter)
def _matcher(subject, pattern):
""" Returns True if subject matches the pattern.
Pattern is either a tuple of allowed subjects or a `MATCH_ALL`.
"""
return MATCH_ALL == pattern or subject in pattern

View file

@ -30,7 +30,16 @@ class TrackStates(object):
return self.states
def __exit__(self, exc_type, exc_value, traceback):
self.states.extend(self.hass.states.get_since(self.now))
self.states.extend(get_changed_since(self.hass.states.all(), self.now))
def get_changed_since(states, utc_point_in_time):
"""
Returns all states that have been changed since utc_point_in_time.
"""
point_in_time = dt_util.strip_microseconds(utc_point_in_time)
return [state for state in states if state.last_updated >= point_in_time]
def reproduce_state(hass, states, blocking=False):

View file

@ -124,7 +124,7 @@ class HomeAssistant(ha.HomeAssistant):
raise ha.HomeAssistantError(
'Unable to setup local API to receive events')
ha.Timer(self)
ha.create_timer(self)
self.bus.fire(ha.EVENT_HOMEASSISTANT_START,
origin=ha.EventOrigin.remote)

126
tests/helpers/test_event.py Normal file
View file

@ -0,0 +1,126 @@
"""
tests.helpers.event_test
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Tests event helpers.
"""
# pylint: disable=protected-access,too-many-public-methods
# pylint: disable=too-few-public-methods
import unittest
from datetime import datetime
import homeassistant as ha
from homeassistant.helpers.event import *
class TestEventHelpers(unittest.TestCase):
"""
Tests the Home Assistant event helpers.
"""
def setUp(self): # pylint: disable=invalid-name
""" things to be run when tests are started. """
self.hass = ha.HomeAssistant()
self.hass.states.set("light.Bowl", "on")
self.hass.states.set("switch.AC", "off")
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.hass.stop()
def test_track_point_in_time(self):
""" Test track point in time. """
before_birthday = datetime(1985, 7, 9, 12, 0, 0)
birthday_paulus = datetime(1986, 7, 9, 12, 0, 0)
after_birthday = datetime(1987, 7, 9, 12, 0, 0)
runs = []
track_point_in_utc_time(
self.hass, lambda x: runs.append(1), birthday_paulus)
self._send_time_changed(before_birthday)
self.hass.pool.block_till_done()
self.assertEqual(0, len(runs))
self._send_time_changed(birthday_paulus)
self.hass.pool.block_till_done()
self.assertEqual(1, len(runs))
# A point in time tracker will only fire once, this should do nothing
self._send_time_changed(birthday_paulus)
self.hass.pool.block_till_done()
self.assertEqual(1, len(runs))
track_point_in_utc_time(
self.hass, lambda x: runs.append(1), birthday_paulus)
self._send_time_changed(after_birthday)
self.hass.pool.block_till_done()
self.assertEqual(2, len(runs))
def test_track_time_change(self):
""" Test tracking time change. """
wildcard_runs = []
specific_runs = []
track_time_change(self.hass, lambda x: wildcard_runs.append(1))
track_time_change(
self.hass, lambda x: specific_runs.append(1), second=[0, 30])
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 0))
self.hass.pool.block_till_done()
self.assertEqual(1, len(specific_runs))
self.assertEqual(1, len(wildcard_runs))
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 15))
self.hass.pool.block_till_done()
self.assertEqual(1, len(specific_runs))
self.assertEqual(2, len(wildcard_runs))
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 30))
self.hass.pool.block_till_done()
self.assertEqual(2, len(specific_runs))
self.assertEqual(3, len(wildcard_runs))
def test_track_state_change(self):
""" Test states.track_change. """
# 2 lists to track how often our callbacks get called
specific_runs = []
wildcard_runs = []
track_state_change(
self.hass, 'light.Bowl', lambda a, b, c: specific_runs.append(1),
'on', 'off')
track_state_change(
self.hass, 'light.Bowl', lambda a, b, c: wildcard_runs.append(1),
ha.MATCH_ALL, ha.MATCH_ALL)
# Set same state should not trigger a state change/listener
self.hass.states.set('light.Bowl', 'on')
self.hass.pool.block_till_done()
self.assertEqual(0, len(specific_runs))
self.assertEqual(0, len(wildcard_runs))
# State change off -> on
self.hass.states.set('light.Bowl', 'off')
self.hass.pool.block_till_done()
self.assertEqual(1, len(specific_runs))
self.assertEqual(1, len(wildcard_runs))
# State change off -> off
self.hass.states.set('light.Bowl', 'off', {"some_attr": 1})
self.hass.pool.block_till_done()
self.assertEqual(1, len(specific_runs))
self.assertEqual(2, len(wildcard_runs))
# State change off -> on
self.hass.states.set('light.Bowl', 'on')
self.hass.pool.block_till_done()
self.assertEqual(1, len(specific_runs))
self.assertEqual(3, len(wildcard_runs))
def _send_time_changed(self, now):
""" Send a time changed event. """
self.hass.bus.fire(ha.EVENT_TIME_CHANGED, {ha.ATTR_NOW: now})