Cloud reconnect tweaks (#12586)
This commit is contained in:
parent
b8df2d4042
commit
51c06e35cf
3 changed files with 63 additions and 57 deletions
|
@ -219,7 +219,7 @@ class Cloud:
|
||||||
|
|
||||||
# Fetching keyset can fail if internet is not up yet.
|
# Fetching keyset can fail if internet is not up yet.
|
||||||
if not success:
|
if not success:
|
||||||
self.hass.helpers.async_call_later(5, self.async_start)
|
self.hass.helpers.event.async_call_later(5, self.async_start)
|
||||||
return
|
return
|
||||||
|
|
||||||
def load_config():
|
def load_config():
|
||||||
|
|
|
@ -44,20 +44,13 @@ class CloudIoT:
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def connect(self):
|
def connect(self):
|
||||||
"""Connect to the IoT broker."""
|
"""Connect to the IoT broker."""
|
||||||
|
if self.state != STATE_DISCONNECTED:
|
||||||
|
raise RuntimeError('Connect called while not disconnected')
|
||||||
|
|
||||||
hass = self.cloud.hass
|
hass = self.cloud.hass
|
||||||
if self.cloud.subscription_expired:
|
self.close_requested = False
|
||||||
# Try refreshing the token to see if it is still expired.
|
self.state = STATE_CONNECTING
|
||||||
yield from hass.async_add_job(auth_api.check_token, self.cloud)
|
self.tries = 0
|
||||||
|
|
||||||
if self.cloud.subscription_expired:
|
|
||||||
hass.components.persistent_notification.async_create(
|
|
||||||
MESSAGE_EXPIRATION, 'Subscription expired',
|
|
||||||
'cloud_subscription_expired')
|
|
||||||
self.state = STATE_DISCONNECTED
|
|
||||||
return
|
|
||||||
|
|
||||||
if self.state == STATE_CONNECTED:
|
|
||||||
raise RuntimeError('Already connected')
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def _handle_hass_stop(event):
|
def _handle_hass_stop(event):
|
||||||
|
@ -66,17 +59,60 @@ class CloudIoT:
|
||||||
remove_hass_stop_listener = None
|
remove_hass_stop_listener = None
|
||||||
yield from self.disconnect()
|
yield from self.disconnect()
|
||||||
|
|
||||||
self.state = STATE_CONNECTING
|
|
||||||
self.close_requested = False
|
|
||||||
remove_hass_stop_listener = hass.bus.async_listen_once(
|
remove_hass_stop_listener = hass.bus.async_listen_once(
|
||||||
EVENT_HOMEASSISTANT_STOP, _handle_hass_stop)
|
EVENT_HOMEASSISTANT_STOP, _handle_hass_stop)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
yield from self._handle_connection()
|
||||||
|
except Exception: # pylint: disable=broad-except
|
||||||
|
# Safety net. This should never hit.
|
||||||
|
# Still adding it here to make sure we can always reconnect
|
||||||
|
_LOGGER.exception("Unexpected error")
|
||||||
|
|
||||||
|
if self.close_requested:
|
||||||
|
break
|
||||||
|
|
||||||
|
self.state = STATE_CONNECTING
|
||||||
|
self.tries += 1
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Sleep 0, 5, 10, 15 ... 30 seconds between retries
|
||||||
|
self.retry_task = hass.async_add_job(asyncio.sleep(
|
||||||
|
min(30, (self.tries - 1) * 5), loop=hass.loop))
|
||||||
|
yield from self.retry_task
|
||||||
|
self.retry_task = None
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
# Happens if disconnect called
|
||||||
|
break
|
||||||
|
|
||||||
|
self.state = STATE_DISCONNECTED
|
||||||
|
if remove_hass_stop_listener is not None:
|
||||||
|
remove_hass_stop_listener()
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def _handle_connection(self):
|
||||||
|
"""Connect to the IoT broker."""
|
||||||
|
hass = self.cloud.hass
|
||||||
|
|
||||||
|
try:
|
||||||
|
yield from hass.async_add_job(auth_api.check_token, self.cloud)
|
||||||
|
except auth_api.CloudError as err:
|
||||||
|
_LOGGER.warning("Unable to connect: %s", err)
|
||||||
|
return
|
||||||
|
|
||||||
|
if self.cloud.subscription_expired:
|
||||||
|
hass.components.persistent_notification.async_create(
|
||||||
|
MESSAGE_EXPIRATION, 'Subscription expired',
|
||||||
|
'cloud_subscription_expired')
|
||||||
|
self.close_requested = True
|
||||||
|
return
|
||||||
|
|
||||||
session = async_get_clientsession(self.cloud.hass)
|
session = async_get_clientsession(self.cloud.hass)
|
||||||
client = None
|
client = None
|
||||||
disconnect_warn = None
|
disconnect_warn = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
yield from hass.async_add_job(auth_api.check_token, self.cloud)
|
|
||||||
|
|
||||||
self.client = client = yield from session.ws_connect(
|
self.client = client = yield from session.ws_connect(
|
||||||
self.cloud.relayer, heartbeat=55, headers={
|
self.cloud.relayer, heartbeat=55, headers={
|
||||||
hdrs.AUTHORIZATION:
|
hdrs.AUTHORIZATION:
|
||||||
|
@ -93,6 +129,10 @@ class CloudIoT:
|
||||||
if msg.type in (WSMsgType.CLOSED, WSMsgType.CLOSING):
|
if msg.type in (WSMsgType.CLOSED, WSMsgType.CLOSING):
|
||||||
break
|
break
|
||||||
|
|
||||||
|
elif msg.type == WSMsgType.ERROR:
|
||||||
|
disconnect_warn = 'Connection error'
|
||||||
|
break
|
||||||
|
|
||||||
elif msg.type != WSMsgType.TEXT:
|
elif msg.type != WSMsgType.TEXT:
|
||||||
disconnect_warn = 'Received non-Text message: {}'.format(
|
disconnect_warn = 'Received non-Text message: {}'.format(
|
||||||
msg.type)
|
msg.type)
|
||||||
|
@ -129,9 +169,6 @@ class CloudIoT:
|
||||||
_LOGGER.debug("Publishing message: %s", response)
|
_LOGGER.debug("Publishing message: %s", response)
|
||||||
yield from client.send_json(response)
|
yield from client.send_json(response)
|
||||||
|
|
||||||
except auth_api.CloudError as err:
|
|
||||||
_LOGGER.warning("Unable to connect: %s", err)
|
|
||||||
|
|
||||||
except client_exceptions.WSServerHandshakeError as err:
|
except client_exceptions.WSServerHandshakeError as err:
|
||||||
if err.code == 401:
|
if err.code == 401:
|
||||||
disconnect_warn = 'Invalid auth.'
|
disconnect_warn = 'Invalid auth.'
|
||||||
|
@ -143,41 +180,12 @@ class CloudIoT:
|
||||||
except client_exceptions.ClientError as err:
|
except client_exceptions.ClientError as err:
|
||||||
_LOGGER.warning("Unable to connect: %s", err)
|
_LOGGER.warning("Unable to connect: %s", err)
|
||||||
|
|
||||||
except Exception: # pylint: disable=broad-except
|
|
||||||
if not self.close_requested:
|
|
||||||
_LOGGER.exception("Unexpected error")
|
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if disconnect_warn is None:
|
if disconnect_warn is None:
|
||||||
_LOGGER.info("Connection closed")
|
_LOGGER.info("Connection closed")
|
||||||
else:
|
else:
|
||||||
_LOGGER.warning("Connection closed: %s", disconnect_warn)
|
_LOGGER.warning("Connection closed: %s", disconnect_warn)
|
||||||
|
|
||||||
if remove_hass_stop_listener is not None:
|
|
||||||
remove_hass_stop_listener()
|
|
||||||
|
|
||||||
if client is not None:
|
|
||||||
self.client = None
|
|
||||||
yield from client.close()
|
|
||||||
|
|
||||||
if self.close_requested:
|
|
||||||
self.state = STATE_DISCONNECTED
|
|
||||||
|
|
||||||
else:
|
|
||||||
self.state = STATE_CONNECTING
|
|
||||||
self.tries += 1
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Sleep 0, 5, 10, 15 ... up to 30 seconds between retries
|
|
||||||
self.retry_task = hass.async_add_job(asyncio.sleep(
|
|
||||||
min(30, (self.tries - 1) * 5), loop=hass.loop))
|
|
||||||
yield from self.retry_task
|
|
||||||
self.retry_task = None
|
|
||||||
hass.async_add_job(self.connect())
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
# Happens if disconnect called
|
|
||||||
pass
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def disconnect(self):
|
def disconnect(self):
|
||||||
"""Disconnect the client."""
|
"""Disconnect the client."""
|
||||||
|
|
|
@ -17,7 +17,8 @@ def mock_client():
|
||||||
client = MagicMock()
|
client = MagicMock()
|
||||||
type(client).closed = PropertyMock(side_effect=[False, True])
|
type(client).closed = PropertyMock(side_effect=[False, True])
|
||||||
|
|
||||||
with patch('asyncio.sleep'), \
|
# Trigger cancelled error to avoid reconnect.
|
||||||
|
with patch('asyncio.sleep', side_effect=asyncio.CancelledError), \
|
||||||
patch('homeassistant.components.cloud.iot'
|
patch('homeassistant.components.cloud.iot'
|
||||||
'.async_get_clientsession') as session:
|
'.async_get_clientsession') as session:
|
||||||
session().ws_connect.return_value = mock_coro(client)
|
session().ws_connect.return_value = mock_coro(client)
|
||||||
|
@ -160,10 +161,10 @@ def test_cloud_getting_disconnected_by_server(mock_client, caplog, mock_cloud):
|
||||||
type=WSMsgType.CLOSING,
|
type=WSMsgType.CLOSING,
|
||||||
))
|
))
|
||||||
|
|
||||||
|
with patch('asyncio.sleep', side_effect=[None, asyncio.CancelledError]):
|
||||||
yield from conn.connect()
|
yield from conn.connect()
|
||||||
|
|
||||||
assert 'Connection closed' in caplog.text
|
assert 'Connection closed' in caplog.text
|
||||||
assert 'connect' in str(mock_cloud.hass.async_add_job.mock_calls[-1][1][0])
|
|
||||||
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
|
@ -177,7 +178,6 @@ def test_cloud_receiving_bytes(mock_client, caplog, mock_cloud):
|
||||||
yield from conn.connect()
|
yield from conn.connect()
|
||||||
|
|
||||||
assert 'Connection closed: Received non-Text message' in caplog.text
|
assert 'Connection closed: Received non-Text message' in caplog.text
|
||||||
assert 'connect' in str(mock_cloud.hass.async_add_job.mock_calls[-1][1][0])
|
|
||||||
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
|
@ -192,19 +192,17 @@ def test_cloud_sending_invalid_json(mock_client, caplog, mock_cloud):
|
||||||
yield from conn.connect()
|
yield from conn.connect()
|
||||||
|
|
||||||
assert 'Connection closed: Received invalid JSON.' in caplog.text
|
assert 'Connection closed: Received invalid JSON.' in caplog.text
|
||||||
assert 'connect' in str(mock_cloud.hass.async_add_job.mock_calls[-1][1][0])
|
|
||||||
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def test_cloud_check_token_raising(mock_client, caplog, mock_cloud):
|
def test_cloud_check_token_raising(mock_client, caplog, mock_cloud):
|
||||||
"""Test cloud unable to check token."""
|
"""Test cloud unable to check token."""
|
||||||
conn = iot.CloudIoT(mock_cloud)
|
conn = iot.CloudIoT(mock_cloud)
|
||||||
mock_client.receive.side_effect = auth_api.CloudError("BLA")
|
mock_cloud.hass.async_add_job.side_effect = auth_api.CloudError("BLA")
|
||||||
|
|
||||||
yield from conn.connect()
|
yield from conn.connect()
|
||||||
|
|
||||||
assert 'Unable to connect: BLA' in caplog.text
|
assert 'Unable to connect: BLA' in caplog.text
|
||||||
assert 'connect' in str(mock_cloud.hass.async_add_job.mock_calls[-1][1][0])
|
|
||||||
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue