diff --git a/homeassistant/components/cloud/__init__.py b/homeassistant/components/cloud/__init__.py index 7de4f5b57f8..3657b64b989 100644 --- a/homeassistant/components/cloud/__init__.py +++ b/homeassistant/components/cloud/__init__.py @@ -219,7 +219,7 @@ class Cloud: # Fetching keyset can fail if internet is not up yet. if not success: - self.hass.helpers.async_call_later(5, self.async_start) + self.hass.helpers.event.async_call_later(5, self.async_start) return def load_config(): diff --git a/homeassistant/components/cloud/iot.py b/homeassistant/components/cloud/iot.py index 5f61263824b..3220fc372f7 100644 --- a/homeassistant/components/cloud/iot.py +++ b/homeassistant/components/cloud/iot.py @@ -44,20 +44,13 @@ class CloudIoT: @asyncio.coroutine def connect(self): """Connect to the IoT broker.""" + if self.state != STATE_DISCONNECTED: + raise RuntimeError('Connect called while not disconnected') + hass = self.cloud.hass - if self.cloud.subscription_expired: - # Try refreshing the token to see if it is still expired. - yield from hass.async_add_job(auth_api.check_token, self.cloud) - - if self.cloud.subscription_expired: - hass.components.persistent_notification.async_create( - MESSAGE_EXPIRATION, 'Subscription expired', - 'cloud_subscription_expired') - self.state = STATE_DISCONNECTED - return - - if self.state == STATE_CONNECTED: - raise RuntimeError('Already connected') + self.close_requested = False + self.state = STATE_CONNECTING + self.tries = 0 @asyncio.coroutine def _handle_hass_stop(event): @@ -66,17 +59,60 @@ class CloudIoT: remove_hass_stop_listener = None yield from self.disconnect() - self.state = STATE_CONNECTING - self.close_requested = False remove_hass_stop_listener = hass.bus.async_listen_once( EVENT_HOMEASSISTANT_STOP, _handle_hass_stop) + + while True: + try: + yield from self._handle_connection() + except Exception: # pylint: disable=broad-except + # Safety net. This should never hit. + # Still adding it here to make sure we can always reconnect + _LOGGER.exception("Unexpected error") + + if self.close_requested: + break + + self.state = STATE_CONNECTING + self.tries += 1 + + try: + # Sleep 0, 5, 10, 15 ... 30 seconds between retries + self.retry_task = hass.async_add_job(asyncio.sleep( + min(30, (self.tries - 1) * 5), loop=hass.loop)) + yield from self.retry_task + self.retry_task = None + except asyncio.CancelledError: + # Happens if disconnect called + break + + self.state = STATE_DISCONNECTED + if remove_hass_stop_listener is not None: + remove_hass_stop_listener() + + @asyncio.coroutine + def _handle_connection(self): + """Connect to the IoT broker.""" + hass = self.cloud.hass + + try: + yield from hass.async_add_job(auth_api.check_token, self.cloud) + except auth_api.CloudError as err: + _LOGGER.warning("Unable to connect: %s", err) + return + + if self.cloud.subscription_expired: + hass.components.persistent_notification.async_create( + MESSAGE_EXPIRATION, 'Subscription expired', + 'cloud_subscription_expired') + self.close_requested = True + return + session = async_get_clientsession(self.cloud.hass) client = None disconnect_warn = None try: - yield from hass.async_add_job(auth_api.check_token, self.cloud) - self.client = client = yield from session.ws_connect( self.cloud.relayer, heartbeat=55, headers={ hdrs.AUTHORIZATION: @@ -93,6 +129,10 @@ class CloudIoT: if msg.type in (WSMsgType.CLOSED, WSMsgType.CLOSING): break + elif msg.type == WSMsgType.ERROR: + disconnect_warn = 'Connection error' + break + elif msg.type != WSMsgType.TEXT: disconnect_warn = 'Received non-Text message: {}'.format( msg.type) @@ -129,9 +169,6 @@ class CloudIoT: _LOGGER.debug("Publishing message: %s", response) yield from client.send_json(response) - except auth_api.CloudError as err: - _LOGGER.warning("Unable to connect: %s", err) - except client_exceptions.WSServerHandshakeError as err: if err.code == 401: disconnect_warn = 'Invalid auth.' @@ -143,41 +180,12 @@ class CloudIoT: except client_exceptions.ClientError as err: _LOGGER.warning("Unable to connect: %s", err) - except Exception: # pylint: disable=broad-except - if not self.close_requested: - _LOGGER.exception("Unexpected error") - finally: if disconnect_warn is None: _LOGGER.info("Connection closed") else: _LOGGER.warning("Connection closed: %s", disconnect_warn) - if remove_hass_stop_listener is not None: - remove_hass_stop_listener() - - if client is not None: - self.client = None - yield from client.close() - - if self.close_requested: - self.state = STATE_DISCONNECTED - - else: - self.state = STATE_CONNECTING - self.tries += 1 - - try: - # Sleep 0, 5, 10, 15 ... up to 30 seconds between retries - self.retry_task = hass.async_add_job(asyncio.sleep( - min(30, (self.tries - 1) * 5), loop=hass.loop)) - yield from self.retry_task - self.retry_task = None - hass.async_add_job(self.connect()) - except asyncio.CancelledError: - # Happens if disconnect called - pass - @asyncio.coroutine def disconnect(self): """Disconnect the client.""" diff --git a/tests/components/cloud/test_iot.py b/tests/components/cloud/test_iot.py index ff382b697cf..3eec350b2cb 100644 --- a/tests/components/cloud/test_iot.py +++ b/tests/components/cloud/test_iot.py @@ -17,7 +17,8 @@ def mock_client(): client = MagicMock() type(client).closed = PropertyMock(side_effect=[False, True]) - with patch('asyncio.sleep'), \ + # Trigger cancelled error to avoid reconnect. + with patch('asyncio.sleep', side_effect=asyncio.CancelledError), \ patch('homeassistant.components.cloud.iot' '.async_get_clientsession') as session: session().ws_connect.return_value = mock_coro(client) @@ -160,10 +161,10 @@ def test_cloud_getting_disconnected_by_server(mock_client, caplog, mock_cloud): type=WSMsgType.CLOSING, )) - yield from conn.connect() + with patch('asyncio.sleep', side_effect=[None, asyncio.CancelledError]): + yield from conn.connect() assert 'Connection closed' in caplog.text - assert 'connect' in str(mock_cloud.hass.async_add_job.mock_calls[-1][1][0]) @asyncio.coroutine @@ -177,7 +178,6 @@ def test_cloud_receiving_bytes(mock_client, caplog, mock_cloud): yield from conn.connect() assert 'Connection closed: Received non-Text message' in caplog.text - assert 'connect' in str(mock_cloud.hass.async_add_job.mock_calls[-1][1][0]) @asyncio.coroutine @@ -192,19 +192,17 @@ def test_cloud_sending_invalid_json(mock_client, caplog, mock_cloud): yield from conn.connect() assert 'Connection closed: Received invalid JSON.' in caplog.text - assert 'connect' in str(mock_cloud.hass.async_add_job.mock_calls[-1][1][0]) @asyncio.coroutine def test_cloud_check_token_raising(mock_client, caplog, mock_cloud): """Test cloud unable to check token.""" conn = iot.CloudIoT(mock_cloud) - mock_client.receive.side_effect = auth_api.CloudError("BLA") + mock_cloud.hass.async_add_job.side_effect = auth_api.CloudError("BLA") yield from conn.connect() assert 'Unable to connect: BLA' in caplog.text - assert 'connect' in str(mock_cloud.hass.async_add_job.mock_calls[-1][1][0]) @asyncio.coroutine