Remote instances are now 100% operational

This commit is contained in:
Paulus Schoutsen 2014-04-29 00:30:31 -07:00
parent 8e65afa994
commit 50b492c64a
12 changed files with 770 additions and 529 deletions

View file

@ -9,6 +9,7 @@ of entities and react to changes.
import time
import logging
import threading
import enum
import datetime as dt
import functools as ft
@ -40,24 +41,21 @@ class HomeAssistant(object):
""" Core class to route all communication to right components. """
def __init__(self):
self._pool = pool = _create_worker_pool()
self._pool = pool = create_worker_pool()
self.bus = EventBus(pool)
self.states = StateMachine(self.bus)
self.services = ServiceRegistry(self.bus, pool)
self.states = StateMachine(self.bus)
def start(self, non_blocking=False):
""" Start home assistant.
Set non_blocking to True if you don't want this method to block
as long as Home Assistant is running. """
def start(self):
""" Start home assistant. """
Timer(self)
self.bus.fire(EVENT_HOMEASSISTANT_START)
if non_blocking:
return
def block_till_stopped(self):
""" Will register service homeassistant/stop and
will block until called. """
request_shutdown = threading.Event()
self.services.register(DOMAIN, SERVICE_HOMEASSISTANT_STOP,
@ -96,6 +94,7 @@ class HomeAssistant(object):
def state_listener(event):
""" The listener that listens for specific state changes. """
if entity_id == event.data['entity_id'] and \
'old_state' in event.data and \
_matcher(event.data['old_state'].state, from_state) and \
_matcher(event.data['new_state'].state, to_state):
@ -235,7 +234,7 @@ class JobPriority(util.OrderedEnum):
return JobPriority.EVENT_DEFAULT
def _create_worker_pool(thread_count=POOL_NUM_THREAD):
def create_worker_pool(thread_count=POOL_NUM_THREAD):
""" Creates a worker pool to be used. """
logger = logging.getLogger(__name__)
@ -264,22 +263,37 @@ def _create_worker_pool(thread_count=POOL_NUM_THREAD):
return util.ThreadPool(thread_count, job_handler, busy_callback)
class EventOrigin(enum.Enum):
""" Distinguish between origin of event. """
# pylint: disable=no-init
local = "LOCAL"
remote = "REMOTE"
def __str__(self):
return self.value
# pylint: disable=too-few-public-methods
class Event(object):
""" Represents an event within the Bus. """
__slots__ = ['event_type', 'data']
__slots__ = ['event_type', 'data', 'origin']
def __init__(self, event_type, data=None):
def __init__(self, event_type, data=None, origin=EventOrigin.local):
self.event_type = event_type
self.data = data or {}
self.origin = origin
def __repr__(self):
# pylint: disable=maybe-no-member
if self.data:
return "<Event {}: {}>".format(
self.event_type, util.repr_helper(self.data))
return "<Event {}[{}]: {}>".format(
self.event_type, self.origin.value[0],
util.repr_helper(self.data))
else:
return "<Event {}>".format(self.event_type)
return "<Event {}[{}]>".format(self.event_type,
self.origin.value[0])
class EventBus(object):
@ -291,7 +305,7 @@ class EventBus(object):
self._listeners = {}
self._logger = logging.getLogger(__name__)
self._lock = threading.Lock()
self._pool = pool or _create_worker_pool()
self._pool = pool or create_worker_pool()
@property
def listeners(self):
@ -302,7 +316,7 @@ class EventBus(object):
return {key: len(self._listeners[key])
for key in self._listeners}
def fire(self, event_type, event_data=None):
def fire(self, event_type, event_data=None, origin=EventOrigin.local):
""" Fire an event. """
with self._lock:
# Copy the list of the current listeners because some listeners
@ -311,7 +325,7 @@ class EventBus(object):
get = self._listeners.get
listeners = get(MATCH_ALL, []) + get(event_type, [])
event = Event(event_type, event_data)
event = Event(event_type, event_data, origin)
self._logger.info("Bus:Handling {}".format(event))
@ -390,7 +404,9 @@ class State(object):
""" Static method to create a state from a dict.
Ensures: state == State.from_json_dict(state.to_json_dict()) """
if 'entity_id' not in json_dict and 'state' not in json_dict:
if not (json_dict and
'entity_id' in json_dict and
'state' in json_dict):
return None
last_changed = json_dict.get('last_changed')
@ -429,6 +445,11 @@ class StateMachine(object):
""" List of entity ids that are being tracked. """
return list(self._states.keys())
def all(self):
""" Returns a dict mapping all entity_ids to their state. """
return {entity_id: state.copy() for entity_id, state
in self._states.items()}
def get(self, entity_id):
""" Returns the state of the specified entity. """
state = self._states.get(entity_id)
@ -456,24 +477,22 @@ class StateMachine(object):
attributes = attributes or {}
with self._lock:
if entity_id in self._states:
old_state = self._states[entity_id]
old_state = self._states.get(entity_id)
if old_state.state != new_state or \
old_state.attributes != attributes:
# If state did not exist or is different, set it
if not old_state or \
old_state.state != new_state or \
old_state.attributes != attributes:
state = self._states[entity_id] = \
State(entity_id, new_state, attributes)
state = self._states[entity_id] = \
State(entity_id, new_state, attributes)
self._bus.fire(EVENT_STATE_CHANGED,
{'entity_id': entity_id,
'old_state': old_state,
'new_state': state})
event_data = {'entity_id': entity_id, 'new_state': state}
else:
# If state did not exist yet
self._states[entity_id] = State(entity_id, new_state,
attributes)
if old_state:
event_data['old_state'] = old_state
self._bus.fire(EVENT_STATE_CHANGED, event_data)
# pylint: disable=too-few-public-methods
@ -501,7 +520,7 @@ class ServiceRegistry(object):
def __init__(self, bus, pool=None):
self._services = {}
self._lock = threading.Lock()
self._pool = pool or _create_worker_pool()
self._pool = pool or create_worker_pool()
bus.listen(EVENT_CALL_SERVICE, self._event_to_service_call)
@property