* Telegram bot polling proxy support * CI fix * houndci-bot review fix * houndci-bot review fix * CI fix * Review * Update polling.py
This commit is contained in:
parent
f13f723a04
commit
917df1af00
2 changed files with 54 additions and 72 deletions
|
@ -311,10 +311,11 @@ def initialize_bot(p_config):
|
|||
proxy_url = p_config.get(CONF_PROXY_URL)
|
||||
proxy_params = p_config.get(CONF_PROXY_PARAMS)
|
||||
|
||||
request = None
|
||||
if proxy_url is not None:
|
||||
request = Request(proxy_url=proxy_url,
|
||||
request = Request(con_pool_size=4, proxy_url=proxy_url,
|
||||
urllib3_proxy_kwargs=proxy_params)
|
||||
else:
|
||||
request = Request(con_pool_size=4)
|
||||
return Bot(token=api_key, request=request)
|
||||
|
||||
|
||||
|
|
|
@ -5,13 +5,8 @@ For more details about this platform, please refer to the documentation at
|
|||
https://home-assistant.io/components/telegram_bot.polling/
|
||||
"""
|
||||
import asyncio
|
||||
from asyncio.futures import CancelledError
|
||||
import logging
|
||||
|
||||
import async_timeout
|
||||
from aiohttp.client_exceptions import ClientError
|
||||
from aiohttp.hdrs import CONNECTION, KEEP_ALIVE
|
||||
|
||||
from homeassistant.components.telegram_bot import (
|
||||
initialize_bot,
|
||||
CONF_ALLOWED_CHAT_IDS, BaseTelegramBotEntity,
|
||||
|
@ -19,18 +14,10 @@ from homeassistant.components.telegram_bot import (
|
|||
from homeassistant.const import (
|
||||
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP)
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
PLATFORM_SCHEMA = TELEGRAM_PLATFORM_SCHEMA
|
||||
RETRY_SLEEP = 10
|
||||
|
||||
|
||||
class WrongHttpStatus(Exception):
|
||||
"""Thrown when a wrong http status is received."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
|
@ -55,73 +42,67 @@ def async_setup_platform(hass, config):
|
|||
return True
|
||||
|
||||
|
||||
def process_error(bot, update, error):
|
||||
"""Telegram bot error handler."""
|
||||
from telegram.error import (
|
||||
TelegramError, TimedOut, NetworkError, RetryAfter)
|
||||
|
||||
try:
|
||||
raise error
|
||||
except (TimedOut, NetworkError, RetryAfter):
|
||||
# Long polling timeout or connection problem. Nothing serious.
|
||||
pass
|
||||
except TelegramError:
|
||||
_LOGGER.error('Update "%s" caused error "%s"', update, error)
|
||||
|
||||
|
||||
def message_handler(handler):
|
||||
"""Create messages handler."""
|
||||
from telegram import Update
|
||||
from telegram.ext import Handler
|
||||
|
||||
class MessageHandler(Handler):
|
||||
"""Telegram bot message handler."""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the messages handler instance."""
|
||||
super().__init__(handler)
|
||||
|
||||
def check_update(self, update):
|
||||
"""Check is update valid."""
|
||||
return isinstance(update, Update)
|
||||
|
||||
def handle_update(self, update, dispatcher):
|
||||
"""Handle update."""
|
||||
optional_args = self.collect_optional_args(dispatcher, update)
|
||||
return self.callback(dispatcher.bot, update, **optional_args)
|
||||
|
||||
return MessageHandler()
|
||||
|
||||
|
||||
class TelegramPoll(BaseTelegramBotEntity):
|
||||
"""Asyncio telegram incoming message handler."""
|
||||
|
||||
def __init__(self, bot, hass, allowed_chat_ids):
|
||||
"""Initialize the polling instance."""
|
||||
from telegram.ext import Updater
|
||||
|
||||
BaseTelegramBotEntity.__init__(self, hass, allowed_chat_ids)
|
||||
self.update_id = 0
|
||||
self.websession = async_get_clientsession(hass)
|
||||
self.update_url = '{0}/getUpdates'.format(bot.base_url)
|
||||
self.polling_task = None # The actual polling task.
|
||||
self.timeout = 15 # async post timeout
|
||||
# Polling timeout should always be less than async post timeout.
|
||||
self.post_data = {'timeout': self.timeout - 5}
|
||||
|
||||
self.updater = Updater(bot=bot, workers=4)
|
||||
self.dispatcher = self.updater.dispatcher
|
||||
|
||||
self.dispatcher.add_handler(message_handler(self.process_update))
|
||||
self.dispatcher.add_error_handler(process_error)
|
||||
|
||||
def start_polling(self):
|
||||
"""Start the polling task."""
|
||||
self.polling_task = self.hass.async_add_job(self.check_incoming())
|
||||
self.updater.start_polling()
|
||||
|
||||
def stop_polling(self):
|
||||
"""Stop the polling task."""
|
||||
self.polling_task.cancel()
|
||||
self.updater.stop()
|
||||
|
||||
@asyncio.coroutine
|
||||
def get_updates(self, offset):
|
||||
"""Bypass the default long polling method to enable asyncio."""
|
||||
resp = None
|
||||
if offset:
|
||||
self.post_data['offset'] = offset
|
||||
try:
|
||||
with async_timeout.timeout(self.timeout, loop=self.hass.loop):
|
||||
resp = yield from self.websession.post(
|
||||
self.update_url, data=self.post_data,
|
||||
headers={CONNECTION: KEEP_ALIVE}
|
||||
)
|
||||
if resp.status == 200:
|
||||
_json = yield from resp.json()
|
||||
return _json
|
||||
raise WrongHttpStatus('wrong status {}'.format(resp.status))
|
||||
finally:
|
||||
if resp is not None:
|
||||
yield from resp.release()
|
||||
|
||||
@asyncio.coroutine
|
||||
def check_incoming(self):
|
||||
"""Continuously check for incoming telegram messages."""
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
_updates = yield from self.get_updates(self.update_id)
|
||||
except (WrongHttpStatus, ClientError) as err:
|
||||
# WrongHttpStatus: Non-200 status code.
|
||||
# Occurs at times (mainly 502) and recovers
|
||||
# automatically. Pause for a while before retrying.
|
||||
_LOGGER.error(err)
|
||||
yield from asyncio.sleep(RETRY_SLEEP)
|
||||
except (asyncio.TimeoutError, ValueError):
|
||||
# Long polling timeout. Nothing serious.
|
||||
# Json error. Just retry for the next message.
|
||||
pass
|
||||
else:
|
||||
# no exception raised. update received data.
|
||||
_updates = _updates.get('result')
|
||||
if _updates is None:
|
||||
_LOGGER.error("Incorrect result received.")
|
||||
else:
|
||||
for update in _updates:
|
||||
self.update_id = update['update_id'] + 1
|
||||
self.process_message(update)
|
||||
except CancelledError:
|
||||
_LOGGER.debug("Stopping Telegram polling bot")
|
||||
def process_update(self, bot, update):
|
||||
"""Process incoming message."""
|
||||
self.process_message(update.to_dict())
|
||||
|
|
Loading…
Add table
Reference in a new issue