parent
0cd3271dfa
commit
ea7ffff0ca
9 changed files with 219 additions and 91 deletions
|
@ -9,11 +9,16 @@ from homeassistant.components.alexa import smart_home
|
|||
from homeassistant.util.decorator import Registry
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
from . import auth_api
|
||||
from .const import MESSAGE_EXPIRATION
|
||||
|
||||
|
||||
HANDLERS = Registry()
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
STATE_CONNECTING = 'connecting'
|
||||
STATE_CONNECTED = 'connected'
|
||||
STATE_DISCONNECTED = 'disconnected'
|
||||
|
||||
|
||||
class UnknownHandler(Exception):
|
||||
"""Exception raised when trying to handle unknown handler."""
|
||||
|
@ -25,27 +30,41 @@ class CloudIoT:
|
|||
def __init__(self, cloud):
|
||||
"""Initialize the CloudIoT class."""
|
||||
self.cloud = cloud
|
||||
# The WebSocket client
|
||||
self.client = None
|
||||
# Scheduled sleep task till next connection retry
|
||||
self.retry_task = None
|
||||
# Boolean to indicate if we wanted the connection to close
|
||||
self.close_requested = False
|
||||
# The current number of attempts to connect, impacts wait time
|
||||
self.tries = 0
|
||||
|
||||
@property
|
||||
def is_connected(self):
|
||||
"""Return if connected to the cloud."""
|
||||
return self.client is not None
|
||||
# Current state of the connection
|
||||
self.state = STATE_DISCONNECTED
|
||||
|
||||
@asyncio.coroutine
|
||||
def connect(self):
|
||||
"""Connect to the IoT broker."""
|
||||
if self.client is not None:
|
||||
raise RuntimeError('Cannot connect while already connected')
|
||||
|
||||
self.close_requested = False
|
||||
|
||||
hass = self.cloud.hass
|
||||
remove_hass_stop_listener = None
|
||||
if self.cloud.subscription_expired:
|
||||
# Try refreshing the token to see if it is still expired.
|
||||
yield from hass.async_add_job(auth_api.check_token, self.cloud)
|
||||
|
||||
if self.cloud.subscription_expired:
|
||||
hass.components.persistent_notification.async_create(
|
||||
MESSAGE_EXPIRATION, 'Subscription expired',
|
||||
'cloud_subscription_expired')
|
||||
self.state = STATE_DISCONNECTED
|
||||
return
|
||||
|
||||
if self.state == STATE_CONNECTED:
|
||||
raise RuntimeError('Already connected')
|
||||
|
||||
self.state = STATE_CONNECTING
|
||||
self.close_requested = False
|
||||
remove_hass_stop_listener = None
|
||||
session = async_get_clientsession(self.cloud.hass)
|
||||
client = None
|
||||
disconnect_warn = None
|
||||
|
||||
@asyncio.coroutine
|
||||
def _handle_hass_stop(event):
|
||||
|
@ -54,8 +73,6 @@ class CloudIoT:
|
|||
remove_hass_stop_listener = None
|
||||
yield from self.disconnect()
|
||||
|
||||
client = None
|
||||
disconnect_warn = None
|
||||
try:
|
||||
yield from hass.async_add_job(auth_api.check_token, self.cloud)
|
||||
|
||||
|
@ -70,13 +87,14 @@ class CloudIoT:
|
|||
EVENT_HOMEASSISTANT_STOP, _handle_hass_stop)
|
||||
|
||||
_LOGGER.info('Connected')
|
||||
self.state = STATE_CONNECTED
|
||||
|
||||
while not client.closed:
|
||||
msg = yield from client.receive()
|
||||
|
||||
if msg.type in (WSMsgType.ERROR, WSMsgType.CLOSED,
|
||||
WSMsgType.CLOSING):
|
||||
disconnect_warn = 'Closed by server'
|
||||
disconnect_warn = 'Connection cancelled.'
|
||||
break
|
||||
|
||||
elif msg.type != WSMsgType.TEXT:
|
||||
|
@ -144,20 +162,33 @@ class CloudIoT:
|
|||
self.client = None
|
||||
yield from client.close()
|
||||
|
||||
if not self.close_requested:
|
||||
if self.close_requested:
|
||||
self.state = STATE_DISCONNECTED
|
||||
|
||||
else:
|
||||
self.state = STATE_CONNECTING
|
||||
self.tries += 1
|
||||
|
||||
# Sleep 0, 5, 10, 15 … up to 30 seconds between retries
|
||||
yield from asyncio.sleep(
|
||||
min(30, (self.tries - 1) * 5), loop=hass.loop)
|
||||
|
||||
hass.async_add_job(self.connect())
|
||||
try:
|
||||
# Sleep 0, 5, 10, 15 … up to 30 seconds between retries
|
||||
self.retry_task = hass.async_add_job(asyncio.sleep(
|
||||
min(30, (self.tries - 1) * 5), loop=hass.loop))
|
||||
yield from self.retry_task
|
||||
self.retry_task = None
|
||||
hass.async_add_job(self.connect())
|
||||
except asyncio.CancelledError:
|
||||
# Happens if disconnect called
|
||||
pass
|
||||
|
||||
@asyncio.coroutine
|
||||
def disconnect(self):
|
||||
"""Disconnect the client."""
|
||||
self.close_requested = True
|
||||
yield from self.client.close()
|
||||
|
||||
if self.client is not None:
|
||||
yield from self.client.close()
|
||||
elif self.retry_task is not None:
|
||||
self.retry_task.cancel()
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue