Fix imap parsing non rfc compliant date crash (#93630)

* Fix imap parsing non rfc compliant date crash

* Use parsedate_to_datetime from mail.utils
This commit is contained in:
Jan Bouwhuis 2023-05-28 13:28:11 +02:00 committed by GitHub
parent 202c9071a4
commit 4c0d169cfc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 64 additions and 20 deletions

View file

@ -5,6 +5,8 @@ import asyncio
from collections.abc import Mapping
from datetime import datetime, timedelta
import email
from email.header import decode_header, make_header
from email.utils import parseaddr, parsedate_to_datetime
import logging
from typing import Any
@ -82,9 +84,9 @@ class ImapMessage:
"""Get the email headers."""
header_base: dict[str, tuple[str,]] = {}
for key, value in self.email_message.items():
header: tuple[str,] = (str(value),)
if header_base.setdefault(key, header) != header:
header_base[key] += header # type: ignore[assignment]
header_instances: tuple[str,] = (str(value),)
if header_base.setdefault(key, header_instances) != header_instances:
header_base[key] += header_instances # type: ignore[assignment]
return header_base
@property
@ -94,23 +96,26 @@ class ImapMessage:
date_str: str | None
if (date_str := self.email_message["Date"]) is None:
return None
# In some cases a timezone or comment is added in parenthesis after the date
# We want to strip that part to avoid parsing errors
return datetime.strptime(
date_str.split("(")[0].strip(), "%a, %d %b %Y %H:%M:%S %z"
try:
mail_dt_tm = parsedate_to_datetime(date_str)
except ValueError:
_LOGGER.debug(
"Parsed date %s is not compliant with rfc2822#section-3.3", date_str
)
return None
return mail_dt_tm
@property
def sender(self) -> str:
"""Get the parsed message sender from the email."""
return str(email.utils.parseaddr(self.email_message["From"])[1])
return str(parseaddr(self.email_message["From"])[1])
@property
def subject(self) -> str:
"""Decode the message subject."""
decoded_header = email.header.decode_header(self.email_message["Subject"])
header = email.header.make_header(decoded_header)
return str(header)
decoded_header = decode_header(self.email_message["Subject"])
subject_header = make_header(decoded_header)
return str(subject_header)
@property
def text(self) -> str:

View file

@ -3,7 +3,10 @@
DATE_HEADER1 = b"Date: Fri, 24 Mar 2023 13:52:00 +0100\r\n"
DATE_HEADER2 = b"Date: Fri, 24 Mar 2023 13:52:00 +0100 (CET)\r\n"
DATE_HEADER_INVALID = b"2023-03-27T13:52:00 +0100\r\n"
DATE_HEADER3 = b"Date: 24 Mar 2023 13:52:00 +0100\r\n"
DATE_HEADER_INVALID1 = b"2023-03-27T13:52:00 +0100\r\n"
DATE_HEADER_INVALID2 = b"Date: 2023-03-27T13:52:00 +0100\r\n"
DATE_HEADER_INVALID3 = b"Date: Fri, 2023-03-27T13:52:00 +0100\r\n"
TEST_MESSAGE_HEADERS1 = (
b"Return-Path: <john.doe@example.com>\r\nDelivered-To: notify@example.com\r\n"
@ -23,7 +26,15 @@ TEST_MESSAGE_HEADERS2 = (
TEST_MESSAGE = TEST_MESSAGE_HEADERS1 + DATE_HEADER1 + TEST_MESSAGE_HEADERS2
TEST_MESSAGE_ALT = TEST_MESSAGE_HEADERS1 + DATE_HEADER2 + TEST_MESSAGE_HEADERS2
TEST_INVALID_DATE = TEST_MESSAGE_HEADERS1 + DATE_HEADER_INVALID + TEST_MESSAGE_HEADERS2
TEST_INVALID_DATE1 = (
TEST_MESSAGE_HEADERS1 + DATE_HEADER_INVALID1 + TEST_MESSAGE_HEADERS2
)
TEST_INVALID_DATE2 = (
TEST_MESSAGE_HEADERS1 + DATE_HEADER_INVALID2 + TEST_MESSAGE_HEADERS2
)
TEST_INVALID_DATE3 = (
TEST_MESSAGE_HEADERS1 + DATE_HEADER_INVALID3 + TEST_MESSAGE_HEADERS2
)
TEST_CONTENT_TEXT_BARE = b"\r\n" b"Test body\r\n" b"\r\n"
@ -110,13 +121,35 @@ TEST_FETCH_RESPONSE_TEXT_PLAIN_ALT = (
],
)
TEST_FETCH_RESPONSE_INVALID_DATE = (
TEST_FETCH_RESPONSE_INVALID_DATE1 = (
"OK",
[
b"1 FETCH (BODY[] {"
+ str(len(TEST_INVALID_DATE + TEST_CONTENT_TEXT_PLAIN)).encode("utf-8")
+ str(len(TEST_INVALID_DATE1 + TEST_CONTENT_TEXT_PLAIN)).encode("utf-8")
+ b"}",
bytearray(TEST_INVALID_DATE + TEST_CONTENT_TEXT_PLAIN),
bytearray(TEST_INVALID_DATE1 + TEST_CONTENT_TEXT_PLAIN),
b")",
b"Fetch completed (0.0001 + 0.000 secs).",
],
)
TEST_FETCH_RESPONSE_INVALID_DATE2 = (
"OK",
[
b"1 FETCH (BODY[] {"
+ str(len(TEST_INVALID_DATE2 + TEST_CONTENT_TEXT_PLAIN)).encode("utf-8")
+ b"}",
bytearray(TEST_INVALID_DATE2 + TEST_CONTENT_TEXT_PLAIN),
b")",
b"Fetch completed (0.0001 + 0.000 secs).",
],
)
TEST_FETCH_RESPONSE_INVALID_DATE3 = (
"OK",
[
b"1 FETCH (BODY[] {"
+ str(len(TEST_INVALID_DATE3 + TEST_CONTENT_TEXT_PLAIN)).encode("utf-8")
+ b"}",
bytearray(TEST_INVALID_DATE3 + TEST_CONTENT_TEXT_PLAIN),
b")",
b"Fetch completed (0.0001 + 0.000 secs).",
],

View file

@ -18,7 +18,9 @@ from .const import (
EMPTY_SEARCH_RESPONSE,
TEST_FETCH_RESPONSE_BINARY,
TEST_FETCH_RESPONSE_HTML,
TEST_FETCH_RESPONSE_INVALID_DATE,
TEST_FETCH_RESPONSE_INVALID_DATE1,
TEST_FETCH_RESPONSE_INVALID_DATE2,
TEST_FETCH_RESPONSE_INVALID_DATE3,
TEST_FETCH_RESPONSE_MULTIPART,
TEST_FETCH_RESPONSE_TEXT_BARE,
TEST_FETCH_RESPONSE_TEXT_OTHER,
@ -81,7 +83,9 @@ async def test_entry_startup_fails(
(TEST_FETCH_RESPONSE_TEXT_BARE, True),
(TEST_FETCH_RESPONSE_TEXT_PLAIN, True),
(TEST_FETCH_RESPONSE_TEXT_PLAIN_ALT, True),
(TEST_FETCH_RESPONSE_INVALID_DATE, False),
(TEST_FETCH_RESPONSE_INVALID_DATE1, False),
(TEST_FETCH_RESPONSE_INVALID_DATE2, False),
(TEST_FETCH_RESPONSE_INVALID_DATE3, False),
(TEST_FETCH_RESPONSE_TEXT_OTHER, True),
(TEST_FETCH_RESPONSE_HTML, True),
(TEST_FETCH_RESPONSE_MULTIPART, True),
@ -91,7 +95,9 @@ async def test_entry_startup_fails(
"bare",
"plain",
"plain_alt",
"invalid_date",
"invalid_date1",
"invalid_date2",
"invalid_date3",
"other",
"html",
"multipart",