EventBus is now Bus that handles Events and Services

This commit is contained in:
Paulus Schoutsen 2013-11-19 23:48:08 -08:00
parent 05e68c3e1e
commit 3641d2ca24
10 changed files with 592 additions and 225 deletions

View file

@ -59,6 +59,35 @@ Returns a list of categories for which a state is available
}
```
**/api/events - GET**<br>
Returns a dict with as keys the events and as value the number of listeners.
```json
{
"event_listeners": {
"state_changed": 5,
"time_changed": 2
}
}
```
**/api/services - GET**<br>
Returns a dict with as keys the domain and as value a list of published services.
```json
{
"services": {
"browser": [
"browse_url"
],
"keyboard": [
"volume_up",
"volume_down"
]
}
}
```
**/api/states/&lt;category>** - GET<br>
Returns the current state from a category
@ -101,6 +130,16 @@ optional parameter: event_data - JSON encoded object
}
```
**/api/services/&lt;domain>/&lt;service>** - POST<br>
Calls a service within a specific domain.<br>
optional parameter: service_data - JSON encoded object
```json
{
"message": "Service keyboard/volume_up called."
}
```
Android remote control
----------------------

Binary file not shown.

Before

Width:  |  Height:  |  Size: 28 KiB

After

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 90 KiB

After

Width:  |  Height:  |  Size: 222 KiB

View file

@ -15,8 +15,12 @@ from datetime import datetime
logging.basicConfig(level=logging.INFO)
ALL_EVENTS = '*'
DOMAIN_HOMEASSISTANT = "homeassistant"
SERVICE_HOMEASSISTANT_STOP = "stop"
EVENT_HOMEASSISTANT_START = "homeassistant.start"
EVENT_HOMEASSISTANT_STOP = "homeassistant.stop"
EVENT_STATE_CHANGED = "state_changed"
EVENT_TIME_CHANGED = "time_changed"
@ -30,16 +34,16 @@ assert 60 % TIMER_INTERVAL == 0, "60 % TIMER_INTERVAL should be 0!"
DATE_STR_FORMAT = "%H:%M:%S %d-%m-%Y"
def start_home_assistant(eventbus):
def start_home_assistant(bus):
""" Start home assistant. """
request_shutdown = threading.Event()
eventbus.listen_once(EVENT_HOMEASSISTANT_STOP,
lambda event: request_shutdown.set())
bus.register_service(DOMAIN_HOMEASSISTANT, SERVICE_HOMEASSISTANT_STOP,
lambda service: request_shutdown.set())
Timer(eventbus)
Timer(bus)
eventbus.fire(EVENT_HOMEASSISTANT_START)
bus.fire_event(EVENT_HOMEASSISTANT_START)
while True:
try:
@ -68,7 +72,7 @@ def str_to_datetime(dt_str):
return datetime.strptime(dt_str, DATE_STR_FORMAT)
def ensure_list(parameter):
def _ensure_list(parameter):
""" Wraps parameter in a list if it is not one and returns it.
@rtype : list
@ -76,7 +80,7 @@ def ensure_list(parameter):
return parameter if isinstance(parameter, list) else [parameter]
def matcher(subject, pattern):
def _matcher(subject, pattern):
""" Returns True if subject matches the pattern.
Pattern is either a list of allowed subjects or a '*'.
@ -95,33 +99,35 @@ def create_state(state, attributes=None, last_changed=None):
'last_changed': datetime_to_str(last_changed)}
def track_state_change(eventbus, category, from_state, to_state, action):
def track_state_change(bus, category, from_state, to_state, action):
""" Helper method to track specific state changes. """
from_state = ensure_list(from_state)
to_state = ensure_list(to_state)
from_state = _ensure_list(from_state)
to_state = _ensure_list(to_state)
def listener(event):
""" State change listener that listens for specific state changes. """
if category == event.data['category'] and \
matcher(event.data['old_state']['state'], from_state) and \
matcher(event.data['new_state']['state'], to_state):
_matcher(event.data['old_state']['state'], from_state) and \
_matcher(event.data['new_state']['state'], to_state):
action(event.data['category'],
event.data['old_state'],
event.data['new_state'])
eventbus.listen(EVENT_STATE_CHANGED, listener)
bus.listen_event(EVENT_STATE_CHANGED, listener)
# pylint: disable=too-many-arguments
def track_time_change(eventbus, action,
def track_time_change(bus, action,
year='*', month='*', day='*',
hour='*', minute='*', second='*',
point_in_time=None, listen_once=False):
""" Adds a listener that will listen for a specified or matching time. """
year, month, day = ensure_list(year), ensure_list(month), ensure_list(day)
hour, minute = ensure_list(hour), ensure_list(minute)
second = ensure_list(second)
year, month = _ensure_list(year), _ensure_list(month)
day = _ensure_list(day)
hour, minute = _ensure_list(hour), _ensure_list(minute)
second = _ensure_list(second)
def listener(event):
""" Listens for matching time_changed events. """
@ -129,47 +135,92 @@ def track_time_change(eventbus, action,
if (point_in_time and now > point_in_time) or \
(not point_in_time and
matcher(now.year, year) and
matcher(now.month, month) and
matcher(now.day, day) and
matcher(now.hour, hour) and
matcher(now.minute, minute) and
matcher(now.second, second)):
_matcher(now.year, year) and
_matcher(now.month, month) and
_matcher(now.day, day) and
_matcher(now.hour, hour) and
_matcher(now.minute, minute) and
_matcher(now.second, second)):
# point_in_time are exact points in time
# so we always remove it after fire
if listen_once or point_in_time:
event.eventbus.remove_listener(EVENT_TIME_CHANGED, listener)
event.bus.remove_event_listener(EVENT_TIME_CHANGED, listener)
action(now)
eventbus.listen(EVENT_TIME_CHANGED, listener)
bus.listen_event(EVENT_TIME_CHANGED, listener)
ServiceCall = namedtuple("ServiceCall", ["bus", "domain", "service", "data"])
Event = namedtuple("Event", ["bus", "event_type", "data"])
Event = namedtuple("Event", ["eventbus", "event_type", "data"])
class EventBus(object):
""" Class that allows code to listen for- and fire events. """
class Bus(object):
""" Class that allows different components to communicate via services
and events.
"""
def __init__(self):
self._listeners = defaultdict(list)
self._event_listeners = defaultdict(list)
self._services = {}
self.logger = logging.getLogger(__name__)
@property
def listeners(self):
""" List of events that is being listened for. """
return {key: len(self._listeners[key])
for key in self._listeners.keys()
if len(self._listeners[key]) > 0}
def services(self):
""" Dict with per domain a list of available services. """
return {domain: self._services[domain].keys()
for domain in self._services}
def fire(self, event_type, event_data=None):
@property
def event_listeners(self):
""" Dict with events that is being listened for and the number
of listeners.
"""
return {key: len(self._event_listeners[key])
for key in self._event_listeners.keys()
if len(self._event_listeners[key]) > 0}
def call_service(self, domain, service, service_data=None):
""" Calls a service. """
try:
self._services[domain][service]
except KeyError:
# Domain or Service does not exist
raise ServiceDoesNotExistException(
"Service does not exist: {}/{}".format(domain, service))
if not service_data:
service_data = {}
def run():
""" Executes a service. """
service_call = ServiceCall(self, domain, service, service_data)
try:
self._services[domain][service](service_call)
except Exception: # pylint: disable=broad-except
self.logger.exception("Bus:Exception in service {}/{}".format(
domain, service))
# We dont want the eventbus to be blocking - run in a thread.
threading.Thread(target=run).start()
def register_service(self, domain, service, service_callback):
""" Register a service. """
try:
self._services[domain][service] = service_callback
except KeyError:
# Domain does not exist yet
self._services[domain] = {service: service_callback}
def fire_event(self, event_type, event_data=None):
""" Fire an event. """
if not event_data:
event_data = {}
self.logger.info("EventBus:Event {}: {}".format(
self.logger.info("Bus:Event {}: {}".format(
event_type, event_data))
def run():
@ -178,26 +229,26 @@ class EventBus(object):
# We do not use itertools.chain() because some listeners might
# choose to remove themselves as a listener while being executed
for listener in self._listeners[ALL_EVENTS] + \
self._listeners[event.event_type]:
for listener in self._event_listeners[ALL_EVENTS] + \
self._event_listeners[event.event_type]:
try:
listener(event)
except Exception: # pylint: disable=broad-except
self.logger.exception("EventBus:Exception in listener")
self.logger.exception("Bus:Exception in event listener")
# We dont want the eventbus to be blocking - run in a thread.
# We dont want the bus to be blocking - run in a thread.
threading.Thread(target=run).start()
def listen(self, event_type, listener):
def listen_event(self, event_type, listener):
""" Listen for all events or events of a specific type.
To listen to all events specify the constant ``ALL_EVENTS``
as event_type.
"""
self._listeners[event_type].append(listener)
self._event_listeners[event_type].append(listener)
def listen_once(self, event_type, listener):
def listen_once_event(self, event_type, listener):
""" Listen once for event of a specific type.
To listen to all events specify the constant ``ALL_EVENTS``
@ -208,19 +259,19 @@ class EventBus(object):
def onetime_listener(event):
""" Removes listener from eventbus and then fires listener. """
self.remove_listener(event_type, onetime_listener)
self.remove_event_listener(event_type, onetime_listener)
listener(event)
self.listen(event_type, onetime_listener)
self.listen_event(event_type, onetime_listener)
def remove_listener(self, event_type, listener):
def remove_event_listener(self, event_type, listener):
""" Removes a listener of a specific event_type. """
try:
self._listeners[event_type].remove(listener)
self._event_listeners[event_type].remove(listener)
if len(self._listeners[event_type]) == 0:
del self._listeners[event_type]
if len(self._event_listeners[event_type]) == 0:
del self._event_listeners[event_type]
except ValueError:
pass
@ -229,9 +280,9 @@ class EventBus(object):
class StateMachine(object):
""" Helper class that tracks the state of different categories. """
def __init__(self, eventbus):
def __init__(self, bus):
self.states = dict()
self.eventbus = eventbus
self.bus = bus
self.lock = threading.Lock()
@property
@ -274,10 +325,10 @@ class StateMachine(object):
self.states[category] = create_state(new_state, attributes)
self.eventbus.fire(EVENT_STATE_CHANGED,
{'category': category,
'old_state': old_state,
'new_state': self.states[category]})
self.bus.fire_event(EVENT_STATE_CHANGED,
{'category': category,
'old_state': old_state,
'new_state': self.states[category]})
self.lock.release()
@ -302,14 +353,14 @@ class StateMachine(object):
class Timer(threading.Thread):
""" Timer will sent out an event every TIMER_INTERVAL seconds. """
def __init__(self, eventbus):
def __init__(self, bus):
threading.Thread.__init__(self)
self.daemon = True
self.eventbus = eventbus
self.bus = bus
eventbus.listen_once(EVENT_HOMEASSISTANT_START,
lambda event: self.start())
bus.listen_once_event(EVENT_HOMEASSISTANT_START,
lambda event: self.start())
def run(self):
""" Start the timer. """
@ -338,9 +389,13 @@ class Timer(threading.Thread):
last_fired_on_second = now.second
self.eventbus.fire(EVENT_TIME_CHANGED,
{'now': datetime_to_str(now)})
self.bus.fire_event(EVENT_TIME_CHANGED,
{'now': datetime_to_str(now)})
class HomeAssistantException(Exception):
""" General Home Assistant exception occured. """
class ServiceDoesNotExistException(HomeAssistantException):
""" A service has been referenced that deos not exist. """

View file

@ -23,17 +23,23 @@ from homeassistant.observers import (
LIGHT_TRANSITION_TIME = timedelta(minutes=15)
EVENT_DOWNLOAD_FILE = "download_file"
EVENT_BROWSE_URL = "browse_url"
EVENT_CHROMECAST_YOUTUBE_VIDEO = "chromecast.play_youtube_video"
EVENT_TURN_LIGHT_ON = "turn_light_on"
EVENT_TURN_LIGHT_OFF = "turn_light_off"
EVENT_KEYBOARD_VOLUME_UP = "keyboard.volume_up"
EVENT_KEYBOARD_VOLUME_DOWN = "keyboard.volume_down"
EVENT_KEYBOARD_VOLUME_MUTE = "keyboard.volume_mute"
EVENT_KEYBOARD_MEDIA_PLAY_PAUSE = "keyboard.media_play_pause"
EVENT_KEYBOARD_MEDIA_NEXT_TRACK = "keyboard.media_next_track"
EVENT_KEYBOARD_MEDIA_PREV_TRACK = "keyboard.media_prev_track"
DOMAIN_DOWNLOADER = "downloader"
DOMAIN_BROWSER = "browser"
DOMAIN_CHROMECAST = "chromecast"
DOMAIN_KEYBOARD = "keyboard"
DOMAIN_LIGHT_CONTROL = "light_control"
SERVICE_DOWNLOAD_FILE = "download_file"
SERVICE_BROWSE_URL = "browse_url"
SERVICE_CHROMECAST_YOUTUBE_VIDEO = "play_youtube_video"
SERVICE_TURN_LIGHT_ON = "turn_light_on"
SERVICE_TURN_LIGHT_OFF = "turn_light_off"
SERVICE_KEYBOARD_VOLUME_UP = "volume_up"
SERVICE_KEYBOARD_VOLUME_DOWN = "volume_down"
SERVICE_KEYBOARD_VOLUME_MUTE = "volume_mute"
SERVICE_KEYBOARD_MEDIA_PLAY_PAUSE = "media_play_pause"
SERVICE_KEYBOARD_MEDIA_NEXT_TRACK = "media_next_track"
SERVICE_KEYBOARD_MEDIA_PREV_TRACK = "media_prev_track"
def _hue_process_transition_time(transition_seconds):
@ -49,8 +55,8 @@ class LightTrigger(object):
""" Class to turn on lights based on state of devices and the sun
or triggered by light events. """
def __init__(self, eventbus, statemachine, device_tracker, light_control):
self.eventbus = eventbus
def __init__(self, bus, statemachine, device_tracker, light_control):
self.bus = bus
self.statemachine = statemachine
self.light_control = light_control
@ -58,18 +64,18 @@ class LightTrigger(object):
# Track home coming of each seperate device
for category in device_tracker.device_state_categories:
ha.track_state_change(eventbus, category,
ha.track_state_change(bus, category,
DEVICE_STATE_NOT_HOME, DEVICE_STATE_HOME,
self._handle_device_state_change)
# Track when all devices are gone to shut down lights
ha.track_state_change(eventbus, STATE_CATEGORY_ALL_DEVICES,
ha.track_state_change(bus, STATE_CATEGORY_ALL_DEVICES,
DEVICE_STATE_HOME, DEVICE_STATE_NOT_HOME,
self._handle_device_state_change)
# Track every time sun rises so we can schedule a time-based
# pre-sun set event
ha.track_state_change(eventbus, STATE_CATEGORY_SUN,
ha.track_state_change(bus, STATE_CATEGORY_SUN,
SUN_STATE_BELOW_HORIZON, SUN_STATE_ABOVE_HORIZON,
self._handle_sun_rising)
@ -78,20 +84,6 @@ class LightTrigger(object):
if statemachine.is_state(STATE_CATEGORY_SUN, SUN_STATE_ABOVE_HORIZON):
self._handle_sun_rising(None, None, None)
def handle_light_event(event):
""" Hande a turn light on or off event. """
light_id = event.data.get("light_id", None)
transition_seconds = event.data.get("transition_seconds", None)
if event.event_type == EVENT_TURN_LIGHT_ON:
self.light_control.turn_light_on(light_id, transition_seconds)
else:
self.light_control.turn_light_off(light_id, transition_seconds)
# Listen for light on and light off events
eventbus.listen(EVENT_TURN_LIGHT_ON, handle_light_event)
eventbus.listen(EVENT_TURN_LIGHT_OFF, handle_light_event)
# pylint: disable=unused-argument
def _handle_sun_rising(self, category, old_state, new_state):
"""The moment sun sets we want to have all the lights on.
@ -107,7 +99,7 @@ class LightTrigger(object):
return lambda now: self._turn_light_on_before_sunset(light)
for index, light_id in enumerate(self.light_control.light_ids):
ha.track_time_change(self.eventbus, turn_on(light_id),
ha.track_time_change(self.bus, turn_on(light_id),
point_in_time=(start_point +
index * LIGHT_TRANSITION_TIME))
@ -243,7 +235,30 @@ class HueLightControl(object):
self.bridge.set_light(light_id, command)
def setup_file_downloader(eventbus, download_path):
def setup_light_control_services(bus, light_control):
""" Exposes light control via services. """
def handle_light_event(service):
""" Hande a turn light on or off service call. """
light_id = service.data.get("light_id", None)
transition_seconds = service.data.get("transition_seconds", None)
if service.service == SERVICE_TURN_LIGHT_ON:
light_control.turn_light_on(light_id, transition_seconds)
else:
light_control.turn_light_off(light_id, transition_seconds)
# Listen for light on and light off events
bus.register_service(DOMAIN_LIGHT_CONTROL, SERVICE_TURN_LIGHT_ON,
handle_light_event)
bus.register_service(DOMAIN_LIGHT_CONTROL, SERVICE_TURN_LIGHT_OFF,
handle_light_event)
return True
def setup_file_downloader(bus, download_path):
""" Listens for download events to download files. """
logger = logging.getLogger(__name__)
@ -257,11 +272,11 @@ def setup_file_downloader(eventbus, download_path):
return False
def download_file(event):
def download_file(service):
""" Downloads file specified in the url. """
try:
req = requests.get(event.data['url'], stream=True)
req = requests.get(service.data['url'], stream=True)
if req.status_code == 200:
filename = None
@ -273,7 +288,7 @@ def setup_file_downloader(eventbus, download_path):
filename = match[0].strip("'\" ")
if not filename:
filename = os.path.basename(event.data['url']).strip()
filename = os.path.basename(service.data['url']).strip()
if not filename:
filename = "ha_download"
@ -296,7 +311,7 @@ def setup_file_downloader(eventbus, download_path):
break
logger.info("FileDownloader:{} -> {}".format(
event.data['url'], final_path))
service.data['url'], final_path))
with open(final_path, 'wb') as fil:
for chunk in req.iter_content(1024):
@ -304,45 +319,47 @@ def setup_file_downloader(eventbus, download_path):
except requests.exceptions.ConnectionError:
logger.exception("FileDownloader:ConnectionError occured for {}".
format(event.data['url']))
format(service.data['url']))
eventbus.listen(EVENT_DOWNLOAD_FILE, download_file)
bus.register_service(DOMAIN_DOWNLOADER, SERVICE_DOWNLOAD_FILE,
download_file)
return True
def setup_webbrowser(eventbus):
def setup_webbrowser(bus):
""" Listen for browse_url events and open
the url in the default webbrowser. """
import webbrowser
eventbus.listen(EVENT_BROWSE_URL,
lambda event: webbrowser.open(event.data['url']))
bus.register_service(DOMAIN_BROWSER, SERVICE_BROWSE_URL,
lambda event: webbrowser.open(event.data['url']))
return True
def setup_chromecast(eventbus, host):
def setup_chromecast(bus, host):
""" Listen for chromecast events. """
from homeassistant.packages import pychromecast
eventbus.listen("start_fireplace",
lambda event:
pychromecast.play_youtube_video(host, "eyU3bRy2x44"))
bus.register_service(DOMAIN_CHROMECAST, "start_fireplace",
lambda event:
pychromecast.play_youtube_video(host, "eyU3bRy2x44"))
eventbus.listen("start_epic_sax",
lambda event:
pychromecast.play_youtube_video(host, "kxopViU98Xo"))
bus.register_service(DOMAIN_CHROMECAST, "start_epic_sax",
lambda event:
pychromecast.play_youtube_video(host, "kxopViU98Xo"))
eventbus.listen(EVENT_CHROMECAST_YOUTUBE_VIDEO,
lambda event:
pychromecast.play_youtube_video(host, event.data['video']))
bus.register_service(DOMAIN_CHROMECAST, SERVICE_CHROMECAST_YOUTUBE_VIDEO,
lambda event:
pychromecast.play_youtube_video(host,
event.data['video']))
return True
def setup_media_buttons(eventbus):
def setup_media_buttons(bus):
""" Listen for keyboard events. """
try:
import pykeyboard
@ -355,28 +372,28 @@ def setup_media_buttons(eventbus):
keyboard = pykeyboard.PyKeyboard()
keyboard.special_key_assignment()
eventbus.listen(EVENT_KEYBOARD_VOLUME_UP,
lambda event:
keyboard.tap_key(keyboard.volume_up_key))
bus.register_service(DOMAIN_KEYBOARD, SERVICE_KEYBOARD_VOLUME_UP,
lambda event:
keyboard.tap_key(keyboard.volume_up_key))
eventbus.listen(EVENT_KEYBOARD_VOLUME_DOWN,
lambda event:
keyboard.tap_key(keyboard.volume_down_key))
bus.register_service(DOMAIN_KEYBOARD, SERVICE_KEYBOARD_VOLUME_DOWN,
lambda event:
keyboard.tap_key(keyboard.volume_down_key))
eventbus.listen(EVENT_KEYBOARD_VOLUME_MUTE,
lambda event:
keyboard.tap_key(keyboard.volume_mute_key))
bus.register_service(DOMAIN_KEYBOARD, SERVICE_KEYBOARD_VOLUME_MUTE,
lambda event:
keyboard.tap_key(keyboard.volume_mute_key))
eventbus.listen(EVENT_KEYBOARD_MEDIA_PLAY_PAUSE,
lambda event:
keyboard.tap_key(keyboard.media_play_pause_key))
bus.register_service(DOMAIN_KEYBOARD, SERVICE_KEYBOARD_MEDIA_PLAY_PAUSE,
lambda event:
keyboard.tap_key(keyboard.media_play_pause_key))
eventbus.listen(EVENT_KEYBOARD_MEDIA_NEXT_TRACK,
lambda event:
keyboard.tap_key(keyboard.media_next_track_key))
bus.register_service(DOMAIN_KEYBOARD, SERVICE_KEYBOARD_MEDIA_NEXT_TRACK,
lambda event:
keyboard.tap_key(keyboard.media_next_track_key))
eventbus.listen(EVENT_KEYBOARD_MEDIA_PREV_TRACK,
lambda event:
keyboard.tap_key(keyboard.media_prev_track_key))
bus.register_service(DOMAIN_KEYBOARD, SERVICE_KEYBOARD_MEDIA_PREV_TRACK,
lambda event:
keyboard.tap_key(keyboard.media_prev_track_key))
return True

View file

@ -23,8 +23,8 @@ def from_config_file(config_path):
config.read(config_path)
# Init core
eventbus = ha.EventBus()
statemachine = ha.StateMachine(eventbus)
bus = ha.Bus()
statemachine = ha.StateMachine(bus)
# Init observers
# Device scanner
@ -53,7 +53,7 @@ def from_config_file(config_path):
# Device Tracker
if device_scanner:
device_tracker = observers.DeviceTracker(
eventbus, statemachine, device_scanner)
bus, statemachine, device_scanner)
statusses.append(("Device Tracker", True))
@ -66,7 +66,7 @@ def from_config_file(config_path):
statusses.append(("Weather - Ephem",
observers.track_sun(
eventbus, statemachine,
bus, statemachine,
config.get("common", "latitude"),
config.get("common", "longitude"))))
@ -86,7 +86,9 @@ def from_config_file(config_path):
# Light trigger
if light_control:
actors.LightTrigger(eventbus, statemachine,
actors.setup_light_control_services(bus, light_control)
actors.LightTrigger(bus, statemachine,
device_tracker, light_control)
statusses.append(("Light Trigger", True))
@ -94,22 +96,22 @@ def from_config_file(config_path):
if config.has_option("chromecast", "host"):
statusses.append(("Chromecast",
actors.setup_chromecast(
eventbus, config.get("chromecast", "host"))))
bus, config.get("chromecast", "host"))))
if config.has_option("downloader", "download_dir"):
result = actors.setup_file_downloader(
eventbus, config.get("downloader", "download_dir"))
bus, config.get("downloader", "download_dir"))
statusses.append(("Downloader", result))
statusses.append(("Webbrowser", actors.setup_webbrowser(eventbus)))
statusses.append(("Webbrowser", actors.setup_webbrowser(bus)))
statusses.append(("Media Buttons", actors.setup_media_buttons(eventbus)))
statusses.append(("Media Buttons", actors.setup_media_buttons(bus)))
# Init HTTP interface
if config.has_option("httpinterface", "api_password"):
httpinterface.HTTPInterface(
eventbus, statemachine,
bus, statemachine,
config.get("httpinterface", "api_password"))
statusses.append(("HTTPInterface", True))
@ -121,4 +123,4 @@ def from_config_file(config_path):
logger.info("{}: {}".format(component, status))
ha.start_home_assistant(eventbus)
ha.start_home_assistant(bus)

View file

@ -97,6 +97,8 @@ URL_API_STATES = "/api/states"
URL_API_STATES_CATEGORY = "/api/states/{}"
URL_API_EVENTS = "/api/events"
URL_API_EVENTS_EVENT = "/api/events/{}"
URL_API_SERVICES = "/api/services"
URL_API_SERVICES_SERVICE = "/api/services/{}/{}"
URL_STATIC = "/static/{}"
@ -105,7 +107,7 @@ class HTTPInterface(threading.Thread):
""" Provides an HTTP interface for Home Assistant. """
# pylint: disable=too-many-arguments
def __init__(self, eventbus, statemachine, api_password,
def __init__(self, bus, statemachine, api_password,
server_port=None, server_host=None):
threading.Thread.__init__(self)
@ -122,12 +124,12 @@ class HTTPInterface(threading.Thread):
self.server.flash_message = None
self.server.logger = logging.getLogger(__name__)
self.server.eventbus = eventbus
self.server.bus = bus
self.server.statemachine = statemachine
self.server.api_password = api_password
eventbus.listen_once(ha.EVENT_HOMEASSISTANT_START,
lambda event: self.start())
bus.listen_once_event(ha.EVENT_HOMEASSISTANT_START,
lambda event: self.start())
def run(self):
""" Start the HTTP interface. """
@ -143,6 +145,7 @@ class RequestHandler(BaseHTTPRequestHandler):
('GET', '/', '_handle_get_root'),
('POST', re.compile(r'/change_state'), '_handle_change_state'),
('POST', re.compile(r'/fire_event'), '_handle_fire_event'),
('POST', re.compile(r'/call_service'), '_handle_call_service'),
# /states
('GET', '/api/states', '_handle_get_api_states'),
@ -159,6 +162,14 @@ class RequestHandler(BaseHTTPRequestHandler):
re.compile(r'/api/events/(?P<event_type>[a-zA-Z\._0-9]+)'),
'_handle_fire_event'),
# /services
('GET', '/api/services', '_handle_get_api_services'),
('POST',
re.compile((r'/api/services/'
r'(?P<domain>[a-zA-Z\._0-9]+)/'
r'(?P<service>[a-zA-Z\._0-9]+)')),
'_handle_call_service'),
# Statis files
('GET', re.compile(r'/static/(?P<file>[a-zA-Z\._\-0-9/]+)'),
'_handle_get_static')
@ -301,7 +312,7 @@ class RequestHandler(BaseHTTPRequestHandler):
if self.server.flash_message:
write(("<div class='row'><div class='col-xs-12'>"
"<div class='alert alert-success'>"
"{}</div></div></div>").format(self.server.flash_message))
"{}</div></div></div>").format(self.server.flash_message))
self.server.flash_message = None
@ -356,18 +367,82 @@ class RequestHandler(BaseHTTPRequestHandler):
"</div></div>"))
# Describe event bus:
# Describe bus/services:
write(("<div class='row'>"
"<div class='col-xs-6'>"
"<div class='panel panel-primary'>"
"<div class='panel-heading'><h2 class='panel-title'>"
"Services</h2></div>"
"<table class='table'>"
"<tr><th>Domain</th><th>Service</th></tr>"))
for domain, services in sorted(
self.server.bus.services.items()):
write("<tr><td>{}</td><td>{}</td></tr>".format(
domain, ", ".join(services)))
write(("</table></div></div>"
"<div class='col-xs-6'>"
"<div class='panel panel-primary'>"
"<div class='panel-heading'><h2 class='panel-title'>"
"Call Service</h2></div>"
"<div class='panel-body'>"
"<form method='post' action='/call_service' "
"class='form-horizontal form-fire-event'>"
"<input type='hidden' name='api_password' value='{}'>"
"<div class='form-group'>"
"<label for='domain' class='col-xs-3 control-label'>"
"Domain</label>"
"<div class='col-xs-9'>"
"<input type='text' class='form-control' id='domain'"
" name='domain' placeholder='Service Domain'>"
"</div>"
"</div>"
"<div class='form-group'>"
"<label for='service' class='col-xs-3 control-label'>"
"Service</label>"
"<div class='col-xs-9'>"
"<input type='text' class='form-control' id='service'"
" name='service' placeholder='Service name'>"
"</div>"
"</div>"
"<div class='form-group'>"
"<label for='service_data' class='col-xs-3 control-label'>"
"Service data</label>"
"<div class='col-xs-9'>"
"<textarea rows='3' class='form-control' id='service_data'"
" name='service_data' placeholder='Service Data "
"(JSON, optional)'></textarea>"
"</div>"
"</div>"
"<div class='form-group'>"
"<div class='col-xs-offset-3 col-xs-9'>"
"<button type='submit' class='btn btn-default'>"
"Call Service</button>"
"</div>"
"</div>"
"</form>"
"</div></div></div>"
"</div>").format(self.server.api_password))
# Describe bus/events:
write(("<div class='row'>"
"<div class='col-xs-6'>"
"<div class='panel panel-primary'>"
"<div class='panel-heading'><h2 class='panel-title'>"
"Events</h2></div>"
"<table class='table'>"
"<tr><th>Event Type</th><th>Listeners</th></tr>"))
"<tr><th>Event</th><th>Listeners</th></tr>"))
for event_type, count in sorted(
self.server.eventbus.listeners.items()):
write("<tr><td>{}</td><td>{}</td></tr>".format(event_type, count))
for event, listener_count in sorted(
self.server.bus.event_listeners.items()):
write("<tr><td>{}</td><td>{}</td></tr>".format(
event, listener_count))
write(("</table></div></div>"
@ -483,7 +558,7 @@ class RequestHandler(BaseHTTPRequestHandler):
# Happens if key 'event_data' does not exist
event_data = None
self.server.eventbus.fire(event_type, event_data)
self.server.bus.fire_event(event_type, event_data)
self._message("Event {} fired.".format(event_type))
@ -496,6 +571,41 @@ class RequestHandler(BaseHTTPRequestHandler):
self._message(
"Invalid JSON for event_data", HTTP_UNPROCESSABLE_ENTITY)
def _handle_call_service(self, path_match, data):
""" Handles calling a service.
This handles the following paths:
/call_service
/api/services/<domain>/<service>
"""
try:
try:
domain = path_match.group('domain')
service = path_match.group('service')
except IndexError:
# If group domain or service does not exist in path_match
domain = data['domain'][0]
service = data['service'][0]
try:
service_data = json.loads(data['service_data'][0])
except KeyError:
# Happens if key 'service_data' does not exist
service_data = None
self.server.bus.call_service(domain, service, service_data)
self._message("Service {}/{} called.".format(domain, service))
except KeyError:
# Occurs if domain or service does not exist in data
self._message("No domain or service received.", HTTP_BAD_REQUEST)
except ValueError:
# Occurs during error parsing json
self._message(
"Invalid JSON for service_data", HTTP_UNPROCESSABLE_ENTITY)
# pylint: disable=unused-argument
def _handle_get_api_states(self, path_match, data):
""" Returns the categories which state is being tracked. """
@ -519,7 +629,11 @@ class RequestHandler(BaseHTTPRequestHandler):
def _handle_get_api_events(self, path_match, data):
""" Handles getting overview of event listeners. """
self._write_json({'listeners': self.server.eventbus.listeners})
self._write_json({'event_listeners': self.server.bus.event_listeners})
def _handle_get_api_services(self, path_match, data):
""" Handles getting overview of services. """
self._write_json({'services': self.server.bus.services})
def _handle_get_static(self, path_match, data):
""" Returns a static file. """

View file

@ -19,7 +19,8 @@ import requests
import homeassistant as ha
EVENT_DEVICE_TRACKER_RELOAD = "device_tracker.reload_devices_csv"
DOMAIN_DEVICE_TRACKER = "device_tracker"
SERVICE_DEVICE_TRACKER_RELOAD = "reload_devices_csv"
STATE_CATEGORY_SUN = "weather.sun"
STATE_ATTRIBUTE_NEXT_SUN_RISING = "next_rising"
@ -44,7 +45,7 @@ TOMATO_MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
KNOWN_DEVICES_FILE = "known_devices.csv"
def track_sun(eventbus, statemachine, latitude, longitude):
def track_sun(bus, statemachine, latitude, longitude):
""" Tracks the state of the sun. """
logger = logging.getLogger(__name__)
@ -86,7 +87,7 @@ def track_sun(eventbus, statemachine, latitude, longitude):
statemachine.set_state(STATE_CATEGORY_SUN, new_state, state_attributes)
# +10 seconds to be sure that the change has occured
ha.track_time_change(eventbus, update_sun_state,
ha.track_time_change(bus, update_sun_state,
point_in_time=next_change + timedelta(seconds=10))
update_sun_state(None)
@ -97,9 +98,9 @@ def track_sun(eventbus, statemachine, latitude, longitude):
class DeviceTracker(object):
""" Class that tracks which devices are home and which are not. """
def __init__(self, eventbus, statemachine, device_scanner):
def __init__(self, bus, statemachine, device_scanner):
self.statemachine = statemachine
self.eventbus = eventbus
self.bus = bus
self.device_scanner = device_scanner
self.logger = logging.getLogger(__name__)
@ -113,13 +114,14 @@ class DeviceTracker(object):
self._read_known_devices_file()
ha.track_time_change(eventbus,
ha.track_time_change(bus,
lambda time:
self.update_devices(
device_scanner.scan_devices()))
eventbus.listen(EVENT_DEVICE_TRACKER_RELOAD,
lambda event: self._read_known_devices_file())
bus.register_service(DOMAIN_DEVICE_TRACKER,
SERVICE_DEVICE_TRACKER_RELOAD,
lambda service: self._read_known_devices_file())
@property
def device_state_categories(self):

View file

@ -36,28 +36,58 @@ def _setup_call_api(host, port, api_password):
url = urlparse.urljoin(base_url, path)
if method == METHOD_GET:
return requests.get(url, params=data)
else:
return requests.request(method, url, data=data)
try:
if method == METHOD_GET:
return requests.get(url, params=data)
else:
return requests.request(method, url, data=data)
except requests.exceptions.ConnectionError:
logging.getLogger(__name__).exception("Error connecting to server")
raise ha.HomeAssistantException("Error connecting to server")
return _call_api
class EventBus(ha.EventBus):
""" Drop-in replacement for a normal eventbus that will forward events to
a remote eventbus.
class Bus(ha.Bus):
""" Drop-in replacement for a normal bus that will forward interaction to
a remote bus.
"""
def __init__(self, host, api_password, port=None):
ha.EventBus.__init__(self)
self._call_api = _setup_call_api(host, port, api_password)
ha.Bus.__init__(self)
self.logger = logging.getLogger(__name__)
self._call_api = _setup_call_api(host, port, api_password)
@property
def listeners(self):
def services(self):
""" List the available services. """
try:
req = self._call_api(METHOD_GET, hah.URL_API_SERVICES)
if req.status_code == 200:
data = req.json()
return data['services']
else:
raise ha.HomeAssistantException(
"Got unexpected result (3): {}.".format(req.text))
except ValueError: # If req.json() can't parse the json
self.logger.exception("Bus:Got unexpected result")
raise ha.HomeAssistantException(
"Got unexpected result: {}".format(req.text))
except KeyError: # If not all expected keys are in the returned JSON
self.logger.exception("Bus:Got unexpected result (2)")
raise ha.HomeAssistantException(
"Got unexpected result (2): {}".format(req.text))
@property
def event_listeners(self):
""" List of events that is being listened for. """
try:
req = self._call_api(METHOD_GET, hah.URL_API_EVENTS)
@ -65,54 +95,72 @@ class EventBus(ha.EventBus):
if req.status_code == 200:
data = req.json()
return data['listeners']
return data['event_listeners']
else:
raise ha.HomeAssistantException(
"Got unexpected result (3): {}.".format(req.text))
except requests.exceptions.ConnectionError:
self.logger.exception("EventBus:Error connecting to server")
raise ha.HomeAssistantException("Error connecting to server")
except ValueError: # If req.json() can't parse the json
self.logger.exception("EventBus:Got unexpected result")
self.logger.exception("Bus:Got unexpected result")
raise ha.HomeAssistantException(
"Got unexpected result: {}".format(req.text))
except KeyError: # If not all expected keys are in the returned JSON
self.logger.exception("EventBus:Got unexpected result (2)")
self.logger.exception("Bus:Got unexpected result (2)")
raise ha.HomeAssistantException(
"Got unexpected result (2): {}".format(req.text))
def fire(self, event_type, event_data=None):
""" Fire an event. """
def call_service(self, domain, service, service_data=None):
""" Calls a service. """
data = {'event_data': json.dumps(event_data)} if event_data else None
if service_data:
data = {'service_data': json.dumps(service_data)}
else:
data = None
try:
req = self._call_api(METHOD_POST,
hah.URL_API_EVENTS_EVENT.format(event_type),
data)
req = self._call_api(METHOD_POST,
hah.URL_API_SERVICES_SERVICE.format(
domain, service),
data)
if req.status_code != 200:
error = "Error firing event: {} - {}".format(
req.status_code, req.text)
if req.status_code != 200:
error = "Error calling service: {} - {}".format(
req.status_code, req.text)
self.logger.error("EventBus:{}".format(error))
raise ha.HomeAssistantException(error)
self.logger.error("Bus:{}".format(error))
raise ha.HomeAssistantException(error)
except requests.exceptions.ConnectionError:
self.logger.exception("EventBus:Error connecting to server")
def listen(self, event_type, listener):
""" Not implemented for remote eventbus.
def register_service(self, domain, service, service_callback):
""" Not implemented for remote bus.
Will throw NotImplementedError. """
raise NotImplementedError
def remove_listener(self, event_type, listener):
""" Not implemented for remote eventbus.
def fire_event(self, event_type, event_data=None):
""" Fire an event. """
data = {'event_data': json.dumps(event_data)} if event_data else None
req = self._call_api(METHOD_POST,
hah.URL_API_EVENTS_EVENT.format(event_type),
data)
if req.status_code != 200:
error = "Error firing event: {} - {}".format(
req.status_code, req.text)
self.logger.error("Bus:{}".format(error))
raise ha.HomeAssistantException(error)
def listen_event(self, event_type, listener):
""" Not implemented for remote bus.
Will throw NotImplementedError. """
raise NotImplementedError
def remove_event_listener(self, event_type, listener):
""" Not implemented for remote bus.
Will throw NotImplementedError. """

View file

@ -34,23 +34,23 @@ def ensure_homeassistant_started():
""" Ensures home assistant is started. """
if not HAHelper.core:
core = {'eventbus': ha.EventBus()}
core['statemachine'] = ha.StateMachine(core['eventbus'])
core = {'bus': ha.Bus()}
core['statemachine'] = ha.StateMachine(core['bus'])
core['eventbus'].listen('test_event', len)
core['bus'].listen_event('test_event', len)
core['statemachine'].set_state('test', 'a_state')
hah.HTTPInterface(core['eventbus'], core['statemachine'],
hah.HTTPInterface(core['bus'], core['statemachine'],
API_PASSWORD)
core['eventbus'].fire(ha.EVENT_HOMEASSISTANT_START)
core['bus'].fire_event(ha.EVENT_HOMEASSISTANT_START)
# Give objects time to startup
time.sleep(1)
HAHelper.core = core
return HAHelper.core['eventbus'], HAHelper.core['statemachine']
return HAHelper.core['bus'], HAHelper.core['statemachine']
# pylint: disable=too-many-public-methods
@ -60,7 +60,7 @@ class TestHTTPInterface(unittest.TestCase):
@classmethod
def setUpClass(cls): # pylint: disable=invalid-name
""" things to be run when tests are started. """
cls.eventbus, cls.statemachine = ensure_homeassistant_started()
cls.bus, cls.statemachine = ensure_homeassistant_started()
def test_debug_interface(self):
""" Test if we can login by comparing not logged in screen to
@ -109,7 +109,7 @@ class TestHTTPInterface(unittest.TestCase):
if "test" in event.data:
test_value.append(1)
self.eventbus.listen_once("test_event_with_data", listener)
self.bus.listen_once_event("test_event_with_data", listener)
requests.post(
_url(hah.URL_FIRE_EVENT),
@ -195,7 +195,7 @@ class TestHTTPInterface(unittest.TestCase):
""" Helper method that will verify our event got called. """
test_value.append(1)
self.eventbus.listen_once("test.event_no_data", listener)
self.bus.listen_once_event("test.event_no_data", listener)
requests.post(
_url(hah.URL_API_EVENTS_EVENT.format("test.event_no_data")),
@ -217,7 +217,7 @@ class TestHTTPInterface(unittest.TestCase):
if "test" in event.data:
test_value.append(1)
self.eventbus.listen_once("test_event_with_data", listener)
self.bus.listen_once_event("test_event_with_data", listener)
requests.post(
_url(hah.URL_API_EVENTS_EVENT.format("test_event_with_data")),
@ -238,7 +238,7 @@ class TestHTTPInterface(unittest.TestCase):
""" Helper method that will verify our event got called. """
test_value.append(1)
self.eventbus.listen_once("test_event_with_bad_data", listener)
self.bus.listen_once_event("test_event_with_bad_data", listener)
req = requests.post(
_url(hah.URL_API_EVENTS_EVENT.format("test_event")),
@ -258,7 +258,59 @@ class TestHTTPInterface(unittest.TestCase):
data = req.json()
self.assertEqual(data['listeners'], self.eventbus.listeners)
self.assertEqual(data['event_listeners'], self.bus.event_listeners)
def test_api_get_services(self):
""" Test if we can get a dict describing current services. """
req = requests.get(_url(hah.URL_API_SERVICES),
params={"api_password": API_PASSWORD})
data = req.json()
self.assertEqual(data['services'], self.bus.services)
def test_api_call_service_no_data(self):
""" Test if the API allows us to call a service. """
test_value = []
def listener(service_call): # pylint: disable=unused-argument
""" Helper method that will verify that our service got called. """
test_value.append(1)
self.bus.register_service("test_domain", "test_service", listener)
requests.post(
_url(hah.URL_API_SERVICES_SERVICE.format(
"test_domain", "test_service")),
data={"api_password": API_PASSWORD})
# Allow the event to take place
time.sleep(1)
self.assertEqual(len(test_value), 1)
def test_api_call_service_with_data(self):
""" Test if the API allows us to call a service. """
test_value = []
def listener(service_call): # pylint: disable=unused-argument
""" Helper method that will verify that our service got called and
that test if our data came through. """
if "test" in service_call.data:
test_value.append(1)
self.bus.register_service("test_domain", "test_service", listener)
requests.post(
_url(hah.URL_API_SERVICES_SERVICE.format(
"test_domain", "test_service")),
data={"service_data": '{"test": 1}',
"api_password": API_PASSWORD})
# Allow the event to take place
time.sleep(1)
self.assertEqual(len(test_value), 1)
class TestRemote(unittest.TestCase):
@ -267,10 +319,10 @@ class TestRemote(unittest.TestCase):
@classmethod
def setUpClass(cls): # pylint: disable=invalid-name
""" things to be run when tests are started. """
cls.eventbus, cls.statemachine = ensure_homeassistant_started()
cls.bus, cls.statemachine = ensure_homeassistant_started()
cls.remote_sm = remote.StateMachine("127.0.0.1", API_PASSWORD)
cls.remote_eb = remote.EventBus("127.0.0.1", API_PASSWORD)
cls.remote_eb = remote.Bus("127.0.0.1", API_PASSWORD)
cls.sm_with_remote_eb = ha.StateMachine(cls.remote_eb)
cls.sm_with_remote_eb.set_state("test", "a_state")
@ -307,20 +359,21 @@ class TestRemote(unittest.TestCase):
def test_remote_eb_listening_for_same(self):
""" Test if remote EB correctly reports listener overview. """
self.assertEqual(self.eventbus.listeners, self.remote_eb.listeners)
self.assertEqual(self.bus.event_listeners,
self.remote_eb.event_listeners)
# pylint: disable=invalid-name
def test_remote_eb_fire_event_with_no_data(self):
""" Test if the remote eventbus allows us to fire an event. """
""" Test if the remote bus allows us to fire an event. """
test_value = []
def listener(event): # pylint: disable=unused-argument
""" Helper method that will verify our event got called. """
test_value.append(1)
self.eventbus.listen_once("test_event_no_data", listener)
self.bus.listen_once_event("test_event_no_data", listener)
self.remote_eb.fire("test_event_no_data")
self.remote_eb.fire_event("test_event_no_data")
# Allow the event to take place
time.sleep(1)
@ -329,7 +382,7 @@ class TestRemote(unittest.TestCase):
# pylint: disable=invalid-name
def test_remote_eb_fire_event_with_data(self):
""" Test if the remote eventbus allows us to fire an event. """
""" Test if the remote bus allows us to fire an event. """
test_value = []
def listener(event): # pylint: disable=unused-argument
@ -337,9 +390,46 @@ class TestRemote(unittest.TestCase):
if event.data["test"] == 1:
test_value.append(1)
self.eventbus.listen_once("test_event_with_data", listener)
self.bus.listen_once_event("test_event_with_data", listener)
self.remote_eb.fire("test_event_with_data", {"test": 1})
self.remote_eb.fire_event("test_event_with_data", {"test": 1})
# Allow the event to take place
time.sleep(1)
self.assertEqual(len(test_value), 1)
# pylint: disable=invalid-name
def test_remote_eb_call_service_with_no_data(self):
""" Test if the remote bus allows us to fire a service. """
test_value = []
def listener(service_call): # pylint: disable=unused-argument
""" Helper method that will verify our service got called. """
test_value.append(1)
self.bus.register_service("test_domain", "test_service", listener)
self.remote_eb.call_service("test_domain", "test_service")
# Allow the service call to take place
time.sleep(1)
self.assertEqual(len(test_value), 1)
# pylint: disable=invalid-name
def test_remote_eb_call_service_with_data(self):
""" Test if the remote bus allows us to fire an event. """
test_value = []
def listener(service_call): # pylint: disable=unused-argument
""" Helper method that will verify our service got called. """
if service_call.data["test"] == 1:
test_value.append(1)
self.bus.register_service("test_domain", "test_service", listener)
self.remote_eb.call_service("test_domain", "test_service", {"test": 1})
# Allow the event to take place
time.sleep(1)
@ -348,14 +438,14 @@ class TestRemote(unittest.TestCase):
def test_local_sm_with_remote_eb(self):
""" Test if we get the event if we change a state on a
StateMachine connected to a remote eventbus. """
StateMachine connected to a remote bus. """
test_value = []
def listener(event): # pylint: disable=unused-argument
""" Helper method that will verify our event got called. """
test_value.append(1)
self.eventbus.listen_once(ha.EVENT_STATE_CHANGED, listener)
self.bus.listen_once_event(ha.EVENT_STATE_CHANGED, listener)
self.sm_with_remote_eb.set_state("test", "local sm with remote eb")