Migrate core from threads to async awesomeness (#3248)
* Add event loop to the core * Add block_till_done to HA core object * Fix some tests * Linting core * Fix statemachine tests * Core test fixes * fix block_till_done to wait for loop and queue to empty * fix test_core for passing, and correct start/stop/block_till_done * Fix remote tests * Fix tests: block_till_done * Fix linting * Fix more tests * Fix final linting * Fix remote test * remove unnecessary import * reduce sleep to avoid slowing down the tests excessively * fix remaining tests to wait for non-threadsafe operations * Add async_ doc strings for event loop / coroutine info * Fix command line test to block for the right timeout * Fix py3.4.2 loop var access * Fix SERVICE_CALL_LIMIT being in effect for other tests * Fix lint errors * Fix lint error with proper placement * Fix slave start to not start a timer * Add asyncio compatible listeners. * Increase min Python version to 3.4.2 * Move async backports to util * Add backported async tests * Fix linting * Simplify Python version check * Fix lint * Remove unneeded try/except and queue listener appproriately. * Fix tuple vs. list unorderable error on version compare. * Fix version tests
This commit is contained in:
parent
24f1bff7f1
commit
609d7ebea5
98 changed files with 1680 additions and 1109 deletions
|
@ -4,18 +4,20 @@ Core components of Home Assistant.
|
|||
Home Assistant is a Home Automation framework for observing the state
|
||||
of entities and react to changes.
|
||||
"""
|
||||
|
||||
# pylint: disable=unused-import, too-many-lines
|
||||
import asyncio
|
||||
import enum
|
||||
import functools as ft
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import signal
|
||||
import threading
|
||||
import sys
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
from types import MappingProxyType
|
||||
|
||||
# pylint: disable=unused-import
|
||||
from typing import Optional, Any, Callable, List # NOQA
|
||||
|
||||
import voluptuous as vol
|
||||
|
@ -30,6 +32,8 @@ from homeassistant.const import (
|
|||
SERVICE_HOMEASSISTANT_RESTART, SERVICE_HOMEASSISTANT_STOP, __version__)
|
||||
from homeassistant.exceptions import (
|
||||
HomeAssistantError, InvalidEntityFormatError)
|
||||
from homeassistant.util.async import (
|
||||
run_coroutine_threadsafe, run_callback_threadsafe)
|
||||
import homeassistant.util as util
|
||||
import homeassistant.util.dt as dt_util
|
||||
import homeassistant.util.location as location
|
||||
|
@ -103,14 +107,20 @@ class JobPriority(util.OrderedEnum):
|
|||
class HomeAssistant(object):
|
||||
"""Root object of the Home Assistant home automation."""
|
||||
|
||||
def __init__(self):
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
|
||||
def __init__(self, loop=None):
|
||||
"""Initialize new Home Assistant object."""
|
||||
self.loop = loop or asyncio.get_event_loop()
|
||||
self.executer = ThreadPoolExecutor(max_workers=5)
|
||||
self.loop.set_default_executor(self.executer)
|
||||
self.pool = pool = create_worker_pool()
|
||||
self.bus = EventBus(pool)
|
||||
self.services = ServiceRegistry(self.bus, self.add_job)
|
||||
self.states = StateMachine(self.bus)
|
||||
self.bus = EventBus(pool, self.loop)
|
||||
self.services = ServiceRegistry(self.bus, self.add_job, self.loop)
|
||||
self.states = StateMachine(self.bus, self.loop)
|
||||
self.config = Config() # type: Config
|
||||
self.state = CoreState.not_running
|
||||
self.exit_code = None
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
|
@ -123,9 +133,65 @@ class HomeAssistant(object):
|
|||
"Starting Home Assistant (%d threads)", self.pool.worker_count)
|
||||
self.state = CoreState.starting
|
||||
|
||||
# Register the async start
|
||||
self.loop.create_task(self.async_start())
|
||||
|
||||
@asyncio.coroutine
|
||||
def stop_homeassistant(*args):
|
||||
"""Stop Home Assistant."""
|
||||
self.exit_code = 0
|
||||
yield from self.async_stop()
|
||||
|
||||
@asyncio.coroutine
|
||||
def restart_homeassistant(*args):
|
||||
"""Restart Home Assistant."""
|
||||
self.exit_code = RESTART_EXIT_CODE
|
||||
yield from self.async_stop()
|
||||
|
||||
# Register the restart/stop event
|
||||
self.loop.call_soon(
|
||||
self.services.async_register,
|
||||
DOMAIN, SERVICE_HOMEASSISTANT_STOP, stop_homeassistant
|
||||
)
|
||||
self.loop.call_soon(
|
||||
self.services.async_register,
|
||||
DOMAIN, SERVICE_HOMEASSISTANT_RESTART, restart_homeassistant
|
||||
)
|
||||
|
||||
# Setup signal handling
|
||||
try:
|
||||
signal.signal(signal.SIGTERM, stop_homeassistant)
|
||||
except ValueError:
|
||||
_LOGGER.warning(
|
||||
'Could not bind to SIGTERM. Are you running in a thread?')
|
||||
try:
|
||||
signal.signal(signal.SIGHUP, restart_homeassistant)
|
||||
except ValueError:
|
||||
_LOGGER.warning(
|
||||
'Could not bind to SIGHUP. Are you running in a thread?')
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
# Run forever and catch keyboard interrupt
|
||||
try:
|
||||
# Block until stopped
|
||||
_LOGGER.info("Starting Home Assistant core loop")
|
||||
self.loop.run_forever()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
self.loop.create_task(stop_homeassistant())
|
||||
self.loop.run_forever()
|
||||
|
||||
@asyncio.coroutine
|
||||
def async_start(self):
|
||||
"""Finalize startup from inside the event loop.
|
||||
|
||||
This method is a coroutine.
|
||||
"""
|
||||
create_timer(self)
|
||||
self.bus.fire(EVENT_HOMEASSISTANT_START)
|
||||
self.pool.block_till_done()
|
||||
self.bus.async_fire(EVENT_HOMEASSISTANT_START)
|
||||
yield from self.loop.run_in_executor(None, self.pool.block_till_done)
|
||||
self.state = CoreState.running
|
||||
|
||||
def add_job(self,
|
||||
|
@ -139,55 +205,66 @@ class HomeAssistant(object):
|
|||
"""
|
||||
self.pool.add_job(priority, (target,) + args)
|
||||
|
||||
def block_till_stopped(self) -> int:
|
||||
"""Register service homeassistant/stop and will block until called."""
|
||||
request_shutdown = threading.Event()
|
||||
request_restart = threading.Event()
|
||||
def _loop_empty(self):
|
||||
"""Python 3.4.2 empty loop compatibility function."""
|
||||
# pylint: disable=protected-access
|
||||
if sys.version_info < (3, 4, 3):
|
||||
return len(self.loop._scheduled) == 0 and \
|
||||
len(self.loop._ready) == 0
|
||||
else:
|
||||
return self.loop._current_handle is None and \
|
||||
len(self.loop._ready) == 0
|
||||
|
||||
def stop_homeassistant(*args):
|
||||
"""Stop Home Assistant."""
|
||||
request_shutdown.set()
|
||||
def block_till_done(self):
|
||||
"""Block till all pending work is done."""
|
||||
import threading
|
||||
|
||||
def restart_homeassistant(*args):
|
||||
"""Reset Home Assistant."""
|
||||
_LOGGER.warning('Home Assistant requested a restart.')
|
||||
request_restart.set()
|
||||
request_shutdown.set()
|
||||
complete = threading.Event()
|
||||
|
||||
self.services.register(
|
||||
DOMAIN, SERVICE_HOMEASSISTANT_STOP, stop_homeassistant)
|
||||
self.services.register(
|
||||
DOMAIN, SERVICE_HOMEASSISTANT_RESTART, restart_homeassistant)
|
||||
@asyncio.coroutine
|
||||
def sleep_wait():
|
||||
"""Sleep in thread pool."""
|
||||
yield from self.loop.run_in_executor(None, time.sleep, 0)
|
||||
|
||||
try:
|
||||
signal.signal(signal.SIGTERM, stop_homeassistant)
|
||||
except ValueError:
|
||||
_LOGGER.warning(
|
||||
'Could not bind to SIGTERM. Are you running in a thread?')
|
||||
try:
|
||||
signal.signal(signal.SIGHUP, restart_homeassistant)
|
||||
except ValueError:
|
||||
_LOGGER.warning(
|
||||
'Could not bind to SIGHUP. Are you running in a thread?')
|
||||
except AttributeError:
|
||||
pass
|
||||
try:
|
||||
while not request_shutdown.is_set():
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
self.stop()
|
||||
def notify_when_done():
|
||||
"""Notify event loop when pool done."""
|
||||
while True:
|
||||
# Wait for the work queue to empty
|
||||
self.pool.block_till_done()
|
||||
|
||||
return RESTART_EXIT_CODE if request_restart.is_set() else 0
|
||||
# Verify the loop is empty
|
||||
if self._loop_empty():
|
||||
break
|
||||
|
||||
# sleep in the loop executor, this forces execution back into
|
||||
# the event loop to avoid the block thread from starving the
|
||||
# async loop
|
||||
run_coroutine_threadsafe(
|
||||
sleep_wait(),
|
||||
self.loop
|
||||
).result()
|
||||
|
||||
complete.set()
|
||||
|
||||
threading.Thread(name="BlockThread", target=notify_when_done).start()
|
||||
complete.wait()
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop Home Assistant and shuts down all threads."""
|
||||
_LOGGER.info("Stopping")
|
||||
run_coroutine_threadsafe(self.async_stop(), self.loop)
|
||||
|
||||
@asyncio.coroutine
|
||||
def async_stop(self) -> None:
|
||||
"""Stop Home Assistant and shuts down all threads.
|
||||
|
||||
This method is a coroutine.
|
||||
"""
|
||||
self.state = CoreState.stopping
|
||||
self.bus.fire(EVENT_HOMEASSISTANT_STOP)
|
||||
self.pool.stop()
|
||||
self.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
|
||||
yield from self.loop.run_in_executor(None, self.pool.block_till_done)
|
||||
yield from self.loop.run_in_executor(None, self.pool.stop)
|
||||
self.state = CoreState.not_running
|
||||
self.loop.stop()
|
||||
|
||||
|
||||
class EventOrigin(enum.Enum):
|
||||
|
@ -247,43 +324,69 @@ class Event(object):
|
|||
class EventBus(object):
|
||||
"""Allows firing of and listening for events."""
|
||||
|
||||
def __init__(self, pool: util.ThreadPool) -> None:
|
||||
def __init__(self, pool: util.ThreadPool,
|
||||
loop: asyncio.AbstractEventLoop) -> None:
|
||||
"""Initialize a new event bus."""
|
||||
self._listeners = {}
|
||||
self._lock = threading.Lock()
|
||||
self._pool = pool
|
||||
self._loop = loop
|
||||
|
||||
def async_listeners(self):
|
||||
"""Dict with events and the number of listeners.
|
||||
|
||||
This method must be run in the event loop.
|
||||
"""
|
||||
return {key: len(self._listeners[key])
|
||||
for key in self._listeners}
|
||||
|
||||
@property
|
||||
def listeners(self):
|
||||
"""Dict with events and the number of listeners."""
|
||||
with self._lock:
|
||||
return {key: len(self._listeners[key])
|
||||
for key in self._listeners}
|
||||
return run_callback_threadsafe(
|
||||
self._loop, self.async_listeners
|
||||
).result()
|
||||
|
||||
def fire(self, event_type: str, event_data=None, origin=EventOrigin.local):
|
||||
"""Fire an event."""
|
||||
if not self._pool.running:
|
||||
raise HomeAssistantError('Home Assistant has shut down.')
|
||||
|
||||
with self._lock:
|
||||
# Copy the list of the current listeners because some listeners
|
||||
# remove themselves as a listener while being executed which
|
||||
# causes the iterator to be confused.
|
||||
get = self._listeners.get
|
||||
listeners = get(MATCH_ALL, []) + get(event_type, [])
|
||||
self._loop.call_soon_threadsafe(self.async_fire, event_type,
|
||||
event_data, origin)
|
||||
return
|
||||
|
||||
event = Event(event_type, event_data, origin)
|
||||
def async_fire(self, event_type: str, event_data=None,
|
||||
origin=EventOrigin.local, wait=False):
|
||||
"""Fire an event.
|
||||
|
||||
if event_type != EVENT_TIME_CHANGED:
|
||||
_LOGGER.info("Bus:Handling %s", event)
|
||||
This method must be run in the event loop.
|
||||
"""
|
||||
# Copy the list of the current listeners because some listeners
|
||||
# remove themselves as a listener while being executed which
|
||||
# causes the iterator to be confused.
|
||||
get = self._listeners.get
|
||||
listeners = get(MATCH_ALL, []) + get(event_type, [])
|
||||
|
||||
if not listeners:
|
||||
return
|
||||
event = Event(event_type, event_data, origin)
|
||||
|
||||
job_priority = JobPriority.from_event_type(event_type)
|
||||
if event_type != EVENT_TIME_CHANGED:
|
||||
_LOGGER.info("Bus:Handling %s", event)
|
||||
|
||||
for func in listeners:
|
||||
self._pool.add_job(job_priority, (func, event))
|
||||
if not listeners:
|
||||
return
|
||||
|
||||
job_priority = JobPriority.from_event_type(event_type)
|
||||
|
||||
sync_jobs = []
|
||||
for func in listeners:
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
self._loop.create_task(func(event))
|
||||
else:
|
||||
sync_jobs.append((job_priority, (func, event)))
|
||||
|
||||
# Send all the sync jobs at once
|
||||
if sync_jobs:
|
||||
self._pool.add_many_jobs(sync_jobs)
|
||||
|
||||
def listen(self, event_type, listener):
|
||||
"""Listen for all events or events of a specific type.
|
||||
|
@ -291,11 +394,9 @@ class EventBus(object):
|
|||
To listen to all events specify the constant ``MATCH_ALL``
|
||||
as event_type.
|
||||
"""
|
||||
with self._lock:
|
||||
if event_type in self._listeners:
|
||||
self._listeners[event_type].append(listener)
|
||||
else:
|
||||
self._listeners[event_type] = [listener]
|
||||
future = run_callback_threadsafe(
|
||||
self._loop, self.async_listen, event_type, listener)
|
||||
future.result()
|
||||
|
||||
def remove_listener():
|
||||
"""Remove the listener."""
|
||||
|
@ -303,6 +404,25 @@ class EventBus(object):
|
|||
|
||||
return remove_listener
|
||||
|
||||
def async_listen(self, event_type, listener):
|
||||
"""Listen for all events or events of a specific type.
|
||||
|
||||
To listen to all events specify the constant ``MATCH_ALL``
|
||||
as event_type.
|
||||
|
||||
This method must be run in the event loop.
|
||||
"""
|
||||
if event_type in self._listeners:
|
||||
self._listeners[event_type].append(listener)
|
||||
else:
|
||||
self._listeners[event_type] = [listener]
|
||||
|
||||
def remove_listener():
|
||||
"""Remove the listener."""
|
||||
self.async_remove_listener(event_type, listener)
|
||||
|
||||
return remove_listener
|
||||
|
||||
def listen_once(self, event_type, listener):
|
||||
"""Listen once for event of a specific type.
|
||||
|
||||
|
@ -331,6 +451,41 @@ class EventBus(object):
|
|||
|
||||
return remove_listener
|
||||
|
||||
def async_listen_once(self, event_type, listener):
|
||||
"""Listen once for event of a specific type.
|
||||
|
||||
To listen to all events specify the constant ``MATCH_ALL``
|
||||
as event_type.
|
||||
|
||||
Returns registered listener that can be used with remove_listener.
|
||||
|
||||
This method must be run in the event loop.
|
||||
"""
|
||||
@ft.wraps(listener)
|
||||
@asyncio.coroutine
|
||||
def onetime_listener(event):
|
||||
"""Remove listener from eventbus and then fire listener."""
|
||||
if hasattr(onetime_listener, 'run'):
|
||||
return
|
||||
# Set variable so that we will never run twice.
|
||||
# Because the event bus loop might have async_fire queued multiple
|
||||
# times, its possible this listener may already be lined up
|
||||
# multiple times as well.
|
||||
# This will make sure the second time it does nothing.
|
||||
setattr(onetime_listener, 'run', True)
|
||||
|
||||
self.async_remove_listener(event_type, onetime_listener)
|
||||
|
||||
if asyncio.iscoroutinefunction(listener):
|
||||
yield from listener(event)
|
||||
else:
|
||||
job_priority = JobPriority.from_event_type(event.event_type)
|
||||
self._pool.add_job(job_priority, (listener, event))
|
||||
|
||||
self.async_listen(event_type, onetime_listener)
|
||||
|
||||
return onetime_listener
|
||||
|
||||
def remove_listener(self, event_type, listener):
|
||||
"""Remove a listener of a specific event_type. (DEPRECATED 0.28)."""
|
||||
_LOGGER.warning('bus.remove_listener has been deprecated. Please use '
|
||||
|
@ -339,19 +494,28 @@ class EventBus(object):
|
|||
|
||||
def _remove_listener(self, event_type, listener):
|
||||
"""Remove a listener of a specific event_type."""
|
||||
with self._lock:
|
||||
try:
|
||||
self._listeners[event_type].remove(listener)
|
||||
future = run_callback_threadsafe(
|
||||
self._loop,
|
||||
self.async_remove_listener, event_type, listener
|
||||
)
|
||||
future.result()
|
||||
|
||||
# delete event_type list if empty
|
||||
if not self._listeners[event_type]:
|
||||
self._listeners.pop(event_type)
|
||||
def async_remove_listener(self, event_type, listener):
|
||||
"""Remove a listener of a specific event_type.
|
||||
|
||||
except (KeyError, ValueError):
|
||||
# KeyError is key event_type listener did not exist
|
||||
# ValueError if listener did not exist within event_type
|
||||
_LOGGER.warning('Unable to remove unknown listener %s',
|
||||
listener)
|
||||
This method must be run in the event loop.
|
||||
"""
|
||||
try:
|
||||
self._listeners[event_type].remove(listener)
|
||||
|
||||
# delete event_type list if empty
|
||||
if not self._listeners[event_type]:
|
||||
self._listeners.pop(event_type)
|
||||
except (KeyError, ValueError):
|
||||
# KeyError is key event_type listener did not exist
|
||||
# ValueError if listener did not exist within event_type
|
||||
_LOGGER.warning('Unable to remove unknown listener %s',
|
||||
listener)
|
||||
|
||||
|
||||
class State(object):
|
||||
|
@ -455,27 +619,39 @@ class State(object):
|
|||
class StateMachine(object):
|
||||
"""Helper class that tracks the state of different entities."""
|
||||
|
||||
def __init__(self, bus):
|
||||
def __init__(self, bus, loop):
|
||||
"""Initialize state machine."""
|
||||
self._states = {}
|
||||
self._bus = bus
|
||||
self._lock = threading.Lock()
|
||||
self._loop = loop
|
||||
|
||||
def entity_ids(self, domain_filter=None):
|
||||
"""List of entity ids that are being tracked."""
|
||||
future = run_callback_threadsafe(
|
||||
self._loop, self.async_entity_ids, domain_filter
|
||||
)
|
||||
return future.result()
|
||||
|
||||
def async_entity_ids(self, domain_filter=None):
|
||||
"""List of entity ids that are being tracked."""
|
||||
if domain_filter is None:
|
||||
return list(self._states.keys())
|
||||
|
||||
domain_filter = domain_filter.lower()
|
||||
|
||||
with self._lock:
|
||||
return [state.entity_id for state in self._states.values()
|
||||
if state.domain == domain_filter]
|
||||
return [state.entity_id for state in self._states.values()
|
||||
if state.domain == domain_filter]
|
||||
|
||||
def all(self):
|
||||
"""Create a list of all states."""
|
||||
with self._lock:
|
||||
return list(self._states.values())
|
||||
return run_callback_threadsafe(self._loop, self.async_all).result()
|
||||
|
||||
def async_all(self):
|
||||
"""Create a list of all states.
|
||||
|
||||
This method must be run in the event loop.
|
||||
"""
|
||||
return list(self._states.values())
|
||||
|
||||
def get(self, entity_id):
|
||||
"""Retrieve state of entity_id or None if not found."""
|
||||
|
@ -483,6 +659,15 @@ class StateMachine(object):
|
|||
|
||||
def is_state(self, entity_id, state):
|
||||
"""Test if entity exists and is specified state."""
|
||||
return run_callback_threadsafe(
|
||||
self._loop, self.async_is_state, entity_id, state
|
||||
).result()
|
||||
|
||||
def async_is_state(self, entity_id, state):
|
||||
"""Test if entity exists and is specified state.
|
||||
|
||||
This method must be run in the event loop.
|
||||
"""
|
||||
entity_id = entity_id.lower()
|
||||
|
||||
return (entity_id in self._states and
|
||||
|
@ -490,6 +675,15 @@ class StateMachine(object):
|
|||
|
||||
def is_state_attr(self, entity_id, name, value):
|
||||
"""Test if entity exists and has a state attribute set to value."""
|
||||
return run_callback_threadsafe(
|
||||
self._loop, self.async_is_state_attr, entity_id, name, value
|
||||
).result()
|
||||
|
||||
def async_is_state_attr(self, entity_id, name, value):
|
||||
"""Test if entity exists and has a state attribute set to value.
|
||||
|
||||
This method must be run in the event loop.
|
||||
"""
|
||||
entity_id = entity_id.lower()
|
||||
|
||||
return (entity_id in self._states and
|
||||
|
@ -500,23 +694,32 @@ class StateMachine(object):
|
|||
|
||||
Returns boolean to indicate if an entity was removed.
|
||||
"""
|
||||
return run_callback_threadsafe(
|
||||
self._loop, self.async_remove, entity_id).result()
|
||||
|
||||
def async_remove(self, entity_id):
|
||||
"""Remove the state of an entity.
|
||||
|
||||
Returns boolean to indicate if an entity was removed.
|
||||
|
||||
This method must be run in the event loop.
|
||||
"""
|
||||
entity_id = entity_id.lower()
|
||||
|
||||
with self._lock:
|
||||
old_state = self._states.pop(entity_id, None)
|
||||
old_state = self._states.pop(entity_id, None)
|
||||
|
||||
if old_state is None:
|
||||
return False
|
||||
if old_state is None:
|
||||
return False
|
||||
|
||||
event_data = {
|
||||
'entity_id': entity_id,
|
||||
'old_state': old_state,
|
||||
'new_state': None,
|
||||
}
|
||||
event_data = {
|
||||
'entity_id': entity_id,
|
||||
'old_state': old_state,
|
||||
'new_state': None,
|
||||
}
|
||||
|
||||
self._bus.fire(EVENT_STATE_CHANGED, event_data)
|
||||
self._bus.async_fire(EVENT_STATE_CHANGED, event_data)
|
||||
|
||||
return True
|
||||
return True
|
||||
|
||||
def set(self, entity_id, new_state, attributes=None, force_update=False):
|
||||
"""Set the state of an entity, add entity if it does not exist.
|
||||
|
@ -526,34 +729,49 @@ class StateMachine(object):
|
|||
If you just update the attributes and not the state, last changed will
|
||||
not be affected.
|
||||
"""
|
||||
run_callback_threadsafe(
|
||||
self._loop,
|
||||
self.async_set, entity_id, new_state, attributes, force_update,
|
||||
).result()
|
||||
|
||||
def async_set(self, entity_id, new_state, attributes=None,
|
||||
force_update=False):
|
||||
"""Set the state of an entity, add entity if it does not exist.
|
||||
|
||||
Attributes is an optional dict to specify attributes of this state.
|
||||
|
||||
If you just update the attributes and not the state, last changed will
|
||||
not be affected.
|
||||
|
||||
This method must be run in the event loop.
|
||||
"""
|
||||
entity_id = entity_id.lower()
|
||||
new_state = str(new_state)
|
||||
attributes = attributes or {}
|
||||
|
||||
with self._lock:
|
||||
old_state = self._states.get(entity_id)
|
||||
old_state = self._states.get(entity_id)
|
||||
|
||||
is_existing = old_state is not None
|
||||
same_state = (is_existing and old_state.state == new_state and
|
||||
not force_update)
|
||||
same_attr = is_existing and old_state.attributes == attributes
|
||||
is_existing = old_state is not None
|
||||
same_state = (is_existing and old_state.state == new_state and
|
||||
not force_update)
|
||||
same_attr = is_existing and old_state.attributes == attributes
|
||||
|
||||
if same_state and same_attr:
|
||||
return
|
||||
if same_state and same_attr:
|
||||
return
|
||||
|
||||
# If state did not exist or is different, set it
|
||||
last_changed = old_state.last_changed if same_state else None
|
||||
# If state did not exist or is different, set it
|
||||
last_changed = old_state.last_changed if same_state else None
|
||||
|
||||
state = State(entity_id, new_state, attributes, last_changed)
|
||||
self._states[entity_id] = state
|
||||
state = State(entity_id, new_state, attributes, last_changed)
|
||||
self._states[entity_id] = state
|
||||
|
||||
event_data = {
|
||||
'entity_id': entity_id,
|
||||
'old_state': old_state,
|
||||
'new_state': state,
|
||||
}
|
||||
event_data = {
|
||||
'entity_id': entity_id,
|
||||
'old_state': old_state,
|
||||
'new_state': state,
|
||||
}
|
||||
|
||||
self._bus.fire(EVENT_STATE_CHANGED, event_data)
|
||||
self._bus.async_fire(EVENT_STATE_CHANGED, event_data)
|
||||
|
||||
|
||||
# pylint: disable=too-few-public-methods
|
||||
|
@ -615,22 +833,30 @@ class ServiceCall(object):
|
|||
class ServiceRegistry(object):
|
||||
"""Offers services over the eventbus."""
|
||||
|
||||
def __init__(self, bus, add_job):
|
||||
def __init__(self, bus, add_job, loop):
|
||||
"""Initialize a service registry."""
|
||||
self._services = {}
|
||||
self._lock = threading.Lock()
|
||||
self._add_job = add_job
|
||||
self._bus = bus
|
||||
self._loop = loop
|
||||
self._cur_id = 0
|
||||
bus.listen(EVENT_CALL_SERVICE, self._event_to_service_call)
|
||||
run_callback_threadsafe(
|
||||
loop,
|
||||
bus.async_listen, EVENT_CALL_SERVICE, self._event_to_service_call,
|
||||
)
|
||||
|
||||
@property
|
||||
def services(self):
|
||||
"""Dict with per domain a list of available services."""
|
||||
with self._lock:
|
||||
return {domain: {key: value.as_dict() for key, value
|
||||
in self._services[domain].items()}
|
||||
for domain in self._services}
|
||||
return run_callback_threadsafe(
|
||||
self._loop, self.async_services,
|
||||
).result()
|
||||
|
||||
def async_services(self):
|
||||
"""Dict with per domain a list of available services."""
|
||||
return {domain: {key: value.as_dict() for key, value
|
||||
in self._services[domain].items()}
|
||||
for domain in self._services}
|
||||
|
||||
def has_service(self, domain, service):
|
||||
"""Test if specified service exists."""
|
||||
|
@ -647,20 +873,39 @@ class ServiceRegistry(object):
|
|||
|
||||
Schema is called to coerce and validate the service data.
|
||||
"""
|
||||
run_callback_threadsafe(
|
||||
self._loop,
|
||||
self.async_register, domain, service, service_func, description,
|
||||
schema
|
||||
).result()
|
||||
|
||||
def async_register(self, domain, service, service_func, description=None,
|
||||
schema=None):
|
||||
"""
|
||||
Register a service.
|
||||
|
||||
Description is a dict containing key 'description' to describe
|
||||
the service and a key 'fields' to describe the fields.
|
||||
|
||||
Schema is called to coerce and validate the service data.
|
||||
|
||||
This method must be run in the event loop.
|
||||
"""
|
||||
domain = domain.lower()
|
||||
service = service.lower()
|
||||
description = description or {}
|
||||
service_obj = Service(service_func, description.get('description'),
|
||||
description.get('fields', {}), schema)
|
||||
with self._lock:
|
||||
if domain in self._services:
|
||||
self._services[domain][service] = service_obj
|
||||
else:
|
||||
self._services[domain] = {service: service_obj}
|
||||
|
||||
self._bus.fire(
|
||||
EVENT_SERVICE_REGISTERED,
|
||||
{ATTR_DOMAIN: domain, ATTR_SERVICE: service})
|
||||
if domain in self._services:
|
||||
self._services[domain][service] = service_obj
|
||||
else:
|
||||
self._services[domain] = {service: service_obj}
|
||||
|
||||
self._bus.async_fire(
|
||||
EVENT_SERVICE_REGISTERED,
|
||||
{ATTR_DOMAIN: domain, ATTR_SERVICE: service}
|
||||
)
|
||||
|
||||
def call(self, domain, service, service_data=None, blocking=False):
|
||||
"""
|
||||
|
@ -679,6 +924,31 @@ class ServiceRegistry(object):
|
|||
Because the service is sent as an event you are not allowed to use
|
||||
the keys ATTR_DOMAIN and ATTR_SERVICE in your service_data.
|
||||
"""
|
||||
return run_coroutine_threadsafe(
|
||||
self.async_call(domain, service, service_data, blocking),
|
||||
self._loop
|
||||
).result()
|
||||
|
||||
@asyncio.coroutine
|
||||
def async_call(self, domain, service, service_data=None, blocking=False):
|
||||
"""
|
||||
Call a service.
|
||||
|
||||
Specify blocking=True to wait till service is executed.
|
||||
Waits a maximum of SERVICE_CALL_LIMIT.
|
||||
|
||||
If blocking = True, will return boolean if service executed
|
||||
succesfully within SERVICE_CALL_LIMIT.
|
||||
|
||||
This method will fire an event to call the service.
|
||||
This event will be picked up by this ServiceRegistry and any
|
||||
other ServiceRegistry that is listening on the EventBus.
|
||||
|
||||
Because the service is sent as an event you are not allowed to use
|
||||
the keys ATTR_DOMAIN and ATTR_SERVICE in your service_data.
|
||||
|
||||
This method is a coroutine.
|
||||
"""
|
||||
call_id = self._generate_unique_id()
|
||||
|
||||
event_data = {
|
||||
|
@ -689,19 +959,23 @@ class ServiceRegistry(object):
|
|||
}
|
||||
|
||||
if blocking:
|
||||
executed_event = threading.Event()
|
||||
fut = asyncio.Future(loop=self._loop)
|
||||
|
||||
@asyncio.coroutine
|
||||
def service_executed(call):
|
||||
"""Callback method that is called when service is executed."""
|
||||
if call.data[ATTR_SERVICE_CALL_ID] == call_id:
|
||||
executed_event.set()
|
||||
fut.set_result(True)
|
||||
|
||||
unsub = self._bus.listen(EVENT_SERVICE_EXECUTED, service_executed)
|
||||
unsub = self._bus.async_listen(EVENT_SERVICE_EXECUTED,
|
||||
service_executed)
|
||||
|
||||
self._bus.fire(EVENT_CALL_SERVICE, event_data)
|
||||
self._bus.async_fire(EVENT_CALL_SERVICE, event_data)
|
||||
|
||||
if blocking:
|
||||
success = executed_event.wait(SERVICE_CALL_LIMIT)
|
||||
done, _ = yield from asyncio.wait([fut], loop=self._loop,
|
||||
timeout=SERVICE_CALL_LIMIT)
|
||||
success = bool(done)
|
||||
unsub()
|
||||
return success
|
||||
|
||||
|
@ -797,16 +1071,19 @@ def create_timer(hass, interval=TIMER_INTERVAL):
|
|||
# every minute.
|
||||
assert 60 % interval == 0, "60 % TIMER_INTERVAL should be 0!"
|
||||
|
||||
def timer():
|
||||
"""Send an EVENT_TIME_CHANGED on interval."""
|
||||
stop_event = threading.Event()
|
||||
stop_event = asyncio.Event(loop=hass.loop)
|
||||
|
||||
def stop_timer(event):
|
||||
"""Stop the timer."""
|
||||
stop_event.set()
|
||||
# Setting the Event inside the loop by marking it as a coroutine
|
||||
@asyncio.coroutine
|
||||
def stop_timer(event):
|
||||
"""Stop the timer."""
|
||||
stop_event.set()
|
||||
|
||||
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, stop_timer)
|
||||
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, stop_timer)
|
||||
|
||||
@asyncio.coroutine
|
||||
def timer(interval, stop_event):
|
||||
"""Create an async timer."""
|
||||
_LOGGER.info("Timer:starting")
|
||||
|
||||
last_fired_on_second = -1
|
||||
|
@ -830,7 +1107,7 @@ def create_timer(hass, interval=TIMER_INTERVAL):
|
|||
slp_seconds = interval - now.second % interval + \
|
||||
.5 - now.microsecond/1000000.0
|
||||
|
||||
time.sleep(slp_seconds)
|
||||
yield from asyncio.sleep(slp_seconds, loop=hass.loop)
|
||||
|
||||
now = calc_now()
|
||||
|
||||
|
@ -839,18 +1116,22 @@ def create_timer(hass, interval=TIMER_INTERVAL):
|
|||
# Event might have been set while sleeping
|
||||
if not stop_event.is_set():
|
||||
try:
|
||||
hass.bus.fire(EVENT_TIME_CHANGED, {ATTR_NOW: now})
|
||||
# Schedule the bus event
|
||||
hass.loop.call_soon(
|
||||
hass.bus.async_fire,
|
||||
EVENT_TIME_CHANGED,
|
||||
{ATTR_NOW: now}
|
||||
)
|
||||
except HomeAssistantError:
|
||||
# HA raises error if firing event after it has shut down
|
||||
break
|
||||
|
||||
@asyncio.coroutine
|
||||
def start_timer(event):
|
||||
"""Start the timer."""
|
||||
thread = threading.Thread(target=timer, name='Timer')
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
"""Start our async timer."""
|
||||
hass.loop.create_task(timer(interval, stop_event))
|
||||
|
||||
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_timer)
|
||||
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, start_timer)
|
||||
|
||||
|
||||
def create_worker_pool(worker_count=None):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue