fix telegram bug (#2653)
This commit is contained in:
parent
55624bcff9
commit
548d154cd8
1 changed files with 77 additions and 51 deletions
|
@ -8,43 +8,69 @@ import io
|
|||
import logging
|
||||
import urllib
|
||||
import requests
|
||||
from requests.auth import HTTPBasicAuth
|
||||
import voluptuous as vol
|
||||
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.components.notify import (
|
||||
ATTR_TITLE, ATTR_DATA, DOMAIN, BaseNotificationService)
|
||||
from homeassistant.const import CONF_API_KEY
|
||||
from homeassistant.helpers import validate_config
|
||||
ATTR_TITLE, ATTR_DATA, BaseNotificationService)
|
||||
from homeassistant.const import (CONF_API_KEY, CONF_NAME, ATTR_LOCATION,
|
||||
ATTR_LATITUDE, ATTR_LONGITUDE, CONF_PLATFORM)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
REQUIREMENTS = ['python-telegram-bot==5.0.0']
|
||||
|
||||
ATTR_PHOTO = "photo"
|
||||
ATTR_FILE = "file"
|
||||
ATTR_URL = "url"
|
||||
ATTR_CAPTION = "caption"
|
||||
ATTR_USERNAME = "username"
|
||||
ATTR_PASSWORD = "password"
|
||||
|
||||
CONF_CHAT_ID = 'chat_id'
|
||||
|
||||
PLATFORM_SCHEMA = vol.Schema({
|
||||
vol.Required(CONF_PLATFORM): "telegram",
|
||||
vol.Optional(CONF_NAME): cv.string,
|
||||
vol.Required(CONF_API_KEY): cv.string,
|
||||
vol.Required(CONF_CHAT_ID): cv.string,
|
||||
})
|
||||
|
||||
|
||||
def get_service(hass, config):
|
||||
"""Get the Telegram notification service."""
|
||||
import telegram
|
||||
|
||||
if not validate_config({DOMAIN: config},
|
||||
{DOMAIN: [CONF_API_KEY, 'chat_id']},
|
||||
_LOGGER):
|
||||
return None
|
||||
|
||||
try:
|
||||
bot = telegram.Bot(token=config[CONF_API_KEY])
|
||||
chat_id = config.get(CONF_CHAT_ID)
|
||||
api_key = config.get(CONF_API_KEY)
|
||||
bot = telegram.Bot(token=api_key)
|
||||
username = bot.getMe()['username']
|
||||
_LOGGER.info("Telegram bot is '%s'.", username)
|
||||
except urllib.error.HTTPError:
|
||||
_LOGGER.error("Please check your access token.")
|
||||
return None
|
||||
|
||||
return TelegramNotificationService(config[CONF_API_KEY], config['chat_id'])
|
||||
return TelegramNotificationService(api_key, chat_id)
|
||||
|
||||
|
||||
def load_data(url=None, file=None, username=None, password=None):
|
||||
"""Load photo/document into ByteIO/File container from a source."""
|
||||
try:
|
||||
if url is not None:
|
||||
# load photo from url
|
||||
if username is not None and password is not None:
|
||||
req = requests.get(url, auth=(username, password), timeout=15)
|
||||
else:
|
||||
req = requests.get(url, timeout=15)
|
||||
return io.BytesIO(req.content)
|
||||
|
||||
elif file is not None:
|
||||
# load photo from file
|
||||
return open(file, "rb")
|
||||
else:
|
||||
_LOGGER.warning("Can't load photo no photo found in params!")
|
||||
|
||||
except (OSError, IOError, requests.exceptions.RequestException):
|
||||
_LOGGER.error("Can't load photo into ByteIO")
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# pylint: disable=too-few-public-methods
|
||||
|
@ -64,7 +90,18 @@ class TelegramNotificationService(BaseNotificationService):
|
|||
import telegram
|
||||
|
||||
title = kwargs.get(ATTR_TITLE)
|
||||
data = kwargs.get(ATTR_DATA, {})
|
||||
data = kwargs.get(ATTR_DATA)
|
||||
|
||||
# exists data for send a photo/location
|
||||
if data is not None and ATTR_PHOTO in data:
|
||||
photos = data.get(ATTR_PHOTO, None)
|
||||
photos = photos if isinstance(photos, list) else [photos]
|
||||
|
||||
for photo_data in photos:
|
||||
self.send_photo(photo_data)
|
||||
return
|
||||
elif data is not None and ATTR_LOCATION in data:
|
||||
return self.send_location(data.get(ATTR_LOCATION))
|
||||
|
||||
# send message
|
||||
try:
|
||||
|
@ -74,41 +111,30 @@ class TelegramNotificationService(BaseNotificationService):
|
|||
_LOGGER.exception("Error sending message.")
|
||||
return
|
||||
|
||||
def send_photo(self, data):
|
||||
"""Send a photo."""
|
||||
import telegram
|
||||
caption = data.pop(ATTR_CAPTION, None)
|
||||
|
||||
# send photo
|
||||
if ATTR_PHOTO in data:
|
||||
# if not a list
|
||||
if not isinstance(data[ATTR_PHOTO], list):
|
||||
photos = [data[ATTR_PHOTO]]
|
||||
else:
|
||||
photos = data[ATTR_PHOTO]
|
||||
try:
|
||||
photo = load_data(**data)
|
||||
self.bot.sendPhoto(chat_id=self._chat_id,
|
||||
photo=photo, caption=caption)
|
||||
except telegram.error.TelegramError:
|
||||
_LOGGER.exception("Error sending photo.")
|
||||
return
|
||||
|
||||
try:
|
||||
for photo_data in photos:
|
||||
caption = photo_data.get(ATTR_CAPTION, None)
|
||||
def send_location(self, gps):
|
||||
"""Send a location."""
|
||||
import telegram
|
||||
latitude = float(gps.get(ATTR_LATITUDE, 0.0))
|
||||
longitude = float(gps.get(ATTR_LONGITUDE, 0.0))
|
||||
|
||||
# file is a url
|
||||
if ATTR_URL in photo_data:
|
||||
# use http authenticate
|
||||
if ATTR_USERNAME in photo_data and\
|
||||
ATTR_PASSWORD in photo_data:
|
||||
req = requests.get(
|
||||
photo_data[ATTR_URL],
|
||||
auth=HTTPBasicAuth(photo_data[ATTR_USERNAME],
|
||||
photo_data[ATTR_PASSWORD])
|
||||
)
|
||||
else:
|
||||
req = requests.get(photo_data[ATTR_URL])
|
||||
file_id = io.BytesIO(req.content)
|
||||
elif ATTR_FILE in photo_data:
|
||||
file_id = open(photo_data[ATTR_FILE], "rb")
|
||||
else:
|
||||
_LOGGER.error("No url or path is set for photo!")
|
||||
continue
|
||||
|
||||
self.bot.sendPhoto(chat_id=self._chat_id,
|
||||
photo=file_id, caption=caption)
|
||||
|
||||
except (OSError, IOError, telegram.error.TelegramError,
|
||||
urllib.error.HTTPError):
|
||||
_LOGGER.exception("Error sending photo.")
|
||||
return
|
||||
# send location
|
||||
try:
|
||||
self.bot.sendLocation(chat_id=self._chat_id,
|
||||
latitude=latitude, longitude=longitude)
|
||||
except telegram.error.TelegramError:
|
||||
_LOGGER.exception("Error sending location.")
|
||||
return
|
||||
|
|
Loading…
Add table
Reference in a new issue