Refactor imap coordinator (#89759)
* Warn if the previous push wait task it taking longer than the update interval * refactor * Call _async_fetch_number_of_messages first * Add cleanup in case fetching fails * mypy * Set sensor to unknown if an error occured. * Handling invalid auth an reraise when needed * Handle invalid folder as setup error * Close IMAP stream before logout at cleanup --------- Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
parent
469dbec089
commit
5f22796b38
3 changed files with 185 additions and 64 deletions
|
@ -15,7 +15,11 @@ from homeassistant.exceptions import (
|
||||||
)
|
)
|
||||||
|
|
||||||
from .const import DOMAIN
|
from .const import DOMAIN
|
||||||
from .coordinator import ImapDataUpdateCoordinator, connect_to_server
|
from .coordinator import (
|
||||||
|
ImapPollingDataUpdateCoordinator,
|
||||||
|
ImapPushDataUpdateCoordinator,
|
||||||
|
connect_to_server,
|
||||||
|
)
|
||||||
from .errors import InvalidAuth, InvalidFolder
|
from .errors import InvalidAuth, InvalidFolder
|
||||||
|
|
||||||
PLATFORMS: list[Platform] = [Platform.SENSOR]
|
PLATFORMS: list[Platform] = [Platform.SENSOR]
|
||||||
|
@ -32,7 +36,17 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||||
except (asyncio.TimeoutError, AioImapException) as err:
|
except (asyncio.TimeoutError, AioImapException) as err:
|
||||||
raise ConfigEntryNotReady from err
|
raise ConfigEntryNotReady from err
|
||||||
|
|
||||||
coordinator = ImapDataUpdateCoordinator(hass, imap_client)
|
coordinator_class: type[
|
||||||
|
ImapPushDataUpdateCoordinator | ImapPollingDataUpdateCoordinator
|
||||||
|
]
|
||||||
|
if imap_client.has_capability("IDLE"):
|
||||||
|
coordinator_class = ImapPushDataUpdateCoordinator
|
||||||
|
else:
|
||||||
|
coordinator_class = ImapPollingDataUpdateCoordinator
|
||||||
|
|
||||||
|
coordinator: ImapPushDataUpdateCoordinator | ImapPollingDataUpdateCoordinator = (
|
||||||
|
coordinator_class(hass, imap_client)
|
||||||
|
)
|
||||||
await coordinator.async_config_entry_first_refresh()
|
await coordinator.async_config_entry_first_refresh()
|
||||||
|
|
||||||
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
|
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
|
||||||
|
@ -49,6 +63,10 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||||
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||||
"""Unload a config entry."""
|
"""Unload a config entry."""
|
||||||
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
|
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
|
||||||
coordinator: ImapDataUpdateCoordinator = hass.data[DOMAIN].pop(entry.entry_id)
|
coordinator: ImapPushDataUpdateCoordinator | ImapPollingDataUpdateCoordinator = hass.data[
|
||||||
|
DOMAIN
|
||||||
|
].pop(
|
||||||
|
entry.entry_id
|
||||||
|
)
|
||||||
await coordinator.shutdown()
|
await coordinator.shutdown()
|
||||||
return unload_ok
|
return unload_ok
|
||||||
|
|
|
@ -10,9 +10,10 @@ from typing import Any
|
||||||
from aioimaplib import AUTH, IMAP4_SSL, SELECTED, AioImapException
|
from aioimaplib import AUTH, IMAP4_SSL, SELECTED, AioImapException
|
||||||
import async_timeout
|
import async_timeout
|
||||||
|
|
||||||
from homeassistant.config_entries import ConfigEntry
|
from homeassistant.config_entries import ConfigEntry, ConfigEntryState
|
||||||
from homeassistant.const import CONF_PASSWORD, CONF_PORT, CONF_USERNAME
|
from homeassistant.const import CONF_PASSWORD, CONF_PORT, CONF_USERNAME
|
||||||
from homeassistant.core import HomeAssistant
|
from homeassistant.core import HomeAssistant
|
||||||
|
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryError
|
||||||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
||||||
|
|
||||||
from .const import CONF_CHARSET, CONF_FOLDER, CONF_SEARCH, CONF_SERVER, DOMAIN
|
from .const import CONF_CHARSET, CONF_FOLDER, CONF_SEARCH, CONF_SERVER, DOMAIN
|
||||||
|
@ -20,6 +21,8 @@ from .errors import InvalidAuth, InvalidFolder
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
BACKOFF_TIME = 10
|
||||||
|
|
||||||
|
|
||||||
async def connect_to_server(data: Mapping[str, Any]) -> IMAP4_SSL:
|
async def connect_to_server(data: Mapping[str, Any]) -> IMAP4_SSL:
|
||||||
"""Connect to imap server and return client."""
|
"""Connect to imap server and return client."""
|
||||||
|
@ -27,63 +30,159 @@ async def connect_to_server(data: Mapping[str, Any]) -> IMAP4_SSL:
|
||||||
await client.wait_hello_from_server()
|
await client.wait_hello_from_server()
|
||||||
await client.login(data[CONF_USERNAME], data[CONF_PASSWORD])
|
await client.login(data[CONF_USERNAME], data[CONF_PASSWORD])
|
||||||
if client.protocol.state != AUTH:
|
if client.protocol.state != AUTH:
|
||||||
raise InvalidAuth
|
raise InvalidAuth("Invalid username or password")
|
||||||
await client.select(data[CONF_FOLDER])
|
await client.select(data[CONF_FOLDER])
|
||||||
if client.protocol.state != SELECTED:
|
if client.protocol.state != SELECTED:
|
||||||
raise InvalidFolder
|
raise InvalidFolder(f"Folder {data[CONF_FOLDER]} is invalid")
|
||||||
return client
|
return client
|
||||||
|
|
||||||
|
|
||||||
class ImapDataUpdateCoordinator(DataUpdateCoordinator[int]):
|
class ImapDataUpdateCoordinator(DataUpdateCoordinator[int | None]):
|
||||||
"""Class for imap client."""
|
"""Base class for imap client."""
|
||||||
|
|
||||||
config_entry: ConfigEntry
|
config_entry: ConfigEntry
|
||||||
|
|
||||||
def __init__(self, hass: HomeAssistant, imap_client: IMAP4_SSL) -> None:
|
def __init__(
|
||||||
|
self,
|
||||||
|
hass: HomeAssistant,
|
||||||
|
imap_client: IMAP4_SSL,
|
||||||
|
update_interval: timedelta | None,
|
||||||
|
) -> None:
|
||||||
"""Initiate imap client."""
|
"""Initiate imap client."""
|
||||||
self.hass = hass
|
|
||||||
self.imap_client = imap_client
|
self.imap_client = imap_client
|
||||||
self.support_push = imap_client.has_capability("IDLE")
|
|
||||||
super().__init__(
|
super().__init__(
|
||||||
hass,
|
hass,
|
||||||
_LOGGER,
|
_LOGGER,
|
||||||
name=DOMAIN,
|
name=DOMAIN,
|
||||||
update_interval=timedelta(seconds=10) if not self.support_push else None,
|
update_interval=update_interval,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _async_update_data(self) -> int:
|
async def async_start(self) -> None:
|
||||||
"""Update the number of unread emails."""
|
"""Start coordinator."""
|
||||||
try:
|
|
||||||
|
async def _async_reconnect_if_needed(self) -> None:
|
||||||
|
"""Connect to imap server."""
|
||||||
if self.imap_client is None:
|
if self.imap_client is None:
|
||||||
self.imap_client = await connect_to_server(self.config_entry.data)
|
self.imap_client = await connect_to_server(self.config_entry.data)
|
||||||
except (AioImapException, asyncio.TimeoutError) as err:
|
|
||||||
raise UpdateFailed(err) from err
|
|
||||||
|
|
||||||
return await self.refresh_email_count()
|
async def _async_fetch_number_of_messages(self) -> int | None:
|
||||||
|
"""Fetch number of messages."""
|
||||||
async def refresh_email_count(self) -> int:
|
await self._async_reconnect_if_needed()
|
||||||
"""Check the number of found emails."""
|
|
||||||
try:
|
|
||||||
await self.imap_client.noop()
|
await self.imap_client.noop()
|
||||||
result, lines = await self.imap_client.search(
|
result, lines = await self.imap_client.search(
|
||||||
self.config_entry.data[CONF_SEARCH],
|
self.config_entry.data[CONF_SEARCH],
|
||||||
charset=self.config_entry.data[CONF_CHARSET],
|
charset=self.config_entry.data[CONF_CHARSET],
|
||||||
)
|
)
|
||||||
except (AioImapException, asyncio.TimeoutError) as err:
|
|
||||||
raise UpdateFailed(err) from err
|
|
||||||
|
|
||||||
if result != "OK":
|
if result != "OK":
|
||||||
raise UpdateFailed(
|
raise UpdateFailed(
|
||||||
f"Invalid response for search '{self.config_entry.data[CONF_SEARCH]}': {result} / {lines[0]}"
|
f"Invalid response for search '{self.config_entry.data[CONF_SEARCH]}': {result} / {lines[0]}"
|
||||||
)
|
)
|
||||||
if self.support_push:
|
|
||||||
self.hass.async_create_background_task(
|
|
||||||
self.async_wait_server_push(), "Wait for IMAP data push"
|
|
||||||
)
|
|
||||||
return len(lines[0].split())
|
return len(lines[0].split())
|
||||||
|
|
||||||
async def async_wait_server_push(self) -> None:
|
async def _cleanup(self, log_error: bool = False) -> None:
|
||||||
|
"""Close resources."""
|
||||||
|
if self.imap_client:
|
||||||
|
try:
|
||||||
|
if self.imap_client.has_pending_idle():
|
||||||
|
self.imap_client.idle_done()
|
||||||
|
await self.imap_client.stop_wait_server_push()
|
||||||
|
await self.imap_client.close()
|
||||||
|
await self.imap_client.logout()
|
||||||
|
except (AioImapException, asyncio.TimeoutError) as ex:
|
||||||
|
if log_error:
|
||||||
|
self.async_set_update_error(ex)
|
||||||
|
_LOGGER.warning("Error while cleaning up imap connection")
|
||||||
|
self.imap_client = None
|
||||||
|
|
||||||
|
async def shutdown(self, *_) -> None:
|
||||||
|
"""Close resources."""
|
||||||
|
await self._cleanup(log_error=True)
|
||||||
|
|
||||||
|
|
||||||
|
class ImapPollingDataUpdateCoordinator(ImapDataUpdateCoordinator):
|
||||||
|
"""Class for imap client."""
|
||||||
|
|
||||||
|
def __init__(self, hass: HomeAssistant, imap_client: IMAP4_SSL) -> None:
|
||||||
|
"""Initiate imap client."""
|
||||||
|
super().__init__(hass, imap_client, timedelta(seconds=10))
|
||||||
|
|
||||||
|
async def _async_update_data(self) -> int | None:
|
||||||
|
"""Update the number of unread emails."""
|
||||||
|
try:
|
||||||
|
return await self._async_fetch_number_of_messages()
|
||||||
|
except (
|
||||||
|
AioImapException,
|
||||||
|
UpdateFailed,
|
||||||
|
asyncio.TimeoutError,
|
||||||
|
) as ex:
|
||||||
|
self.async_set_update_error(ex)
|
||||||
|
await self._cleanup()
|
||||||
|
raise UpdateFailed() from ex
|
||||||
|
except InvalidFolder as ex:
|
||||||
|
_LOGGER.warning("Selected mailbox folder is invalid")
|
||||||
|
self.async_set_update_error(ex)
|
||||||
|
await self._cleanup()
|
||||||
|
raise ConfigEntryError("Selected mailbox folder is invalid.") from ex
|
||||||
|
except InvalidAuth as ex:
|
||||||
|
_LOGGER.warning("Username or password incorrect, starting reauthentication")
|
||||||
|
self.async_set_update_error(ex)
|
||||||
|
await self._cleanup()
|
||||||
|
raise ConfigEntryAuthFailed() from ex
|
||||||
|
|
||||||
|
|
||||||
|
class ImapPushDataUpdateCoordinator(ImapDataUpdateCoordinator):
|
||||||
|
"""Class for imap client."""
|
||||||
|
|
||||||
|
def __init__(self, hass: HomeAssistant, imap_client: IMAP4_SSL) -> None:
|
||||||
|
"""Initiate imap client."""
|
||||||
|
super().__init__(hass, imap_client, None)
|
||||||
|
self._push_wait_task: asyncio.Task[None] | None = None
|
||||||
|
|
||||||
|
async def _async_update_data(self) -> int | None:
|
||||||
|
"""Update the number of unread emails."""
|
||||||
|
await self.async_start()
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def async_start(self) -> None:
|
||||||
|
"""Start coordinator."""
|
||||||
|
self._push_wait_task = self.hass.async_create_background_task(
|
||||||
|
self._async_wait_push_loop(), "Wait for IMAP data push"
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _async_wait_push_loop(self) -> None:
|
||||||
"""Wait for data push from server."""
|
"""Wait for data push from server."""
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
number_of_messages = await self._async_fetch_number_of_messages()
|
||||||
|
except InvalidAuth as ex:
|
||||||
|
_LOGGER.warning(
|
||||||
|
"Username or password incorrect, starting reauthentication"
|
||||||
|
)
|
||||||
|
self.config_entry.async_start_reauth(self.hass)
|
||||||
|
self.async_set_update_error(ex)
|
||||||
|
await self._cleanup()
|
||||||
|
await asyncio.sleep(BACKOFF_TIME)
|
||||||
|
except InvalidFolder as ex:
|
||||||
|
_LOGGER.warning("Selected mailbox folder is invalid")
|
||||||
|
self.config_entry.async_set_state(
|
||||||
|
self.hass,
|
||||||
|
ConfigEntryState.SETUP_ERROR,
|
||||||
|
"Selected mailbox folder is invalid.",
|
||||||
|
)
|
||||||
|
self.async_set_update_error(ex)
|
||||||
|
await self._cleanup()
|
||||||
|
await asyncio.sleep(BACKOFF_TIME)
|
||||||
|
except (
|
||||||
|
UpdateFailed,
|
||||||
|
AioImapException,
|
||||||
|
asyncio.TimeoutError,
|
||||||
|
) as ex:
|
||||||
|
self.async_set_update_error(ex)
|
||||||
|
await self._cleanup()
|
||||||
|
await asyncio.sleep(BACKOFF_TIME)
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
self.async_set_updated_data(number_of_messages)
|
||||||
try:
|
try:
|
||||||
idle: asyncio.Future = await self.imap_client.idle_start()
|
idle: asyncio.Future = await self.imap_client.idle_start()
|
||||||
await self.imap_client.wait_server_push()
|
await self.imap_client.wait_server_push()
|
||||||
|
@ -93,16 +192,17 @@ class ImapDataUpdateCoordinator(DataUpdateCoordinator[int]):
|
||||||
|
|
||||||
except (AioImapException, asyncio.TimeoutError):
|
except (AioImapException, asyncio.TimeoutError):
|
||||||
_LOGGER.warning(
|
_LOGGER.warning(
|
||||||
"Lost %s (will attempt to reconnect)",
|
"Lost %s (will attempt to reconnect after %s s)",
|
||||||
self.config_entry.data[CONF_SERVER],
|
self.config_entry.data[CONF_SERVER],
|
||||||
|
BACKOFF_TIME,
|
||||||
)
|
)
|
||||||
self.imap_client = None
|
self.async_set_update_error(UpdateFailed("Lost connection"))
|
||||||
await self.async_request_refresh()
|
await self._cleanup()
|
||||||
|
await asyncio.sleep(BACKOFF_TIME)
|
||||||
|
continue
|
||||||
|
|
||||||
async def shutdown(self, *_) -> None:
|
async def shutdown(self, *_) -> None:
|
||||||
"""Close resources."""
|
"""Close resources."""
|
||||||
if self.imap_client:
|
if self._push_wait_task:
|
||||||
if self.imap_client.has_pending_idle():
|
self._push_wait_task.cancel()
|
||||||
self.imap_client.idle_done()
|
await super().shutdown()
|
||||||
await self.imap_client.stop_wait_server_push()
|
|
||||||
await self.imap_client.logout()
|
|
||||||
|
|
|
@ -15,7 +15,7 @@ from homeassistant.helpers.issue_registry import IssueSeverity, async_create_iss
|
||||||
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
||||||
from homeassistant.helpers.update_coordinator import CoordinatorEntity
|
from homeassistant.helpers.update_coordinator import CoordinatorEntity
|
||||||
|
|
||||||
from . import ImapDataUpdateCoordinator
|
from . import ImapPollingDataUpdateCoordinator, ImapPushDataUpdateCoordinator
|
||||||
from .const import (
|
from .const import (
|
||||||
CONF_CHARSET,
|
CONF_CHARSET,
|
||||||
CONF_FOLDER,
|
CONF_FOLDER,
|
||||||
|
@ -69,18 +69,26 @@ async def async_setup_entry(
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Set up the Imap sensor."""
|
"""Set up the Imap sensor."""
|
||||||
|
|
||||||
coordinator: ImapDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id]
|
coordinator: ImapPushDataUpdateCoordinator | ImapPollingDataUpdateCoordinator = (
|
||||||
|
hass.data[DOMAIN][entry.entry_id]
|
||||||
|
)
|
||||||
|
|
||||||
async_add_entities([ImapSensor(coordinator)])
|
async_add_entities([ImapSensor(coordinator)])
|
||||||
|
|
||||||
|
|
||||||
class ImapSensor(CoordinatorEntity[ImapDataUpdateCoordinator], SensorEntity):
|
class ImapSensor(
|
||||||
|
CoordinatorEntity[ImapPushDataUpdateCoordinator | ImapPollingDataUpdateCoordinator],
|
||||||
|
SensorEntity,
|
||||||
|
):
|
||||||
"""Representation of an IMAP sensor."""
|
"""Representation of an IMAP sensor."""
|
||||||
|
|
||||||
_attr_icon = "mdi:email-outline"
|
_attr_icon = "mdi:email-outline"
|
||||||
_attr_has_entity_name = True
|
_attr_has_entity_name = True
|
||||||
|
|
||||||
def __init__(self, coordinator: ImapDataUpdateCoordinator) -> None:
|
def __init__(
|
||||||
|
self,
|
||||||
|
coordinator: ImapPushDataUpdateCoordinator | ImapPollingDataUpdateCoordinator,
|
||||||
|
) -> None:
|
||||||
"""Initialize the sensor."""
|
"""Initialize the sensor."""
|
||||||
super().__init__(coordinator)
|
super().__init__(coordinator)
|
||||||
# To be removed when YAML import is removed
|
# To be removed when YAML import is removed
|
||||||
|
@ -95,11 +103,6 @@ class ImapSensor(CoordinatorEntity[ImapDataUpdateCoordinator], SensorEntity):
|
||||||
)
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def native_value(self) -> int:
|
def native_value(self) -> int | None:
|
||||||
"""Return the number of emails found."""
|
"""Return the number of emails found."""
|
||||||
return self.coordinator.data
|
return self.coordinator.data
|
||||||
|
|
||||||
async def async_update(self) -> None:
|
|
||||||
"""Check for idle state before updating."""
|
|
||||||
if not await self.coordinator.imap_client.stop_wait_server_push():
|
|
||||||
await super().async_update()
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue