Added optional embedded image attachments to notify.smtp. (#2738)

* Added optional embedded image attachments to notify.smtp.

Also restructured a bit to minimize code duplication and add some tests.

* Fixed formatting errors.

* SMTP cleanups thanks to code review.
This commit is contained in:
Nick Touran 2016-08-08 17:36:49 -07:00 committed by Paulus Schoutsen
parent efe754636a
commit 3c2b4f5128
2 changed files with 147 additions and 49 deletions

View file

@ -6,14 +6,18 @@ https://home-assistant.io/components/notify.smtp/
"""
import logging
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from homeassistant.components.notify import (
ATTR_TITLE, DOMAIN, BaseNotificationService)
ATTR_TITLE, ATTR_DATA, DOMAIN, BaseNotificationService)
from homeassistant.helpers import validate_config
_LOGGER = logging.getLogger(__name__)
ATTR_IMAGES = 'images' # optional embedded image file attachments
def get_service(hass, config):
"""Get the mail notification service."""
@ -22,52 +26,21 @@ def get_service(hass, config):
_LOGGER):
return None
smtp_server = config.get('server', 'localhost')
port = int(config.get('port', '25'))
username = config.get('username', None)
password = config.get('password', None)
starttls = int(config.get('starttls', 0))
debug = config.get('debug', 0)
server = None
try:
server = smtplib.SMTP(smtp_server, port, timeout=5)
server.set_debuglevel(debug)
server.ehlo()
if starttls == 1:
server.starttls()
server.ehlo()
if username and password:
try:
server.login(username, password)
except (smtplib.SMTPException, smtplib.SMTPSenderRefused):
_LOGGER.exception("Please check your settings.")
return None
except smtplib.socket.gaierror:
_LOGGER.exception(
"SMTP server not found (%s:%s). "
"Please check the IP address or hostname of your SMTP server.",
smtp_server, port)
mail_service = MailNotificationService(
config.get('server', 'localhost'),
int(config.get('port', '25')),
config.get('sender', None),
int(config.get('starttls', 0)),
config.get('username', None),
config.get('password', None),
config.get('recipient', None),
config.get('debug', 0))
if mail_service.connection_is_valid():
return mail_service
else:
return None
except smtplib.SMTPAuthenticationError:
_LOGGER.exception(
"Login not possible. "
"Please check your setting and/or your credentials.")
return None
finally:
if server:
server.quit()
return MailNotificationService(
smtp_server, port, config['sender'], starttls, username, password,
config['recipient'], debug)
# pylint: disable=too-few-public-methods, too-many-instance-attributes
class MailNotificationService(BaseNotificationService):
@ -99,17 +72,57 @@ class MailNotificationService(BaseNotificationService):
mail.login(self.username, self.password)
return mail
def send_message(self, message="", **kwargs):
"""Send a message to a user."""
mail = self.connect()
subject = kwargs.get(ATTR_TITLE)
def connection_is_valid(self):
"""Check for valid config, verify connectivity."""
server = None
try:
server = self.connect()
except smtplib.socket.gaierror:
_LOGGER.exception(
"SMTP server not found (%s:%s). "
"Please check the IP address or hostname of your SMTP server.",
self._server, self._port)
return False
except (smtplib.SMTPAuthenticationError, ConnectionRefusedError):
_LOGGER.exception(
"Login not possible. "
"Please check your setting and/or your credentials.")
return False
finally:
if server:
server.quit()
return True
def send_message(self, message="", **kwargs):
"""
Build and send a message to a user.
Will send plain text normally, or will build a multipart HTML message
with inline image attachments if images config is defined.
"""
subject = kwargs.get(ATTR_TITLE)
data = kwargs.get(ATTR_DATA)
if data:
msg = _build_multipart_msg(message, images=data.get(ATTR_IMAGES))
else:
msg = _build_text_msg(message)
msg = MIMEText(message)
msg['Subject'] = subject
msg['To'] = self.recipient
msg['From'] = self._sender
msg['X-Mailer'] = 'HomeAssistant'
return self._send_email(msg)
def _send_email(self, msg):
"""Send the message."""
mail = self.connect()
for _ in range(self.tries):
try:
mail.sendmail(self._sender, self.recipient,
@ -122,3 +135,36 @@ class MailNotificationService(BaseNotificationService):
mail = self.connect()
mail.quit()
def _build_text_msg(message):
"""Build plaintext email."""
_LOGGER.debug('Building plain text email.')
return MIMEText(message)
def _build_multipart_msg(message, images):
"""Build Multipart message with in-line images."""
_LOGGER.debug('Building multipart email with embedded attachment(s).')
msg = MIMEMultipart('related')
msg_alt = MIMEMultipart('alternative')
msg.attach(msg_alt)
body_txt = MIMEText(message)
msg_alt.attach(body_txt)
body_text = ['<p>{}</p><br>'.format(message)]
for atch_num, atch_name in enumerate(images):
cid = 'image{}'.format(atch_num)
body_text.append('<img src="cid:{}"><br>'.format(cid))
try:
with open(atch_name, 'rb') as attachment_file:
attachment = MIMEImage(attachment_file.read())
msg.attach(attachment)
attachment.add_header('Content-ID', '<{}>'.format(cid))
except FileNotFoundError:
_LOGGER.warning('Attachment %s not found. Skipping.',
atch_name)
body_html = MIMEText(''.join(body_text), 'html')
msg_alt.attach(body_html)
return msg

View file

@ -0,0 +1,52 @@
"""The tests for the notify smtp platform."""
import unittest
from homeassistant.components.notify import smtp
from tests.common import get_test_home_assistant
class MockSMTP(smtp.MailNotificationService):
"""Test SMTP object that doesn't need a working server."""
def connection_is_valid(self):
"""Pretend connection is always valid for testing."""
return True
def _send_email(self, msg):
"""Just return string for testing."""
return msg.as_string()
class TestNotifySmtp(unittest.TestCase):
"""Test the smtp notify."""
def setUp(self): # pylint: disable=invalid-name
"""Setup things to be run when tests are started."""
self.hass = get_test_home_assistant()
self.mailer = MockSMTP('localhost', 25, 'test@test.com', 1, 'testuser',
'testpass', 'testrecip@test.com', 0)
def tearDown(self): # pylint: disable=invalid-name
""""Stop down everything that was started."""
self.hass.stop()
def test_text_email(self):
"""Test build of default text email behavior."""
msg = self.mailer.send_message('Test msg')
expected = ('Content-Type: text/plain; charset="us-ascii"\n'
'MIME-Version: 1.0\n'
'Content-Transfer-Encoding: 7bit\n'
'Subject: \n'
'To: testrecip@test.com\n'
'From: test@test.com\n'
'X-Mailer: HomeAssistant\n'
'\n'
'Test msg')
self.assertEqual(msg, expected)
def test_mixed_email(self):
"""Test build of mixed text email behavior."""
msg = self.mailer.send_message('Test msg',
data={'images': ['test.jpg']})
self.assertTrue('Content-Type: multipart/related' in msg)