Add config flow to Feedreader (#118047)

Michael 2024-06-21 20:23:47 +02:00 committed by GitHub
parent ba7388546e
commit d6be733287
13 changed files with 897 additions and 200 deletions


@@ -2,61 +2,119 @@
from __future__ import annotations
import asyncio
from datetime import timedelta
import voluptuous as vol
from homeassistant.const import CONF_SCAN_INTERVAL
from homeassistant.core import HomeAssistant
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import CONF_SCAN_INTERVAL, CONF_URL
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
from homeassistant.helpers.typing import ConfigType
from homeassistant.util.hass_dict import HassKey
from .const import DOMAIN
from .const import CONF_MAX_ENTRIES, DEFAULT_MAX_ENTRIES, DEFAULT_SCAN_INTERVAL, DOMAIN
from .coordinator import FeedReaderCoordinator, StoredData
type FeedReaderConfigEntry = ConfigEntry[FeedReaderCoordinator]
CONF_URLS = "urls"
CONF_MAX_ENTRIES = "max_entries"
DEFAULT_MAX_ENTRIES = 20
DEFAULT_SCAN_INTERVAL = timedelta(hours=1)
MY_KEY: HassKey[StoredData] = HassKey(DOMAIN)
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: {
vol.Required(CONF_URLS): vol.All(cv.ensure_list, [cv.url]),
vol.Optional(
CONF_SCAN_INTERVAL, default=DEFAULT_SCAN_INTERVAL
): cv.time_period,
vol.Optional(
CONF_MAX_ENTRIES, default=DEFAULT_MAX_ENTRIES
): cv.positive_int,
}
},
vol.All(
cv.deprecated(DOMAIN),
{
DOMAIN: vol.Schema(
{
vol.Required(CONF_URLS): vol.All(cv.ensure_list, [cv.url]),
vol.Optional(
CONF_SCAN_INTERVAL, default=DEFAULT_SCAN_INTERVAL
): cv.time_period,
vol.Optional(
CONF_MAX_ENTRIES, default=DEFAULT_MAX_ENTRIES
): cv.positive_int,
}
)
},
),
extra=vol.ALLOW_EXTRA,
)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Feedreader component."""
urls: list[str] = config[DOMAIN][CONF_URLS]
if not urls:
return False
if DOMAIN in config:
for url in config[DOMAIN][CONF_URLS]:
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={
CONF_URL: url,
CONF_MAX_ENTRIES: config[DOMAIN][CONF_MAX_ENTRIES],
},
)
)
scan_interval: timedelta = config[DOMAIN][CONF_SCAN_INTERVAL]
max_entries: int = config[DOMAIN][CONF_MAX_ENTRIES]
storage = StoredData(hass)
await storage.async_setup()
feeds = [
FeedReaderCoordinator(hass, url, scan_interval, max_entries, storage)
for url in urls
]
async_create_issue(
hass,
HOMEASSISTANT_DOMAIN,
f"deprecated_yaml_{DOMAIN}",
breaks_in_ha_version="2025.1.0",
is_fixable=False,
is_persistent=False,
issue_domain=DOMAIN,
severity=IssueSeverity.WARNING,
translation_key="deprecated_yaml",
translation_placeholders={
"domain": DOMAIN,
"integration_title": "Feedreader",
},
)
await asyncio.gather(*[feed.async_refresh() for feed in feeds])
return True
async def async_setup_entry(hass: HomeAssistant, entry: FeedReaderConfigEntry) -> bool:
"""Set up Feedreader from a config entry."""
storage = hass.data.setdefault(MY_KEY, StoredData(hass))
if not storage.is_initialized:
await storage.async_setup()
coordinator = FeedReaderCoordinator(
hass,
entry.data[CONF_URL],
entry.options[CONF_MAX_ENTRIES],
storage,
)
await coordinator.async_config_entry_first_refresh()
# workaround because coordinators without listeners won't update
# can be removed when we have entities to update
[feed.async_add_listener(lambda: None) for feed in feeds]
coordinator.async_add_listener(lambda: None)
entry.runtime_data = coordinator
entry.async_on_unload(entry.add_update_listener(_async_update_listener))
return True
async def async_unload_entry(hass: HomeAssistant, entry: FeedReaderConfigEntry) -> bool:
"""Unload a config entry."""
entries = hass.config_entries.async_entries(
DOMAIN, include_disabled=False, include_ignore=False
)
# if this is the last entry, remove the storage
if len(entries) == 1:
hass.data.pop(MY_KEY)
return True
async def _async_update_listener(
hass: HomeAssistant, entry: FeedReaderConfigEntry
) -> None:
"""Handle reconfiguration."""
await hass.config_entries.async_reload(entry.entry_id)
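
For orientation (not part of the diff): the deprecated YAML configuration maps onto the imported config entry roughly as sketched below. The variable names are illustrative only; the URL is the placeholder used in the tests.

# Sketch, not part of this commit: a legacy configuration.yaml block such as
#
#   feedreader:
#     urls:
#       - http://some.rss.local/rss_feed.xml
#     max_entries: 5
#
# is forwarded once per URL to the import flow, which ends up as a config entry
# holding the URL in its data and the entry limit in its options:
entry_data = {"url": "http://some.rss.local/rss_feed.xml"}
entry_options = {"max_entries": 5}
# scan_interval is no longer configurable; the coordinator now always polls at
# DEFAULT_SCAN_INTERVAL (one hour).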


@@ -0,0 +1,195 @@
"""Config flow for RSS/Atom feeds."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
import urllib.error
import feedparser
import voluptuous as vol
from homeassistant.config_entries import (
SOURCE_IMPORT,
ConfigEntry,
ConfigFlow,
ConfigFlowResult,
OptionsFlow,
OptionsFlowWithConfigEntry,
)
from homeassistant.const import CONF_URL
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
from homeassistant.helpers.selector import (
TextSelector,
TextSelectorConfig,
TextSelectorType,
)
from homeassistant.util import slugify
from .const import CONF_MAX_ENTRIES, DEFAULT_MAX_ENTRIES, DOMAIN
LOGGER = logging.getLogger(__name__)
async def async_fetch_feed(hass: HomeAssistant, url: str) -> feedparser.FeedParserDict:
"""Fetch the feed."""
return await hass.async_add_executor_job(feedparser.parse, url)
class FeedReaderConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow."""
VERSION = 1
_config_entry: ConfigEntry
_max_entries: int | None = None
@staticmethod
@callback
def async_get_options_flow(config_entry: ConfigEntry) -> OptionsFlow:
"""Get the options flow for this handler."""
return FeedReaderOptionsFlowHandler(config_entry)
def show_user_form(
self,
user_input: dict[str, Any] | None = None,
errors: dict[str, str] | None = None,
description_placeholders: dict[str, str] | None = None,
step_id: str = "user",
) -> ConfigFlowResult:
"""Show the user form."""
if user_input is None:
user_input = {}
return self.async_show_form(
step_id=step_id,
data_schema=vol.Schema(
{
vol.Required(
CONF_URL, default=user_input.get(CONF_URL, "")
): TextSelector(TextSelectorConfig(type=TextSelectorType.URL))
}
),
description_placeholders=description_placeholders,
errors=errors,
)
def abort_on_import_error(self, url: str, error: str) -> ConfigFlowResult:
"""Abort import flow on error."""
async_create_issue(
self.hass,
DOMAIN,
f"import_yaml_error_{DOMAIN}_{error}_{slugify(url)}",
breaks_in_ha_version="2025.1.0",
is_fixable=False,
issue_domain=DOMAIN,
severity=IssueSeverity.WARNING,
translation_key=f"import_yaml_error_{error}",
translation_placeholders={"url": url},
)
return self.async_abort(reason=error)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a flow initialized by the user."""
if not user_input:
return self.show_user_form()
self._async_abort_entries_match({CONF_URL: user_input[CONF_URL]})
feed = await async_fetch_feed(self.hass, user_input[CONF_URL])
if feed.bozo:
LOGGER.debug("feed bozo_exception: %s", feed.bozo_exception)
if isinstance(feed.bozo_exception, urllib.error.URLError):
if self.context["source"] == SOURCE_IMPORT:
return self.abort_on_import_error(user_input[CONF_URL], "url_error")
return self.show_user_form(user_input, {"base": "url_error"})
if not feed.entries:
if self.context["source"] == SOURCE_IMPORT:
return self.abort_on_import_error(
user_input[CONF_URL], "no_feed_entries"
)
return self.show_user_form(user_input, {"base": "no_feed_entries"})
feed_title = feed["feed"]["title"]
return self.async_create_entry(
title=feed_title,
data=user_input,
options={CONF_MAX_ENTRIES: self._max_entries or DEFAULT_MAX_ENTRIES},
)
async def async_step_import(self, user_input: dict[str, Any]) -> ConfigFlowResult:
"""Handle an import flow."""
self._max_entries = user_input[CONF_MAX_ENTRIES]
return await self.async_step_user({CONF_URL: user_input[CONF_URL]})
async def async_step_reconfigure(
self, _: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a reconfiguration flow initialized by the user."""
config_entry = self.hass.config_entries.async_get_entry(
self.context["entry_id"]
)
if TYPE_CHECKING:
assert config_entry is not None
self._config_entry = config_entry
return await self.async_step_reconfigure_confirm()
async def async_step_reconfigure_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a reconfiguration flow initialized by the user."""
if not user_input:
return self.show_user_form(
user_input={**self._config_entry.data},
description_placeholders={"name": self._config_entry.title},
step_id="reconfigure_confirm",
)
feed = await async_fetch_feed(self.hass, user_input[CONF_URL])
if feed.bozo:
LOGGER.debug("feed bozo_exception: %s", feed.bozo_exception)
if isinstance(feed.bozo_exception, urllib.error.URLError):
return self.show_user_form(
user_input=user_input,
description_placeholders={"name": self._config_entry.title},
step_id="reconfigure_confirm",
errors={"base": "url_error"},
)
if not feed.entries:
return self.show_user_form(
user_input=user_input,
description_placeholders={"name": self._config_entry.title},
step_id="reconfigure_confirm",
errors={"base": "no_feed_entries"},
)
self.hass.config_entries.async_update_entry(self._config_entry, data=user_input)
return self.async_abort(reason="reconfigure_successful")
class FeedReaderOptionsFlowHandler(OptionsFlowWithConfigEntry):
"""Handle an options flow."""
async def async_step_init(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle options flow."""
if user_input is not None:
return self.async_create_entry(title="", data=user_input)
data_schema = vol.Schema(
{
vol.Optional(
CONF_MAX_ENTRIES,
default=self.options.get(CONF_MAX_ENTRIES, DEFAULT_MAX_ENTRIES),
): cv.positive_int,
}
)
return self.async_show_form(step_id="init", data_schema=data_schema)
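
Side note (not from this commit): a minimal sketch of the feedparser behaviour the flow relies on when mapping problems to the "url_error" and "no_feed_entries" results. The URL below is made up.

import urllib.error

import feedparser

# feedparser typically does not raise on fetch or parse problems; it sets the
# `bozo` flag and stores the underlying exception in `bozo_exception`.
result = feedparser.parse("http://feed.example.invalid/rss.xml")  # hypothetical URL
if result.bozo and isinstance(result.bozo_exception, urllib.error.URLError):
    print("transport error:", result.bozo_exception)  # -> "url_error" in the flow
elif not result.entries:
    print("no entries served")  # -> "no_feed_entries" in the flow
else:
    print("feed title:", result["feed"]["title"])  # used as the config entry title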


@@ -1,3 +1,9 @@
"""Constants for RSS/Atom feeds."""
from datetime import timedelta
DOMAIN = "feedreader"
CONF_MAX_ENTRIES = "max_entries"
DEFAULT_MAX_ENTRIES = 20
DEFAULT_SCAN_INTERVAL = timedelta(hours=1)


@@ -3,18 +3,19 @@
from __future__ import annotations
from calendar import timegm
from datetime import datetime, timedelta
from datetime import datetime
from logging import getLogger
from time import gmtime, struct_time
from urllib.error import URLError
import feedparser
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.storage import Store
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util import dt as dt_util
from .const import DOMAIN
from .const import DEFAULT_SCAN_INTERVAL, DOMAIN
DELAY_SAVE = 30
EVENT_FEEDREADER = "feedreader"
@@ -31,7 +32,6 @@ class FeedReaderCoordinator(DataUpdateCoordinator[None]):
self,
hass: HomeAssistant,
url: str,
scan_interval: timedelta,
max_entries: int,
storage: StoredData,
) -> None:
@@ -40,7 +40,7 @@ class FeedReaderCoordinator(DataUpdateCoordinator[None]):
hass=hass,
logger=_LOGGER,
name=f"{DOMAIN} {url}",
update_interval=scan_interval,
update_interval=DEFAULT_SCAN_INTERVAL,
)
self._url = url
self._max_entries = max_entries
@@ -69,8 +69,8 @@ class FeedReaderCoordinator(DataUpdateCoordinator[None]):
self._feed = await self.hass.async_add_executor_job(self._fetch_feed)
if not self._feed:
_LOGGER.error("Error fetching feed data from %s", self._url)
return None
raise UpdateFailed(f"Error fetching feed data from {self._url}")
# The 'bozo' flag really only indicates that there was an issue
# during the initial parsing of the XML, but it doesn't indicate
# whether this is an unrecoverable error. In this case the
@@ -78,6 +78,12 @@ class FeedReaderCoordinator(DataUpdateCoordinator[None]):
# If an error is detected here, log warning message but continue
# processing the feed entries if present.
if self._feed.bozo != 0:
if isinstance(self._feed.bozo_exception, URLError):
raise UpdateFailed(
f"Error fetching feed data from {self._url}: {self._feed.bozo_exception}"
)
# no connection issue, but parsing issue
_LOGGER.warning(
"Possible issue parsing feed %s: %s",
self._url,
@@ -169,16 +175,17 @@ class StoredData:
self._data: dict[str, struct_time] = {}
self.hass = hass
self._store: Store[dict[str, str]] = Store(hass, STORAGE_VERSION, DOMAIN)
self.is_initialized = False
async def async_setup(self) -> None:
"""Set up storage."""
if (store_data := await self._store.async_load()) is None:
return
# Make sure that dst is set to 0, by using gmtime() on the timestamp.
self._data = {
feed_id: gmtime(datetime.fromisoformat(timestamp_string).timestamp())
for feed_id, timestamp_string in store_data.items()
}
if (store_data := await self._store.async_load()) is not None:
# Make sure that dst is set to 0, by using gmtime() on the timestamp.
self._data = {
feed_id: gmtime(datetime.fromisoformat(timestamp_string).timestamp())
for feed_id, timestamp_string in store_data.items()
}
self.is_initialized = True
def get_timestamp(self, feed_id: str) -> struct_time | None:
"""Return stored timestamp for given feed id."""


@@ -2,6 +2,7 @@
"domain": "feedreader",
"name": "Feedreader",
"codeowners": [],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/feedreader",
"iot_class": "cloud_polling",
"loggers": ["feedparser", "sgmllib3k"],


@@ -0,0 +1,47 @@
{
"config": {
"step": {
"user": {
"data": {
"url": "[%key:common::config_flow::data::url%]"
}
},
"reconfigure_confirm": {
"description": "Update your configuration information for {name}.",
"data": {
"url": "[%key:common::config_flow::data::url%]"
}
}
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_service%]",
"reconfigure_successful": "[%key:common::config_flow::abort::reconfigure_successful%]"
},
"error": {
"url_error": "The URL could not be opened.",
"no_feed_entries": "The URL seems not to serve any feed entries."
}
},
"options": {
"step": {
"init": {
"data": {
"max_entries": "Maximum feed entries"
},
"data_description": {
"max_entries": "The maximum number of entries to extract from each feed."
}
}
}
},
"issues": {
"import_yaml_error_url_error": {
"title": "The Feedreader YAML configuration import failed",
"description": "Configuring the Feedreader using YAML is being removed but there was a connection error when trying to import the YAML configuration for `{url}`.\n\nPlease verify that url is reachable and accessable for Home Assistant and restart Home Assistant to try again or remove the Feedreader YAML configuration from your configuration.yaml file and continue to set up the integration manually."
},
"import_yaml_error_no_feed_entries": {
"title": "[%key:component::feedreader::issues::import_yaml_error_url_error::title%]",
"description": "Configuring the Feedreader using YAML is being removed but when trying to import the YAML configuration for `{url}` no feed entries were found.\n\nPlease verify that url serves any feed entries and restart Home Assistant to try again or remove the Feedreader YAML configuration from your configuration.yaml file and continue to set up the integration manually."
}
}
}


@@ -166,6 +166,7 @@ FLOWS = {
"ezviz",
"faa_delays",
"fastdotcom",
"feedreader",
"fibaro",
"file",
"filesize",


@@ -1792,7 +1792,7 @@
"feedreader": {
"name": "Feedreader",
"integration_type": "hub",
"config_flow": false,
"config_flow": true,
"iot_class": "cloud_polling"
},
"ffmpeg": {


@@ -1 +1,48 @@
"""Tests for the feedreader component."""
from typing import Any
from unittest.mock import patch
from homeassistant.components.feedreader.const import CONF_MAX_ENTRIES, DOMAIN
from homeassistant.const import CONF_URL
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry, load_fixture
def load_fixture_bytes(src: str) -> bytes:
"""Return byte stream of fixture."""
feed_data = load_fixture(src, DOMAIN)
return bytes(feed_data, "utf-8")
def create_mock_entry(
data: dict[str, Any],
) -> MockConfigEntry:
"""Create config entry mock from data."""
return MockConfigEntry(
domain=DOMAIN,
data={CONF_URL: data[CONF_URL]},
options={CONF_MAX_ENTRIES: data[CONF_MAX_ENTRIES]},
)
async def async_setup_config_entry(
hass: HomeAssistant,
data: dict[str, Any],
return_value: bytes | None = None,
side_effect: bytes | None = None,
) -> bool:
"""Do setup of a MockConfigEntry."""
entry = create_mock_entry(data)
entry.add_to_hass(hass)
with patch(
"homeassistant.components.feedreader.coordinator.feedparser.http.get",
) as feedparser:
if return_value:
feedparser.return_value = return_value
if side_effect:
feedparser.side_effect = side_effect
result = await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
return result
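
A hypothetical usage of the helper above (sketch only), matching how the tests below call it; the `events` and `feed_one_event` fixtures and VALID_CONFIG_DEFAULT come from the test fixtures and constants added in this commit.

# Sketch only -- not part of the commit.
async def test_example_feed(hass, events, feed_one_event) -> None:
    """Set up one feed from a config entry and expect a single event."""
    assert await async_setup_config_entry(
        hass, VALID_CONFIG_DEFAULT, return_value=feed_one_event
    )
    assert len(events) == 1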


@@ -0,0 +1,58 @@
"""Fixtures for the tests for the feedreader component."""
import pytest
from homeassistant.components.feedreader.coordinator import EVENT_FEEDREADER
from homeassistant.core import Event, HomeAssistant
from . import load_fixture_bytes
from tests.common import async_capture_events
@pytest.fixture(name="feed_one_event")
def fixture_feed_one_event(hass: HomeAssistant) -> bytes:
"""Load test feed data for one event."""
return load_fixture_bytes("feedreader.xml")
@pytest.fixture(name="feed_two_event")
def fixture_feed_two_events(hass: HomeAssistant) -> bytes:
"""Load test feed data for two event."""
return load_fixture_bytes("feedreader1.xml")
@pytest.fixture(name="feed_21_events")
def fixture_feed_21_events(hass: HomeAssistant) -> bytes:
"""Load test feed data for twenty one events."""
return load_fixture_bytes("feedreader2.xml")
@pytest.fixture(name="feed_three_events")
def fixture_feed_three_events(hass: HomeAssistant) -> bytes:
"""Load test feed data for three events."""
return load_fixture_bytes("feedreader3.xml")
@pytest.fixture(name="feed_four_events")
def fixture_feed_four_events(hass: HomeAssistant) -> bytes:
"""Load test feed data for three events."""
return load_fixture_bytes("feedreader4.xml")
@pytest.fixture(name="feed_atom_event")
def fixture_feed_atom_event(hass: HomeAssistant) -> bytes:
"""Load test feed data for atom event."""
return load_fixture_bytes("feedreader5.xml")
@pytest.fixture(name="feed_identically_timed_events")
def fixture_feed_identically_timed_events(hass: HomeAssistant) -> bytes:
"""Load test feed data for two events published at the exact same time."""
return load_fixture_bytes("feedreader6.xml")
@pytest.fixture(name="events")
async def fixture_events(hass: HomeAssistant) -> list[Event]:
"""Fixture that catches alexa events."""
return async_capture_events(hass, EVENT_FEEDREADER)


@@ -0,0 +1,14 @@
"""Constants for the tests for the feedreader component."""
from homeassistant.components.feedreader.const import (
CONF_MAX_ENTRIES,
DEFAULT_MAX_ENTRIES,
)
from homeassistant.const import CONF_URL
URL = "http://some.rss.local/rss_feed.xml"
FEED_TITLE = "RSS Sample"
VALID_CONFIG_DEFAULT = {CONF_URL: URL, CONF_MAX_ENTRIES: DEFAULT_MAX_ENTRIES}
VALID_CONFIG_100 = {CONF_URL: URL, CONF_MAX_ENTRIES: 100}
VALID_CONFIG_5 = {CONF_URL: URL, CONF_MAX_ENTRIES: 5}
VALID_CONFIG_1 = {CONF_URL: URL, CONF_MAX_ENTRIES: 1}


@@ -0,0 +1,298 @@
"""The tests for the feedreader config flow."""
from unittest.mock import Mock, patch
import urllib
import pytest
from homeassistant.components.feedreader import CONF_URLS
from homeassistant.components.feedreader.const import (
CONF_MAX_ENTRIES,
DEFAULT_MAX_ENTRIES,
DOMAIN,
)
from homeassistant.config_entries import SOURCE_RECONFIGURE, SOURCE_USER
from homeassistant.const import CONF_URL
from homeassistant.core import DOMAIN as HA_DOMAIN, HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers import issue_registry as ir
from homeassistant.setup import async_setup_component
from . import create_mock_entry
from .const import FEED_TITLE, URL, VALID_CONFIG_DEFAULT
@pytest.fixture(name="feedparser")
def feedparser_fixture(feed_one_event: bytes) -> Mock:
"""Patch libraries."""
with (
patch(
"homeassistant.components.feedreader.config_flow.feedparser.http.get",
return_value=feed_one_event,
) as feedparser,
):
yield feedparser
@pytest.fixture(name="setup_entry")
def setup_entry_fixture(feed_one_event: bytes) -> Mock:
"""Patch libraries."""
with (
patch("homeassistant.components.feedreader.async_setup_entry") as setup_entry,
):
yield setup_entry
async def test_user(hass: HomeAssistant, feedparser, setup_entry) -> None:
"""Test starting a flow by user."""
# init user flow
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
# success
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_URL: URL}
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == FEED_TITLE
assert result["data"][CONF_URL] == URL
assert result["options"][CONF_MAX_ENTRIES] == DEFAULT_MAX_ENTRIES
async def test_user_errors(
hass: HomeAssistant, feedparser, setup_entry, feed_one_event
) -> None:
"""Test starting a flow by user which results in an URL error."""
# init user flow
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
# raise URLError
feedparser.side_effect = urllib.error.URLError("Test")
feedparser.return_value = None
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_URL: URL}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"] == {"base": "url_error"}
# no feed entries returned
feedparser.side_effect = None
feedparser.return_value = None
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_URL: URL}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"] == {"base": "no_feed_entries"}
# success
feedparser.side_effect = None
feedparser.return_value = feed_one_event
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_URL: URL}
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == FEED_TITLE
assert result["data"][CONF_URL] == URL
assert result["options"][CONF_MAX_ENTRIES] == DEFAULT_MAX_ENTRIES
@pytest.mark.parametrize(
("data", "expected_data", "expected_options"),
[
({CONF_URLS: [URL]}, {CONF_URL: URL}, {CONF_MAX_ENTRIES: DEFAULT_MAX_ENTRIES}),
(
{CONF_URLS: [URL], CONF_MAX_ENTRIES: 5},
{CONF_URL: URL},
{CONF_MAX_ENTRIES: 5},
),
],
)
async def test_import(
hass: HomeAssistant,
issue_registry: ir.IssueRegistry,
data,
expected_data,
expected_options,
feedparser,
setup_entry,
) -> None:
"""Test starting an import flow."""
config_entries = hass.config_entries.async_entries(DOMAIN)
assert not config_entries
assert await async_setup_component(hass, DOMAIN, {DOMAIN: data})
config_entries = hass.config_entries.async_entries(DOMAIN)
assert config_entries
assert len(config_entries) == 1
assert config_entries[0].title == FEED_TITLE
assert config_entries[0].data == expected_data
assert config_entries[0].options == expected_options
assert issue_registry.async_get_issue(HA_DOMAIN, "deprecated_yaml_feedreader")
@pytest.mark.parametrize(
("side_effect", "return_value", "expected_issue_id"),
[
(
urllib.error.URLError("Test"),
None,
"import_yaml_error_feedreader_url_error_http_some_rss_local_rss_feed_xml",
),
(
None,
None,
"import_yaml_error_feedreader_no_feed_entries_http_some_rss_local_rss_feed_xml",
),
],
)
async def test_import_errors(
hass: HomeAssistant,
issue_registry: ir.IssueRegistry,
feedparser,
setup_entry,
feed_one_event,
side_effect,
return_value,
expected_issue_id,
) -> None:
"""Test starting an import flow which results in an URL error."""
config_entries = hass.config_entries.async_entries(DOMAIN)
assert not config_entries
# raise URLError
feedparser.side_effect = side_effect
feedparser.return_value = return_value
assert await async_setup_component(hass, DOMAIN, {DOMAIN: {CONF_URLS: [URL]}})
assert issue_registry.async_get_issue(DOMAIN, expected_issue_id)
async def test_reconfigure(hass: HomeAssistant, feedparser) -> None:
"""Test starting a reconfigure flow."""
entry = create_mock_entry(VALID_CONFIG_DEFAULT)
entry.add_to_hass(hass)
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
# init user flow
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={
"source": SOURCE_RECONFIGURE,
"entry_id": entry.entry_id,
},
data=entry.data,
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "reconfigure_confirm"
# success
with patch(
"homeassistant.config_entries.ConfigEntries.async_reload"
) as mock_async_reload:
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={
CONF_URL: "http://other.rss.local/rss_feed.xml",
},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "reconfigure_successful"
assert entry.data == {
CONF_URL: "http://other.rss.local/rss_feed.xml",
}
await hass.async_block_till_done()
assert mock_async_reload.call_count == 1
async def test_reconfigure_errors(
hass: HomeAssistant, feedparser, setup_entry, feed_one_event
) -> None:
"""Test starting a reconfigure flow by user which results in an URL error."""
entry = create_mock_entry(VALID_CONFIG_DEFAULT)
entry.add_to_hass(hass)
# init user flow
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={
"source": SOURCE_RECONFIGURE,
"entry_id": entry.entry_id,
},
data=entry.data,
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "reconfigure_confirm"
# raise URLError
feedparser.side_effect = urllib.error.URLError("Test")
feedparser.return_value = None
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={
CONF_URL: "http://other.rss.local/rss_feed.xml",
},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "reconfigure_confirm"
assert result["errors"] == {"base": "url_error"}
# no feed entries returned
feedparser.side_effect = None
feedparser.return_value = None
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={
CONF_URL: "http://other.rss.local/rss_feed.xml",
},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "reconfigure_confirm"
assert result["errors"] == {"base": "no_feed_entries"}
# success
feedparser.side_effect = None
feedparser.return_value = feed_one_event
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={
CONF_URL: "http://other.rss.local/rss_feed.xml",
},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "reconfigure_successful"
assert entry.data == {
CONF_URL: "http://other.rss.local/rss_feed.xml",
}
async def test_options_flow(hass: HomeAssistant) -> None:
"""Test options flow."""
entry = create_mock_entry(VALID_CONFIG_DEFAULT)
entry.add_to_hass(hass)
result = await hass.config_entries.options.async_init(entry.entry_id)
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
CONF_MAX_ENTRIES: 10,
},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["data"] == {
CONF_MAX_ENTRIES: 10,
}


@@ -4,90 +4,38 @@ from datetime import datetime, timedelta
from time import gmtime
from typing import Any
from unittest.mock import patch
import urllib
import urllib.error
from freezegun.api import FrozenDateTimeFactory
import pytest
from homeassistant.components.feedreader import CONF_MAX_ENTRIES, CONF_URLS
from homeassistant.components.feedreader.const import DOMAIN
from homeassistant.components.feedreader.coordinator import EVENT_FEEDREADER
from homeassistant.const import CONF_SCAN_INTERVAL, EVENT_HOMEASSISTANT_START
from homeassistant.core import Event, HomeAssistant
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from tests.common import async_capture_events, async_fire_time_changed, load_fixture
from . import async_setup_config_entry, create_mock_entry
from .const import (
URL,
VALID_CONFIG_1,
VALID_CONFIG_5,
VALID_CONFIG_100,
VALID_CONFIG_DEFAULT,
)
URL = "http://some.rss.local/rss_feed.xml"
VALID_CONFIG_1 = {DOMAIN: {CONF_URLS: [URL]}}
VALID_CONFIG_2 = {DOMAIN: {CONF_URLS: [URL], CONF_SCAN_INTERVAL: 60}}
VALID_CONFIG_3 = {DOMAIN: {CONF_URLS: [URL], CONF_MAX_ENTRIES: 100}}
VALID_CONFIG_4 = {DOMAIN: {CONF_URLS: [URL], CONF_MAX_ENTRIES: 5}}
VALID_CONFIG_5 = {DOMAIN: {CONF_URLS: [URL], CONF_MAX_ENTRIES: 1}}
from tests.common import async_fire_time_changed
def load_fixture_bytes(src: str) -> bytes:
"""Return byte stream of fixture."""
feed_data = load_fixture(src, DOMAIN)
return bytes(feed_data, "utf-8")
@pytest.fixture(name="feed_one_event")
def fixture_feed_one_event(hass: HomeAssistant) -> bytes:
"""Load test feed data for one event."""
return load_fixture_bytes("feedreader.xml")
@pytest.fixture(name="feed_two_event")
def fixture_feed_two_events(hass: HomeAssistant) -> bytes:
"""Load test feed data for two event."""
return load_fixture_bytes("feedreader1.xml")
@pytest.fixture(name="feed_21_events")
def fixture_feed_21_events(hass: HomeAssistant) -> bytes:
"""Load test feed data for twenty one events."""
return load_fixture_bytes("feedreader2.xml")
@pytest.fixture(name="feed_three_events")
def fixture_feed_three_events(hass: HomeAssistant) -> bytes:
"""Load test feed data for three events."""
return load_fixture_bytes("feedreader3.xml")
@pytest.fixture(name="feed_atom_event")
def fixture_feed_atom_event(hass: HomeAssistant) -> bytes:
"""Load test feed data for atom event."""
return load_fixture_bytes("feedreader5.xml")
@pytest.fixture(name="feed_identically_timed_events")
def fixture_feed_identically_timed_events(hass: HomeAssistant) -> bytes:
"""Load test feed data for two events published at the exact same time."""
return load_fixture_bytes("feedreader6.xml")
@pytest.fixture(name="events")
async def fixture_events(hass: HomeAssistant) -> list[Event]:
"""Fixture that catches alexa events."""
return async_capture_events(hass, EVENT_FEEDREADER)
async def test_setup_one_feed(hass: HomeAssistant) -> None:
"""Test the general setup of this component."""
assert await async_setup_component(hass, DOMAIN, VALID_CONFIG_1)
async def test_setup_no_feeds(hass: HomeAssistant) -> None:
"""Test config with no urls."""
assert not await async_setup_component(hass, DOMAIN, {DOMAIN: {CONF_URLS: []}})
async def test_storage_data_loading(
@pytest.mark.parametrize(
"config",
[VALID_CONFIG_DEFAULT, VALID_CONFIG_1, VALID_CONFIG_100, VALID_CONFIG_5],
)
async def test_setup(
hass: HomeAssistant,
events: list[Event],
feed_one_event: bytes,
hass_storage: dict[str, Any],
config: dict[str, Any],
) -> None:
"""Test loading existing storage data."""
storage_data: dict[str, str] = {URL: "2018-04-30T05:10:00+00:00"}
@@ -97,15 +45,7 @@ async def test_storage_data_loading(
"key": DOMAIN,
"data": storage_data,
}
with patch(
"feedparser.http.get",
return_value=feed_one_event,
):
assert await async_setup_component(hass, DOMAIN, VALID_CONFIG_2)
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
await hass.async_block_till_done()
assert await async_setup_config_entry(hass, config, return_value=feed_one_event)
# no new events
assert not events
@@ -121,16 +61,11 @@ async def test_storage_data_writing(
storage_data: dict[str, str] = {URL: "2018-04-30T05:10:00+00:00"}
with (
patch(
"feedparser.http.get",
return_value=feed_one_event,
),
patch("homeassistant.components.feedreader.coordinator.DELAY_SAVE", new=0),
):
assert await async_setup_component(hass, DOMAIN, VALID_CONFIG_2)
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
await hass.async_block_till_done()
assert await async_setup_config_entry(
hass, VALID_CONFIG_DEFAULT, return_value=feed_one_event
)
# one new event
assert len(events) == 1
@@ -139,22 +74,11 @@ async def test_storage_data_writing(
assert hass_storage[DOMAIN]["data"] == storage_data
async def test_setup_max_entries(hass: HomeAssistant) -> None:
"""Test the setup of this component with max entries."""
assert await async_setup_component(hass, DOMAIN, VALID_CONFIG_3)
await hass.async_block_till_done()
async def test_feed(hass: HomeAssistant, events, feed_one_event) -> None:
"""Test simple rss feed with valid data."""
with patch(
"feedparser.http.get",
return_value=feed_one_event,
):
assert await async_setup_component(hass, DOMAIN, VALID_CONFIG_2)
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
await hass.async_block_till_done()
assert await async_setup_config_entry(
hass, VALID_CONFIG_DEFAULT, return_value=feed_one_event
)
assert len(events) == 1
assert events[0].data.title == "Title 1"
@@ -170,14 +94,9 @@ async def test_feed(hass: HomeAssistant, events, feed_one_event) -> None:
async def test_atom_feed(hass: HomeAssistant, events, feed_atom_event) -> None:
"""Test simple atom feed with valid data."""
with patch(
"feedparser.http.get",
return_value=feed_atom_event,
):
assert await async_setup_component(hass, DOMAIN, VALID_CONFIG_5)
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
await hass.async_block_till_done()
assert await async_setup_config_entry(
hass, VALID_CONFIG_DEFAULT, return_value=feed_atom_event
)
assert len(events) == 1
assert events[0].data.title == "Atom-Powered Robots Run Amok"
@@ -196,10 +115,6 @@ async def test_feed_identical_timestamps(
) -> None:
"""Test feed with 2 entries with identical timestamps."""
with (
patch(
"feedparser.http.get",
return_value=feed_identically_timed_events,
),
patch(
"homeassistant.components.feedreader.coordinator.StoredData.get_timestamp",
return_value=gmtime(
@@ -207,10 +122,9 @@ async def test_feed_identical_timestamps(
),
),
):
assert await async_setup_component(hass, DOMAIN, VALID_CONFIG_2)
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
await hass.async_block_till_done()
assert await async_setup_config_entry(
hass, VALID_CONFIG_DEFAULT, return_value=feed_identically_timed_events
)
assert len(events) == 2
assert events[0].data.title == "Title 1"
@@ -261,11 +175,13 @@ async def test_feed_updates(
feed_two_event,
]
entry = create_mock_entry(VALID_CONFIG_DEFAULT)
entry.add_to_hass(hass)
with patch(
"homeassistant.components.feedreader.coordinator.feedparser.http.get",
side_effect=side_effect,
):
assert await async_setup_component(hass, DOMAIN, VALID_CONFIG_2)
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
assert len(events) == 1
@@ -289,22 +205,20 @@ async def test_feed_default_max_length(
hass: HomeAssistant, events, feed_21_events
) -> None:
"""Test long feed beyond the default 20 entry limit."""
with patch("feedparser.http.get", return_value=feed_21_events):
assert await async_setup_component(hass, DOMAIN, VALID_CONFIG_2)
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
await hass.async_block_till_done()
assert await async_setup_config_entry(
hass, VALID_CONFIG_DEFAULT, return_value=feed_21_events
)
await hass.async_block_till_done()
assert len(events) == 20
async def test_feed_max_length(hass: HomeAssistant, events, feed_21_events) -> None:
"""Test long feed beyond a configured 5 entry limit."""
with patch("feedparser.http.get", return_value=feed_21_events):
assert await async_setup_component(hass, DOMAIN, VALID_CONFIG_4)
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
await hass.async_block_till_done()
assert await async_setup_config_entry(
hass, VALID_CONFIG_5, return_value=feed_21_events
)
await hass.async_block_till_done()
assert len(events) == 5
@@ -313,53 +227,104 @@ async def test_feed_without_publication_date_and_title(
hass: HomeAssistant, events, feed_three_events
) -> None:
"""Test simple feed with entry without publication date and title."""
with patch("feedparser.http.get", return_value=feed_three_events):
assert await async_setup_component(hass, DOMAIN, VALID_CONFIG_2)
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
await hass.async_block_till_done()
assert await async_setup_config_entry(
hass, VALID_CONFIG_DEFAULT, return_value=feed_three_events
)
await hass.async_block_till_done()
assert len(events) == 3
async def test_feed_with_unrecognized_publication_date(
hass: HomeAssistant, events
hass: HomeAssistant, events, feed_four_events
) -> None:
"""Test simple feed with entry with unrecognized publication date."""
with patch(
"feedparser.http.get", return_value=load_fixture_bytes("feedreader4.xml")
):
assert await async_setup_component(hass, DOMAIN, VALID_CONFIG_2)
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
await hass.async_block_till_done()
assert await async_setup_config_entry(
hass, VALID_CONFIG_DEFAULT, return_value=feed_four_events
)
await hass.async_block_till_done()
assert len(events) == 1
async def test_feed_invalid_data(hass: HomeAssistant, events) -> None:
"""Test feed with invalid data."""
invalid_data = bytes("INVALID DATA", "utf-8")
with patch("feedparser.http.get", return_value=invalid_data):
assert await async_setup_component(hass, DOMAIN, VALID_CONFIG_2)
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
await hass.async_block_till_done()
assert await async_setup_config_entry(
hass, VALID_CONFIG_DEFAULT, return_value=bytes("INVALID DATA", "utf-8")
)
await hass.async_block_till_done()
assert len(events) == 0
async def test_feed_parsing_failed(
hass: HomeAssistant, events, caplog: pytest.LogCaptureFixture
hass: HomeAssistant, events, feed_one_event, caplog: pytest.LogCaptureFixture
) -> None:
"""Test feed where parsing fails."""
assert "Error fetching feed data" not in caplog.text
with patch("feedparser.parse", return_value=None):
assert await async_setup_component(hass, DOMAIN, VALID_CONFIG_2)
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
assert not await async_setup_config_entry(
hass, VALID_CONFIG_DEFAULT, return_value=feed_one_event
)
await hass.async_block_till_done()
assert "Error fetching feed data" in caplog.text
assert not events
async def test_feed_errors(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
caplog: pytest.LogCaptureFixture,
feed_one_event,
) -> None:
"""Test feed errors."""
entry = create_mock_entry(VALID_CONFIG_DEFAULT)
entry.add_to_hass(hass)
with patch(
"homeassistant.components.feedreader.coordinator.feedparser.http.get"
) as feedreader:
# success setup
feedreader.return_value = feed_one_event
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
# raise URL error
feedreader.side_effect = urllib.error.URLError("Test")
freezer.tick(timedelta(hours=1, seconds=1))
async_fire_time_changed(hass)
await hass.async_block_till_done(wait_background_tasks=True)
assert (
"Error fetching feed data from http://some.rss.local/rss_feed.xml: <urlopen error Test>"
in caplog.text
)
# success
feedreader.side_effect = None
feedreader.return_value = feed_one_event
freezer.tick(timedelta(hours=1, seconds=1))
async_fire_time_changed(hass)
await hass.async_block_till_done(wait_background_tasks=True)
caplog.clear()
# no feed returned
freezer.tick(timedelta(hours=1, seconds=1))
with patch(
"homeassistant.components.feedreader.coordinator.feedparser.parse",
return_value=None,
):
async_fire_time_changed(hass)
await hass.async_block_till_done(wait_background_tasks=True)
assert (
"Error fetching feed data from http://some.rss.local/rss_feed.xml"
in caplog.text
)
caplog.clear()
# success
feedreader.side_effect = None
feedreader.return_value = feed_one_event
freezer.tick(timedelta(hours=1, seconds=1))
async_fire_time_changed(hass)
await hass.async_block_till_done(wait_background_tasks=True)