Add ssl_verify option to imap integration (#93811)
* Add ssl_verify option to imap integration * Add test
This commit is contained in:
parent
46d8885023
commit
1e0770ff8a
5 changed files with 89 additions and 12 deletions
|
@ -10,12 +10,19 @@ from aioimaplib import AioImapException
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
|
|
||||||
from homeassistant import config_entries
|
from homeassistant import config_entries
|
||||||
from homeassistant.const import CONF_NAME, CONF_PASSWORD, CONF_PORT, CONF_USERNAME
|
from homeassistant.const import (
|
||||||
|
CONF_NAME,
|
||||||
|
CONF_PASSWORD,
|
||||||
|
CONF_PORT,
|
||||||
|
CONF_USERNAME,
|
||||||
|
CONF_VERIFY_SSL,
|
||||||
|
)
|
||||||
from homeassistant.core import HomeAssistant, callback
|
from homeassistant.core import HomeAssistant, callback
|
||||||
from homeassistant.data_entry_flow import AbortFlow, FlowResult
|
from homeassistant.data_entry_flow import AbortFlow, FlowResult
|
||||||
from homeassistant.exceptions import TemplateError
|
from homeassistant.exceptions import TemplateError
|
||||||
from homeassistant.helpers import config_validation as cv
|
from homeassistant.helpers import config_validation as cv
|
||||||
from homeassistant.helpers.selector import (
|
from homeassistant.helpers.selector import (
|
||||||
|
BooleanSelector,
|
||||||
SelectSelector,
|
SelectSelector,
|
||||||
SelectSelectorConfig,
|
SelectSelectorConfig,
|
||||||
SelectSelectorMode,
|
SelectSelectorMode,
|
||||||
|
@ -42,6 +49,7 @@ from .const import (
|
||||||
from .coordinator import connect_to_server
|
from .coordinator import connect_to_server
|
||||||
from .errors import InvalidAuth, InvalidFolder
|
from .errors import InvalidAuth, InvalidFolder
|
||||||
|
|
||||||
|
BOOLEAN_SELECTOR = BooleanSelector()
|
||||||
CIPHER_SELECTOR = SelectSelector(
|
CIPHER_SELECTOR = SelectSelector(
|
||||||
SelectSelectorConfig(
|
SelectSelectorConfig(
|
||||||
options=list(SSLCipherList),
|
options=list(SSLCipherList),
|
||||||
|
@ -68,6 +76,7 @@ CONFIG_SCHEMA_ADVANCED = {
|
||||||
vol.Optional(
|
vol.Optional(
|
||||||
CONF_SSL_CIPHER_LIST, default=SSLCipherList.PYTHON_DEFAULT
|
CONF_SSL_CIPHER_LIST, default=SSLCipherList.PYTHON_DEFAULT
|
||||||
): CIPHER_SELECTOR,
|
): CIPHER_SELECTOR,
|
||||||
|
vol.Optional(CONF_VERIFY_SSL, default=True): BOOLEAN_SELECTOR,
|
||||||
}
|
}
|
||||||
|
|
||||||
OPTIONS_SCHEMA = vol.Schema(
|
OPTIONS_SCHEMA = vol.Schema(
|
||||||
|
|
|
@ -18,6 +18,7 @@ from homeassistant.const import (
|
||||||
CONF_PASSWORD,
|
CONF_PASSWORD,
|
||||||
CONF_PORT,
|
CONF_PORT,
|
||||||
CONF_USERNAME,
|
CONF_USERNAME,
|
||||||
|
CONF_VERIFY_SSL,
|
||||||
CONTENT_TYPE_TEXT_PLAIN,
|
CONTENT_TYPE_TEXT_PLAIN,
|
||||||
)
|
)
|
||||||
from homeassistant.core import HomeAssistant
|
from homeassistant.core import HomeAssistant
|
||||||
|
@ -29,7 +30,11 @@ from homeassistant.exceptions import (
|
||||||
from homeassistant.helpers.json import json_bytes
|
from homeassistant.helpers.json import json_bytes
|
||||||
from homeassistant.helpers.template import Template
|
from homeassistant.helpers.template import Template
|
||||||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
||||||
from homeassistant.util.ssl import SSLCipherList, client_context
|
from homeassistant.util.ssl import (
|
||||||
|
SSLCipherList,
|
||||||
|
client_context,
|
||||||
|
create_no_verify_ssl_context,
|
||||||
|
)
|
||||||
|
|
||||||
from .const import (
|
from .const import (
|
||||||
CONF_CHARSET,
|
CONF_CHARSET,
|
||||||
|
@ -54,9 +59,11 @@ MAX_EVENT_DATA_BYTES = 32168
|
||||||
|
|
||||||
async def connect_to_server(data: Mapping[str, Any]) -> IMAP4_SSL:
|
async def connect_to_server(data: Mapping[str, Any]) -> IMAP4_SSL:
|
||||||
"""Connect to imap server and return client."""
|
"""Connect to imap server and return client."""
|
||||||
ssl_context = client_context(
|
ssl_cipher_list: str = data.get(CONF_SSL_CIPHER_LIST, SSLCipherList.PYTHON_DEFAULT)
|
||||||
ssl_cipher_list=data.get(CONF_SSL_CIPHER_LIST, SSLCipherList.PYTHON_DEFAULT)
|
if data.get(CONF_VERIFY_SSL, True):
|
||||||
)
|
ssl_context = client_context(ssl_cipher_list=ssl_cipher_list)
|
||||||
|
else:
|
||||||
|
ssl_context = create_no_verify_ssl_context()
|
||||||
client = IMAP4_SSL(data[CONF_SERVER], data[CONF_PORT], ssl_context=ssl_context)
|
client = IMAP4_SSL(data[CONF_SERVER], data[CONF_PORT], ssl_context=ssl_context)
|
||||||
|
|
||||||
await client.wait_hello_from_server()
|
await client.wait_hello_from_server()
|
||||||
|
|
|
@ -10,7 +10,8 @@
|
||||||
"charset": "Character set",
|
"charset": "Character set",
|
||||||
"folder": "Folder",
|
"folder": "Folder",
|
||||||
"search": "IMAP search",
|
"search": "IMAP search",
|
||||||
"ssl_cipher_list": "SSL cipher list (Advanced)"
|
"ssl_cipher_list": "SSL cipher list (Advanced)",
|
||||||
|
"verify_ssl": "[%key:common::config_flow::data::verify_ssl%]"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"reauth_confirm": {
|
"reauth_confirm": {
|
||||||
|
|
|
@ -533,12 +533,14 @@ async def test_import_flow_connection_error(hass: HomeAssistant) -> None:
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("cipher_list", ["python_default", "modern", "intermediate"])
|
@pytest.mark.parametrize("cipher_list", ["python_default", "modern", "intermediate"])
|
||||||
async def test_config_flow_with_cipherlist(
|
@pytest.mark.parametrize("verify_ssl", [False, True])
|
||||||
hass: HomeAssistant, mock_setup_entry: AsyncMock, cipher_list: str
|
async def test_config_flow_with_cipherlist_and_ssl_verify(
|
||||||
|
hass: HomeAssistant, mock_setup_entry: AsyncMock, cipher_list: str, verify_ssl: True
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test with alternate cipherlist."""
|
"""Test with alternate cipherlist or disabled ssl verification."""
|
||||||
config = MOCK_CONFIG.copy()
|
config = MOCK_CONFIG.copy()
|
||||||
config["ssl_cipher_list"] = cipher_list
|
config["ssl_cipher_list"] = cipher_list
|
||||||
|
config["verify_ssl"] = verify_ssl
|
||||||
result = await hass.config_entries.flow.async_init(
|
result = await hass.config_entries.flow.async_init(
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
context={"source": config_entries.SOURCE_USER, "show_advanced_options": True},
|
context={"source": config_entries.SOURCE_USER, "show_advanced_options": True},
|
||||||
|
@ -562,3 +564,49 @@ async def test_config_flow_with_cipherlist(
|
||||||
assert result2["title"] == "email@email.com"
|
assert result2["title"] == "email@email.com"
|
||||||
assert result2["data"] == config
|
assert result2["data"] == config
|
||||||
assert len(mock_setup_entry.mock_calls) == 1
|
assert len(mock_setup_entry.mock_calls) == 1
|
||||||
|
|
||||||
|
|
||||||
|
async def test_config_flow_from_with_advanced_settings(
|
||||||
|
hass: HomeAssistant, mock_setup_entry: AsyncMock
|
||||||
|
) -> None:
|
||||||
|
"""Test if advanced settings show correctly."""
|
||||||
|
config = MOCK_CONFIG.copy()
|
||||||
|
config["ssl_cipher_list"] = "python_default"
|
||||||
|
config["verify_ssl"] = True
|
||||||
|
result = await hass.config_entries.flow.async_init(
|
||||||
|
DOMAIN,
|
||||||
|
context={"source": config_entries.SOURCE_USER, "show_advanced_options": True},
|
||||||
|
)
|
||||||
|
assert result["type"] == FlowResultType.FORM
|
||||||
|
assert result["errors"] is None
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.imap.config_flow.connect_to_server",
|
||||||
|
side_effect=asyncio.TimeoutError,
|
||||||
|
):
|
||||||
|
result2 = await hass.config_entries.flow.async_configure(
|
||||||
|
result["flow_id"], config
|
||||||
|
)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
|
assert result2["type"] == FlowResultType.FORM
|
||||||
|
assert result2["errors"]["base"] == "cannot_connect"
|
||||||
|
assert "ssl_cipher_list" in result2["data_schema"].schema
|
||||||
|
|
||||||
|
config["ssl_cipher_list"] = "modern"
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.imap.config_flow.connect_to_server"
|
||||||
|
) as mock_client:
|
||||||
|
mock_client.return_value.search.return_value = (
|
||||||
|
"OK",
|
||||||
|
[b""],
|
||||||
|
)
|
||||||
|
result3 = await hass.config_entries.flow.async_configure(
|
||||||
|
result2["flow_id"], config
|
||||||
|
)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
|
assert result3["type"] == FlowResultType.CREATE_ENTRY
|
||||||
|
assert result3["title"] == "email@email.com"
|
||||||
|
assert result3["data"] == config
|
||||||
|
assert len(mock_setup_entry.mock_calls) == 1
|
||||||
|
|
|
@ -34,16 +34,28 @@ from tests.common import MockConfigEntry, async_capture_events, async_fire_time_
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"cipher_list", [None, "python_default", "modern", "intermediate"]
|
("cipher_list", "verify_ssl"),
|
||||||
|
[
|
||||||
|
(None, None),
|
||||||
|
("python_default", True),
|
||||||
|
("python_default", False),
|
||||||
|
("modern", True),
|
||||||
|
("intermediate", True),
|
||||||
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.parametrize("imap_has_capability", [True, False], ids=["push", "poll"])
|
@pytest.mark.parametrize("imap_has_capability", [True, False], ids=["push", "poll"])
|
||||||
async def test_entry_startup_and_unload(
|
async def test_entry_startup_and_unload(
|
||||||
hass: HomeAssistant, mock_imap_protocol: MagicMock, cipher_list: str
|
hass: HomeAssistant,
|
||||||
|
mock_imap_protocol: MagicMock,
|
||||||
|
cipher_list: str | None,
|
||||||
|
verify_ssl: bool | None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test imap entry startup and unload with push and polling coordinator and alternate ciphers."""
|
"""Test imap entry startup and unload with push and polling coordinator and alternate ciphers."""
|
||||||
config = MOCK_CONFIG.copy()
|
config = MOCK_CONFIG.copy()
|
||||||
if cipher_list:
|
if cipher_list is not None:
|
||||||
config["ssl_cipher_list"] = cipher_list
|
config["ssl_cipher_list"] = cipher_list
|
||||||
|
if verify_ssl is not None:
|
||||||
|
config["verify_ssl"] = verify_ssl
|
||||||
|
|
||||||
config_entry = MockConfigEntry(domain=DOMAIN, data=config)
|
config_entry = MockConfigEntry(domain=DOMAIN, data=config)
|
||||||
config_entry.add_to_hass(hass)
|
config_entry.add_to_hass(hass)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue