hass-core/homeassistant/components/clicksend/notify.py

104 lines
2.9 KiB
Python
Raw Normal View History

"""Clicksend platform for notify component."""
from http import HTTPStatus
import json
import logging
import requests
import voluptuous as vol
from homeassistant.components.notify import PLATFORM_SCHEMA, BaseNotificationService
from homeassistant.const import (
2019-07-31 12:25:30 -07:00
CONF_API_KEY,
CONF_RECIPIENT,
CONF_SENDER,
CONF_USERNAME,
CONTENT_TYPE_JSON,
)
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)

# Root of the ClickSend REST API; endpoint paths are appended to this URL.
BASE_API_URL = "https://rest.clicksend.com/v3"

# Sender used when the platform configuration does not specify one.
DEFAULT_SENDER = "hass"

# Passed as the ``timeout`` argument of every ``requests`` call in this module.
TIMEOUT = 5

# Headers sent with every request to the API.
HEADERS = {"Content-Type": CONTENT_TYPE_JSON}
# Extend the base notify platform schema with the ClickSend-specific options.
PLATFORM_SCHEMA = vol.Schema(
    vol.All(
        PLATFORM_SCHEMA.extend(
            {
                # Credentials used for HTTP basic auth against the API.
                vol.Required(CONF_USERNAME): cv.string,
                vol.Required(CONF_API_KEY): cv.string,
                # Validated as a list of strings; defaults to an empty list.
                vol.Required(CONF_RECIPIENT, default=[]): vol.All(
                    cv.ensure_list, [cv.string]
                ),
                # Falls back to DEFAULT_SENDER when omitted.
                vol.Optional(CONF_SENDER, default=DEFAULT_SENDER): cv.string,
            }
        )
    )
)
def get_service(hass, config, discovery_info=None):
    """Build the ClickSend notification service for this configuration.

    The credentials in ``config`` are checked first via ``_authenticate``.
    Returns a ``ClicksendNotificationService`` when that check succeeds;
    otherwise logs an error and returns ``None``.
    """
    if _authenticate(config):
        return ClicksendNotificationService(config)

    _LOGGER.error("You are not authorized to access ClickSend")
    return None
class ClicksendNotificationService(BaseNotificationService):
    """Notification service that sends SMS messages through ClickSend."""

    def __init__(self, config):
        """Store credentials, recipients and sender from the platform config."""
        self.username = config[CONF_USERNAME]
        self.api_key = config[CONF_API_KEY]
        self.recipients = config[CONF_RECIPIENT]
        self.sender = config[CONF_SENDER]

    def send_message(self, message="", **kwargs):
        """Send ``message`` as an SMS to every configured recipient.

        All recipients are submitted in a single POST to the ``/sms/send``
        endpoint. A non-200 response is logged as an error together with the
        ``response_msg`` / ``response_code`` fields of the reply when they are
        available. Nothing is returned in either case.
        """
        data = {
            "messages": [
                {
                    "source": "hass.notify",
                    "from": self.sender,
                    "to": recipient,
                    "body": message,
                }
                for recipient in self.recipients
            ]
        }

        resp = requests.post(
            f"{BASE_API_URL}/sms/send",
            data=json.dumps(data),
            headers=HEADERS,
            auth=(self.username, self.api_key),
            timeout=TIMEOUT,
        )
        if resp.status_code == HTTPStatus.OK:
            return

        # The error body is not guaranteed to be a JSON object (a proxy or
        # gateway may answer with HTML or an empty body). Parsing must not
        # raise here, otherwise the HTTP status would never be logged.
        try:
            obj = json.loads(resp.text)
        except ValueError:
            obj = None
        if not isinstance(obj, dict):
            obj = {}

        _LOGGER.error(
            "Error %s : %s (Code %s)",
            resp.status_code,
            obj.get("response_msg"),
            obj.get("response_code"),
        )
def _authenticate(config):
    """Check the configured credentials against the ClickSend account endpoint.

    Issues a GET to ``/account`` using the username and API key from
    ``config`` as basic-auth credentials. Returns ``True`` only when the
    response status is HTTP 200.
    """
    credentials = (config[CONF_USERNAME], config[CONF_API_KEY])
    response = requests.get(
        f"{BASE_API_URL}/account",
        headers=HEADERS,
        auth=credentials,
        timeout=TIMEOUT,
    )
    return response.status_code == HTTPStatus.OK