Add entry diagnostics to imap integration (#109460)

This commit is contained in:
Jan Bouwhuis 2024-02-09 11:30:27 +01:00 committed by GitHub
parent 793b6aa97d
commit 8aa4157290
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 143 additions and 0 deletions

View file

@ -30,6 +30,7 @@ from homeassistant.exceptions import (
from homeassistant.helpers.json import json_bytes
from homeassistant.helpers.template import Template
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util import dt as dt_util
from homeassistant.util.ssl import (
SSLCipherList,
client_context,
@ -57,6 +58,8 @@ EVENT_IMAP = "imap_content"
MAX_ERRORS = 3
MAX_EVENT_DATA_BYTES = 32168
DIAGNOSTICS_ATTRIBUTES = ["date", "initial"]
async def connect_to_server(data: Mapping[str, Any]) -> IMAP4_SSL:
"""Connect to imap server and return client."""
@ -220,6 +223,7 @@ class ImapDataUpdateCoordinator(DataUpdateCoordinator[int | None]):
self._last_message_uid: str | None = None
self._last_message_id: str | None = None
self.custom_event_template = None
self._diagnostics_data: dict[str, Any] = {}
_custom_event_template = entry.data.get(CONF_CUSTOM_EVENT_DATA_TEMPLATE)
if _custom_event_template is not None:
self.custom_event_template = Template(_custom_event_template, hass=hass)
@ -287,6 +291,7 @@ class ImapDataUpdateCoordinator(DataUpdateCoordinator[int | None]):
CONF_MAX_MESSAGE_SIZE, DEFAULT_MAX_MESSAGE_SIZE
)
]
self._update_diagnostics(data)
if (size := len(json_bytes(data))) > MAX_EVENT_DATA_BYTES:
_LOGGER.warning(
"Custom imap_content event skipped, size (%s) exceeds "
@ -357,6 +362,23 @@ class ImapDataUpdateCoordinator(DataUpdateCoordinator[int | None]):
"""Close resources."""
await self._cleanup(log_error=True)
def _update_diagnostics(self, data: dict[str, Any]) -> None:
"""Update the diagnostics."""
self._diagnostics_data.update(
{key: value for key, value in data.items() if key in DIAGNOSTICS_ATTRIBUTES}
)
custom: Any | None = data.get("custom")
self._diagnostics_data["custom_template_data_type"] = str(type(custom))
self._diagnostics_data["custom_template_result_length"] = (
None if custom is None else len(f"{custom}")
)
self._diagnostics_data["event_time"] = dt_util.now().isoformat()
@property
def diagnostics_data(self) -> dict[str, Any]:
"""Return diagnostics info."""
return self._diagnostics_data
class ImapPollingDataUpdateCoordinator(ImapDataUpdateCoordinator):
"""Class for imap client."""

View file

@ -0,0 +1,38 @@
"""Diagnostics support for IMAP."""
from __future__ import annotations
from typing import Any
from homeassistant.components.diagnostics import async_redact_data
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
from homeassistant.core import HomeAssistant, callback
from .const import DOMAIN
from .coordinator import ImapDataUpdateCoordinator
REDACT_CONFIG = {CONF_PASSWORD, CONF_USERNAME}
async def async_get_config_entry_diagnostics(
hass: HomeAssistant, entry: ConfigEntry
) -> dict[str, Any]:
"""Return diagnostics for a config entry."""
return _async_get_diagnostics(hass, entry)
@callback
def _async_get_diagnostics(
hass: HomeAssistant,
entry: ConfigEntry,
) -> dict[str, Any]:
"""Return diagnostics for a config entry."""
redacted_config = async_redact_data(entry.data, REDACT_CONFIG)
coordinator: ImapDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id]
data = {
"config": redacted_config,
"event": coordinator.diagnostics_data,
}
return data

View file

@ -0,0 +1,83 @@
"""Test IMAP diagnostics."""
from datetime import timedelta
from typing import Any
from unittest.mock import MagicMock
import pytest
from homeassistant.components import imap
from homeassistant.components.sensor.const import SensorStateClass
from homeassistant.core import HomeAssistant
from homeassistant.util import dt as dt_util
from .const import TEST_FETCH_RESPONSE_TEXT_PLAIN, TEST_SEARCH_RESPONSE
from .test_config_flow import MOCK_CONFIG
from tests.common import MockConfigEntry, async_capture_events, async_fire_time_changed
from tests.components.diagnostics import get_diagnostics_for_config_entry
from tests.typing import ClientSessionGenerator
@pytest.mark.parametrize("imap_search", [TEST_SEARCH_RESPONSE])
@pytest.mark.parametrize("imap_fetch", [TEST_FETCH_RESPONSE_TEXT_PLAIN])
@pytest.mark.parametrize("imap_has_capability", [True, False], ids=["push", "poll"])
async def test_entry_diagnostics(
hass: HomeAssistant,
mock_imap_protocol: MagicMock,
hass_client: ClientSessionGenerator,
) -> None:
"""Test receiving a message successfully."""
event_called = async_capture_events(hass, "imap_content")
template = "{{ 4 * 4 }}"
config = MOCK_CONFIG.copy()
config["custom_event_data_template"] = template
config_entry = MockConfigEntry(domain=imap.DOMAIN, data=config)
config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
# Make sure we have had one update (when polling)
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5))
await hass.async_block_till_done()
state = hass.states.get("sensor.imap_email_email_com")
# we should have received one message
assert state is not None
assert state.state == "1"
assert state.attributes["state_class"] == SensorStateClass.MEASUREMENT
# we should have received one event
assert len(event_called) == 1
data: dict[str, Any] = event_called[0].data
assert data["server"] == "imap.server.com"
assert data["username"] == "email@email.com"
assert data["search"] == "UnSeen UnDeleted"
assert data["folder"] == "INBOX"
assert data["sender"] == "john.doe@example.com"
assert data["subject"] == "Test subject"
await get_diagnostics_for_config_entry(hass, hass_client, config_entry)
expected_config = {
"username": "**REDACTED**",
"password": "**REDACTED**",
"server": "imap.server.com",
"port": 993,
"charset": "utf-8",
"folder": "INBOX",
"search": "UnSeen UnDeleted",
"custom_event_data_template": "{{ 4 * 4 }}",
}
expected_event_data = {
"date": "2023-03-24T13:52:00+01:00",
"initial": True,
"custom_template_data_type": "<class 'int'>",
"custom_template_result_length": 2,
}
diagnostics = await get_diagnostics_for_config_entry(
hass, hass_client, config_entry
)
assert diagnostics["config"] == expected_config
event_data = diagnostics["event"]
assert event_data.pop("event_time") is not None
assert event_data == expected_event_data