Add TTL support and custom headers support. (#22988)

* Add TTL custom support and custom headers support.

* fix pywebpush version

* removed whitespaces surrounding docstrings.

* fixes for tests

* priority option to data

* checking of ATTR_ENDPOINT

* change checking of target to vol.Schema

* more tests
This commit is contained in:
Pawel 2019-05-07 01:37:05 +02:00 committed by Robbie Trencheny
parent b60c815cde
commit 1f551e5f6f
2 changed files with 227 additions and 29 deletions

View file

@ -1,5 +1,6 @@
"""HTML5 Push Messaging notification service.""" """HTML5 Push Messaging notification service."""
import datetime from datetime import datetime, timedelta
from functools import partial from functools import partial
import json import json
import logging import logging
@ -39,14 +40,12 @@ ATTR_VAPID_EMAIL = 'vapid_email'
def gcm_api_deprecated(value): def gcm_api_deprecated(value):
"""Warn user that GCM API config is deprecated.""" """Warn user that GCM API config is deprecated."""
if not value: if value:
return value _LOGGER.warning(
"Configuring html5_push_notifications via the GCM api"
_LOGGER.warning( " has been deprecated and will stop working after April 11,"
"Configuring html5_push_notifications via the GCM api" " 2019. Use the VAPID configuration instead. For instructions,"
" has been deprecated and will stop working after April 11," " see https://www.home-assistant.io/components/notify.html5/")
" 2019. Use the VAPID configuration instead. For instructions,"
" see https://www.home-assistant.io/components/notify.html5/")
return value return value
@ -75,6 +74,10 @@ ATTR_ACTIONS = 'actions'
ATTR_TYPE = 'type' ATTR_TYPE = 'type'
ATTR_URL = 'url' ATTR_URL = 'url'
ATTR_DISMISS = 'dismiss' ATTR_DISMISS = 'dismiss'
ATTR_PRIORITY = 'priority'
DEFAULT_PRIORITY = 'normal'
ATTR_TTL = 'ttl'
DEFAULT_TTL = 86400
ATTR_JWT = 'jwt' ATTR_JWT = 'jwt'
@ -193,7 +196,6 @@ class HTML5PushRegistrationView(HomeAssistantView):
data = await request.json() data = await request.json()
except ValueError: except ValueError:
return self.json_message('Invalid JSON', HTTP_BAD_REQUEST) return self.json_message('Invalid JSON', HTTP_BAD_REQUEST)
try: try:
data = REGISTER_SCHEMA(data) data = REGISTER_SCHEMA(data)
except vol.Invalid as ex: except vol.Invalid as ex:
@ -373,7 +375,7 @@ class HTML5NotificationService(BaseNotificationService):
"""Initialize the service.""" """Initialize the service."""
self._gcm_key = gcm_key self._gcm_key = gcm_key
self._vapid_prv = vapid_prv self._vapid_prv = vapid_prv
self._vapid_claims = {"sub": "mailto:{}".format(vapid_email)} self._vapid_email = vapid_email
self.registrations = registrations self.registrations = registrations
self.registrations_json_path = json_path self.registrations_json_path = json_path
@ -425,7 +427,6 @@ class HTML5NotificationService(BaseNotificationService):
def send_message(self, message="", **kwargs): def send_message(self, message="", **kwargs):
"""Send a message to a user.""" """Send a message to a user."""
tag = str(uuid.uuid4()) tag = str(uuid.uuid4())
payload = { payload = {
'badge': '/static/images/notification-badge.png', 'badge': '/static/images/notification-badge.png',
'body': message, 'body': message,
@ -459,13 +460,14 @@ class HTML5NotificationService(BaseNotificationService):
def _push_message(self, payload, **kwargs): def _push_message(self, payload, **kwargs):
"""Send the message.""" """Send the message."""
import jwt from pywebpush import WebPusher
from pywebpush import WebPusher, webpush
timestamp = int(time.time()) timestamp = int(time.time())
ttl = int(kwargs.get(ATTR_TTL, DEFAULT_TTL))
priority = kwargs.get(ATTR_PRIORITY, DEFAULT_PRIORITY)
if priority not in ['normal', 'high']:
priority = DEFAULT_PRIORITY
payload['timestamp'] = (timestamp*1000) # Javascript ms since epoch payload['timestamp'] = (timestamp*1000) # Javascript ms since epoch
targets = kwargs.get(ATTR_TARGET) targets = kwargs.get(ATTR_TARGET)
if not targets: if not targets:
@ -473,26 +475,37 @@ class HTML5NotificationService(BaseNotificationService):
for target in list(targets): for target in list(targets):
info = self.registrations.get(target) info = self.registrations.get(target)
if info is None: try:
info = REGISTER_SCHEMA(info)
except vol.Invalid:
_LOGGER.error("%s is not a valid HTML5 push notification" _LOGGER.error("%s is not a valid HTML5 push notification"
" target", target) " target", target)
continue continue
payload[ATTR_DATA][ATTR_JWT] = add_jwt(
jwt_exp = (datetime.datetime.fromtimestamp(timestamp) + timestamp, target, payload[ATTR_TAG],
datetime.timedelta(days=JWT_VALID_DAYS)) info[ATTR_SUBSCRIPTION][ATTR_KEYS][ATTR_AUTH])
import jwt
jwt_secret = info[ATTR_SUBSCRIPTION][ATTR_KEYS][ATTR_AUTH] jwt_secret = info[ATTR_SUBSCRIPTION][ATTR_KEYS][ATTR_AUTH]
jwt_exp = (datetime.fromtimestamp(timestamp) +
timedelta(days=JWT_VALID_DAYS))
jwt_claims = {'exp': jwt_exp, 'nbf': timestamp, jwt_claims = {'exp': jwt_exp, 'nbf': timestamp,
'iat': timestamp, ATTR_TARGET: target, 'iat': timestamp, ATTR_TARGET: target,
ATTR_TAG: payload[ATTR_TAG]} ATTR_TAG: payload[ATTR_TAG]}
jwt_token = jwt.encode(jwt_claims, jwt_secret).decode('utf-8') jwt_token = jwt.encode(jwt_claims, jwt_secret).decode('utf-8')
payload[ATTR_DATA][ATTR_JWT] = jwt_token payload[ATTR_DATA][ATTR_JWT] = jwt_token
webpusher = WebPusher(info[ATTR_SUBSCRIPTION])
if self._vapid_prv and self._vapid_claims: if self._vapid_prv and self._vapid_email:
response = webpush( vapid_headers = create_vapid_headers(
info[ATTR_SUBSCRIPTION], self._vapid_email, info[ATTR_SUBSCRIPTION],
json.dumps(payload), self._vapid_prv)
vapid_private_key=self._vapid_prv, vapid_headers.update({
vapid_claims=self._vapid_claims 'urgency': priority,
'priority': priority
})
response = webpusher.send(
data=json.dumps(payload),
headers=vapid_headers,
ttl=ttl
) )
else: else:
# Only pass the gcm key if we're actually using GCM # Only pass the gcm key if we're actually using GCM
@ -501,8 +514,8 @@ class HTML5NotificationService(BaseNotificationService):
if 'googleapis.com' \ if 'googleapis.com' \
in info[ATTR_SUBSCRIPTION][ATTR_ENDPOINT] \ in info[ATTR_SUBSCRIPTION][ATTR_ENDPOINT] \
else None else None
response = WebPusher(info[ATTR_SUBSCRIPTION]).send( response = webpusher.send(
json.dumps(payload), gcm_key=gcm_key, ttl='86400' json.dumps(payload), gcm_key=gcm_key, ttl=ttl
) )
if response.status_code == 410: if response.status_code == 410:
@ -514,3 +527,33 @@ class HTML5NotificationService(BaseNotificationService):
_LOGGER.error("Error saving registration") _LOGGER.error("Error saving registration")
else: else:
_LOGGER.info("Configuration saved") _LOGGER.info("Configuration saved")
def add_jwt(timestamp, target, tag, jwt_secret):
"""Create JWT json to put into payload."""
import jwt
jwt_exp = (datetime.fromtimestamp(timestamp) +
timedelta(days=JWT_VALID_DAYS))
jwt_claims = {'exp': jwt_exp, 'nbf': timestamp,
'iat': timestamp, ATTR_TARGET: target,
ATTR_TAG: tag}
return jwt.encode(jwt_claims, jwt_secret).decode('utf-8')
def create_vapid_headers(vapid_email, subscription_info, vapid_private_key):
"""Create encrypted headers to send to WebPusher."""
from py_vapid import Vapid
try:
from urllib.parse import urlparse
except ImportError: # pragma: no cover
from urlparse import urlparse
if (vapid_email and vapid_private_key and
ATTR_ENDPOINT in subscription_info):
url = urlparse(subscription_info.get(ATTR_ENDPOINT))
vapid_claims = {
'sub': 'mailto:{}'.format(vapid_email),
'aud': "{}://{}".format(url.scheme, url.netloc)
}
vapid = Vapid.from_string(private_key=vapid_private_key)
return vapid.sign(vapid_claims)
return None

View file

@ -9,6 +9,14 @@ import homeassistant.components.html5.notify as html5
CONFIG_FILE = 'file.conf' CONFIG_FILE = 'file.conf'
VAPID_CONF = {
'vapid_pub_key': 'BJMA2gDZEkHaXRhf1fhY_' +
'QbKbhVIHlSJXI0bFyo0eJXnUPOjdgycCAbj-2bMKMKNKs' +
'_rM8JoSnyKGCXAY2dbONI',
'vapid_prv_key': 'ZwPgwKpESGuGLMZYU39vKgrekrWzCijo-LsBM3CZ9-c',
'vapid_email': 'someone@example.com'
}
SUBSCRIPTION_1 = { SUBSCRIPTION_1 = {
'browser': 'chrome', 'browser': 'chrome',
'subscription': { 'subscription': {
@ -45,6 +53,15 @@ SUBSCRIPTION_4 = {
}, },
} }
SUBSCRIPTION_5 = {
'browser': 'chrome',
'subscription': {
'endpoint': 'https://fcm.googleapis.com/fcm/send/LONG-RANDOM-KEY',
'expirationTime': None,
'keys': {'auth': 'auth', 'p256dh': 'p256dh'}
},
}
REGISTER_URL = '/api/notify.html5' REGISTER_URL = '/api/notify.html5'
PUBLISH_URL = '/api/notify.html5/callback' PUBLISH_URL = '/api/notify.html5/callback'
@ -185,6 +202,122 @@ class TestHtml5Notify:
assert mock_wp.mock_calls[1][2]['gcm_key'] is not None assert mock_wp.mock_calls[1][2]['gcm_key'] is not None
assert mock_wp.mock_calls[4][2]['gcm_key'] is None assert mock_wp.mock_calls[4][2]['gcm_key'] is None
@patch('pywebpush.WebPusher')
def test_fcm_key_include(self, mock_wp):
"""Test if the FCM header is included."""
hass = MagicMock()
data = {
'chrome': SUBSCRIPTION_5
}
m = mock_open(read_data=json.dumps(data))
with patch('homeassistant.util.json.open', m, create=True):
service = html5.get_service(hass, VAPID_CONF)
assert service is not None
service.send_message('Hello', target=['chrome'])
assert len(mock_wp.mock_calls) == 3
# WebPusher constructor
assert mock_wp.mock_calls[0][1][0] == SUBSCRIPTION_5['subscription']
# Third mock_call checks the status_code of the response.
assert mock_wp.mock_calls[2][0] == '().send().status_code.__eq__'
# Get the keys passed to the WebPusher's send method
assert mock_wp.mock_calls[1][2]['headers']['Authorization'] is not None
@patch('pywebpush.WebPusher')
def test_fcm_send_with_unknown_priority(self, mock_wp):
"""Test if the gcm_key is only included for GCM endpoints."""
hass = MagicMock()
data = {
'chrome': SUBSCRIPTION_5
}
m = mock_open(read_data=json.dumps(data))
with patch('homeassistant.util.json.open', m, create=True):
service = html5.get_service(hass, VAPID_CONF)
assert service is not None
service.send_message('Hello', target=['chrome'], priority='undefined')
assert len(mock_wp.mock_calls) == 3
# WebPusher constructor
assert mock_wp.mock_calls[0][1][0] == SUBSCRIPTION_5['subscription']
# Third mock_call checks the status_code of the response.
assert mock_wp.mock_calls[2][0] == '().send().status_code.__eq__'
# Get the keys passed to the WebPusher's send method
assert mock_wp.mock_calls[1][2]['headers']['priority'] == 'normal'
@patch('pywebpush.WebPusher')
def test_fcm_no_targets(self, mock_wp):
"""Test if the gcm_key is only included for GCM endpoints."""
hass = MagicMock()
data = {
'chrome': SUBSCRIPTION_5
}
m = mock_open(read_data=json.dumps(data))
with patch('homeassistant.util.json.open', m, create=True):
service = html5.get_service(hass, VAPID_CONF)
assert service is not None
service.send_message('Hello', )
assert len(mock_wp.mock_calls) == 3
# WebPusher constructor
assert mock_wp.mock_calls[0][1][0] == SUBSCRIPTION_5['subscription']
# Third mock_call checks the status_code of the response.
assert mock_wp.mock_calls[2][0] == '().send().status_code.__eq__'
# Get the keys passed to the WebPusher's send method
assert mock_wp.mock_calls[1][2]['headers']['priority'] == 'normal'
@patch('pywebpush.WebPusher')
def test_fcm_additional_data(self, mock_wp):
"""Test if the gcm_key is only included for GCM endpoints."""
hass = MagicMock()
data = {
'chrome': SUBSCRIPTION_5
}
m = mock_open(read_data=json.dumps(data))
with patch('homeassistant.util.json.open', m, create=True):
service = html5.get_service(hass, VAPID_CONF)
assert service is not None
service.send_message('Hello', data={'mykey': 'myvalue'})
assert len(mock_wp.mock_calls) == 3
# WebPusher constructor
assert mock_wp.mock_calls[0][1][0] == SUBSCRIPTION_5['subscription']
# Third mock_call checks the status_code of the response.
assert mock_wp.mock_calls[2][0] == '().send().status_code.__eq__'
# Get the keys passed to the WebPusher's send method
assert mock_wp.mock_calls[1][2]['headers']['priority'] == 'normal'
def test_create_vapid_withoutvapid():
"""Test creating empty vapid."""
resp = html5.create_vapid_headers(vapid_email=None,
vapid_private_key=None,
subscription_info=None)
assert resp is None
async def test_registering_new_device_view(hass, hass_client): async def test_registering_new_device_view(hass, hass_client):
"""Test that the HTML view works.""" """Test that the HTML view works."""
@ -428,3 +561,25 @@ async def test_callback_view_with_jwt(hass, hass_client):
assert resp.status == 200 assert resp.status == 200
body = await resp.json() body = await resp.json()
assert body == {"event": "push", "status": "ok"} assert body == {"event": "push", "status": "ok"}
async def test_send_fcm_without_targets(hass, hass_client):
"""Test that the notification is send with FCM without targets."""
registrations = {
'device': SUBSCRIPTION_5
}
await mock_client(hass, hass_client, registrations)
with patch('pywebpush.WebPusher') as mock_wp:
await hass.services.async_call('notify', 'notify', {
'message': 'Hello',
'target': ['device'],
'data': {'icon': 'beer.png'}
}, blocking=True)
assert len(mock_wp.mock_calls) == 3
# WebPusher constructor
assert mock_wp.mock_calls[0][1][0] == \
SUBSCRIPTION_5['subscription']
# Third mock_call checks the status_code of the response.
assert mock_wp.mock_calls[2][0] == '().send().status_code.__eq__'