Correct zamg config flow comments (#81369)

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
fixes undefined
This commit is contained in:
Daniel Gangl 2022-12-20 11:47:32 +01:00 committed by GitHub
parent fd9124279b
commit 8ca92254b6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 451 additions and 112 deletions

View file

@ -1585,8 +1585,6 @@ omit =
homeassistant/components/youless/const.py homeassistant/components/youless/const.py
homeassistant/components/youless/sensor.py homeassistant/components/youless/sensor.py
homeassistant/components/zabbix/* homeassistant/components/zabbix/*
homeassistant/components/zamg/__init__.py
homeassistant/components/zamg/const.py
homeassistant/components/zamg/coordinator.py homeassistant/components/zamg/coordinator.py
homeassistant/components/zamg/sensor.py homeassistant/components/zamg/sensor.py
homeassistant/components/zamg/weather.py homeassistant/components/zamg/weather.py

View file

@ -3,16 +3,19 @@ from __future__ import annotations
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform from homeassistant.const import Platform
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import entity_registry as er
from .const import CONF_STATION_ID, DOMAIN from .const import CONF_STATION_ID, DOMAIN, LOGGER
from .coordinator import ZamgDataUpdateCoordinator from .coordinator import ZamgDataUpdateCoordinator
PLATFORMS = (Platform.WEATHER, Platform.SENSOR) PLATFORMS = (Platform.SENSOR, Platform.WEATHER)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Zamg from config entry.""" """Set up Zamg from config entry."""
await _async_migrate_entries(hass, entry)
coordinator = ZamgDataUpdateCoordinator(hass, entry=entry) coordinator = ZamgDataUpdateCoordinator(hass, entry=entry)
station_id = entry.data[CONF_STATION_ID] station_id = entry.data[CONF_STATION_ID]
coordinator.zamg.set_default_station(station_id) coordinator.zamg.set_default_station(station_id)
@ -31,3 +34,45 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS): if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
hass.data[DOMAIN].pop(entry.entry_id) hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok return unload_ok
async def _async_migrate_entries(
hass: HomeAssistant, config_entry: ConfigEntry
) -> bool:
"""Migrate old entry."""
entity_registry = er.async_get(hass)
@callback
def update_unique_id(entry: er.RegistryEntry) -> dict[str, str] | None:
"""Convert the unique_id from 'name_stationid' to 'station_id'.
Example: 'WIEN/HOHE WARTE_11035' --> '11035'.
"""
if (
entry.domain == Platform.WEATHER
and entry.unique_id != config_entry.data[CONF_STATION_ID]
):
new_unique_id = config_entry.data[CONF_STATION_ID]
LOGGER.debug(
"Migrating entity '%s' unique_id from '%s' to '%s'",
entry.entity_id,
entry.unique_id,
new_unique_id,
)
if existing_entity_id := entity_registry.async_get_entity_id(
entry.domain, entry.platform, new_unique_id
):
LOGGER.debug(
"Cannot migrate to unique_id '%s', already exists for '%s'",
new_unique_id,
existing_entity_id,
)
return None
return {
"new_unique_id": new_unique_id,
}
return None
await er.async_migrate_entries(hass, config_entry.entry_id, update_unique_id)
return True

View file

@ -5,9 +5,10 @@ from typing import Any
import voluptuous as vol import voluptuous as vol
from zamg import ZamgData from zamg import ZamgData
from zamg.exceptions import ZamgApiError, ZamgNoDataError, ZamgStationNotFoundError
from homeassistant import config_entries from homeassistant import config_entries
from homeassistant.const import CONF_LATITUDE, CONF_LONGITUDE, CONF_NAME from homeassistant.const import CONF_LATITUDE, CONF_LONGITUDE
from homeassistant.data_entry_flow import FlowResult from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
@ -26,28 +27,28 @@ class ZamgConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
self, user_input: dict[str, Any] | None = None self, user_input: dict[str, Any] | None = None
) -> FlowResult: ) -> FlowResult:
"""Handle a flow initiated by the user.""" """Handle a flow initiated by the user."""
errors: dict[str, Any] = {}
if self._client is None: if self._client is None:
self._client = ZamgData() self._client = ZamgData()
self._client.session = async_get_clientsession(self.hass) self._client.session = async_get_clientsession(self.hass)
if user_input is None: if user_input is None:
closest_station_id = await self._client.closest_station( try:
self.hass.config.latitude, stations = await self._client.zamg_stations()
self.hass.config.longitude, closest_station_id = await self._client.closest_station(
) self.hass.config.latitude,
LOGGER.debug("config_flow: closest station = %s", str(closest_station_id)) self.hass.config.longitude,
stations = await self._client.zamg_stations() )
except (ZamgApiError, ZamgNoDataError) as err:
LOGGER.error("Config_flow: Received error from ZAMG: %s", err)
return self.async_abort(reason="cannot_connect")
LOGGER.debug("config_flow: closest station = %s", closest_station_id)
user_input = {} user_input = {}
schema = vol.Schema( schema = vol.Schema(
{ {
vol.Required( vol.Required(CONF_STATION_ID, default=closest_station_id): vol.In(
CONF_STATION_ID, default=int(closest_station_id)
): vol.In(
{ {
int(station): f"{stations[station][2]} ({station})" station: f"{stations[station][2]} ({station})"
for station in stations for station in stations
} }
) )
@ -55,7 +56,7 @@ class ZamgConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
) )
return self.async_show_form(step_id="user", data_schema=schema) return self.async_show_form(step_id="user", data_schema=schema)
station_id = str(user_input[CONF_STATION_ID]) station_id = user_input[CONF_STATION_ID]
# Check if already configured # Check if already configured
await self.async_set_unique_id(station_id) await self.async_set_unique_id(station_id)
@ -64,22 +65,18 @@ class ZamgConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
try: try:
self._client.set_default_station(station_id) self._client.set_default_station(station_id)
await self._client.update() await self._client.update()
except (ValueError, TypeError) as err: except (ZamgApiError, ZamgNoDataError) as err:
LOGGER.error("Config_flow: Received error from ZAMG: %s", err) LOGGER.error("Config_flow: Received error from ZAMG: %s", err)
errors["base"] = "cannot_connect" return self.async_abort(reason="cannot_connect")
return self.async_abort(
reason="cannot_connect", description_placeholders=errors
)
return self.async_create_entry( return self.async_create_entry(
title=user_input.get(CONF_NAME) or self._client.get_station_name, title=self._client.get_station_name,
data={CONF_STATION_ID: station_id}, data={CONF_STATION_ID: station_id},
) )
async def async_step_import(self, config: dict[str, Any]) -> FlowResult: async def async_step_import(self, config: dict[str, Any]) -> FlowResult:
"""Handle ZAMG configuration import.""" """Handle ZAMG configuration import."""
station_id = str(config.get(CONF_STATION_ID)) station_id = config.get(CONF_STATION_ID)
station_name = config.get(CONF_NAME)
# create issue every time after restart # create issue every time after restart
# parameter is_persistent seems not working # parameter is_persistent seems not working
async_create_issue( async_create_issue(
@ -92,45 +89,37 @@ class ZamgConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
translation_key="deprecated_yaml", translation_key="deprecated_yaml",
) )
for entry in self.hass.config_entries.async_entries(DOMAIN):
if station_id in entry.data[CONF_STATION_ID]:
return self.async_abort(
reason="already_configured",
)
if self._client is None: if self._client is None:
self._client = ZamgData() self._client = ZamgData()
self._client.session = async_get_clientsession(self.hass) self._client.session = async_get_clientsession(self.hass)
if station_id not in await self._client.zamg_stations(): try:
LOGGER.warning( if station_id not in await self._client.zamg_stations():
"Configured station_id %s could not be found at zamg, adding the nearest weather station instead", LOGGER.warning(
"Configured station_id %s could not be found at zamg, trying to add nearest weather station instead",
station_id,
)
latitude = config.get(CONF_LATITUDE) or self.hass.config.latitude
longitude = config.get(CONF_LONGITUDE) or self.hass.config.longitude
station_id = await self._client.closest_station(latitude, longitude)
# Check if already configured
await self.async_set_unique_id(station_id)
self._abort_if_unique_id_configured()
LOGGER.debug(
"importing zamg station from configuration.yaml: station_id = %s",
station_id, station_id,
) )
latitude = config.get(CONF_LATITUDE) or self.hass.config.latitude except (ZamgApiError) as err:
longitude = config.get(CONF_LONGITUDE) or self.hass.config.longitude LOGGER.error("Config_flow import: Received error from ZAMG: %s", err)
station_id = await self._client.closest_station(latitude, longitude) return self.async_abort(reason="cannot_connect")
except (ZamgStationNotFoundError) as err:
if not station_name: LOGGER.error("Config_flow import: Received error from ZAMG: %s", err)
await self._client.zamg_stations() return self.async_abort(reason="station_not_found")
self._client.set_default_station(station_id)
station_name = self._client.get_station_name
for entry in self.hass.config_entries.async_entries(DOMAIN):
if station_id in entry.data[CONF_STATION_ID]:
return self.async_abort(
reason="already_configured",
)
LOGGER.debug(
"importing zamg station from configuration.yaml: station_id = %s, name = %s",
station_id,
station_name,
)
return await self.async_step_user( return await self.async_step_user(
user_input={ user_input={
CONF_STATION_ID: int(station_id), CONF_STATION_ID: station_id,
CONF_NAME: station_name,
} }
) )

View file

@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
from zamg import ZamgData as ZamgDevice from zamg import ZamgData as ZamgDevice
from zamg.exceptions import ZamgError, ZamgNoDataError
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
@ -16,6 +17,7 @@ class ZamgDataUpdateCoordinator(DataUpdateCoordinator[ZamgDevice]):
config_entry: ConfigEntry config_entry: ConfigEntry
data: dict = {} data: dict = {}
api_fields: list[str] | None = None
def __init__( def __init__(
self, self,
@ -36,9 +38,13 @@ class ZamgDataUpdateCoordinator(DataUpdateCoordinator[ZamgDevice]):
async def _async_update_data(self) -> ZamgDevice: async def _async_update_data(self) -> ZamgDevice:
"""Fetch data from ZAMG api.""" """Fetch data from ZAMG api."""
try: try:
await self.zamg.zamg_stations() if self.api_fields:
self.zamg.set_parameters(self.api_fields)
self.zamg.request_timeout = 60.0
device = await self.zamg.update() device = await self.zamg.update()
except ValueError as error: except ZamgNoDataError as error:
raise UpdateFailed("No response from API") from error
except (ZamgError) as error:
raise UpdateFailed(f"Invalid response from API: {error}") from error raise UpdateFailed(f"Invalid response from API: {error}") from error
self.data = device self.data = device
self.data["last_update"] = self.zamg.last_update self.data["last_update"] = self.zamg.last_update

View file

@ -2,7 +2,7 @@
"domain": "zamg", "domain": "zamg",
"name": "Zentralanstalt f\u00fcr Meteorologie und Geodynamik (ZAMG)", "name": "Zentralanstalt f\u00fcr Meteorologie und Geodynamik (ZAMG)",
"documentation": "https://www.home-assistant.io/integrations/zamg", "documentation": "https://www.home-assistant.io/integrations/zamg",
"requirements": ["zamg==0.1.1"], "requirements": ["zamg==0.2.1"],
"codeowners": ["@killer0071234"], "codeowners": ["@killer0071234"],
"config_flow": true, "config_flow": true,
"iot_class": "cloud_polling" "iot_class": "cloud_polling"

View file

@ -15,7 +15,6 @@ from homeassistant.components.sensor import (
) )
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import ( from homeassistant.const import (
ATTR_ATTRIBUTION,
CONF_LATITUDE, CONF_LATITUDE,
CONF_LONGITUDE, CONF_LONGITUDE,
CONF_MONITORED_CONDITIONS, CONF_MONITORED_CONDITIONS,
@ -45,6 +44,7 @@ from .const import (
DOMAIN, DOMAIN,
MANUFACTURER_URL, MANUFACTURER_URL,
) )
from .coordinator import ZamgDataUpdateCoordinator
_DType = Union[type[int], type[float], type[str]] _DType = Union[type[int], type[float], type[str]]
@ -190,9 +190,7 @@ SENSOR_TYPES: tuple[ZamgSensorEntityDescription, ...] = (
SENSOR_KEYS: list[str] = [desc.key for desc in SENSOR_TYPES] SENSOR_KEYS: list[str] = [desc.key for desc in SENSOR_TYPES]
API_FIELDS: dict[str, tuple[str, _DType]] = { API_FIELDS: list[str] = [desc.para_name for desc in SENSOR_TYPES]
desc.para_name: (desc.key, desc.dtype) for desc in SENSOR_TYPES
}
PLATFORM_SCHEMA = cv.PLATFORM_SCHEMA.extend( PLATFORM_SCHEMA = cv.PLATFORM_SCHEMA.extend(
{ {
@ -219,10 +217,12 @@ async def async_setup_platform(
) -> None: ) -> None:
"""Set up the ZAMG sensor platform.""" """Set up the ZAMG sensor platform."""
# trigger import flow # trigger import flow
await hass.config_entries.flow.async_init( hass.async_create_task(
DOMAIN, hass.config_entries.flow.async_init(
context={"source": SOURCE_IMPORT}, DOMAIN,
data=config, context={"source": SOURCE_IMPORT},
data=config,
)
) )
@ -245,8 +245,12 @@ class ZamgSensor(CoordinatorEntity, SensorEntity):
entity_description: ZamgSensorEntityDescription entity_description: ZamgSensorEntityDescription
def __init__( def __init__(
self, coordinator, name, station_id, description: ZamgSensorEntityDescription self,
): coordinator: ZamgDataUpdateCoordinator,
name: str,
station_id: str,
description: ZamgSensorEntityDescription,
) -> None:
"""Initialize the sensor.""" """Initialize the sensor."""
super().__init__(coordinator) super().__init__(coordinator)
self.entity_description = description self.entity_description = description
@ -260,21 +264,24 @@ class ZamgSensor(CoordinatorEntity, SensorEntity):
configuration_url=MANUFACTURER_URL, configuration_url=MANUFACTURER_URL,
name=coordinator.name, name=coordinator.name,
) )
coordinator.api_fields = API_FIELDS
@property @property
def native_value(self) -> StateType: def native_value(self) -> StateType:
"""Return the state of the sensor.""" """Return the state of the sensor."""
return self.coordinator.data[self.station_id].get( try:
self.entity_description.para_name return self.coordinator.data[self.station_id][
)["data"] self.entity_description.para_name
]["data"]
except (KeyError):
return None
@property @property
def extra_state_attributes(self) -> Mapping[str, str]: def extra_state_attributes(self) -> Mapping[str, str]:
"""Return the state attributes.""" """Return the state attributes."""
update_time = self.coordinator.data.get("last_update", "") if (update_time := self.coordinator.data["last_update"]) is not None:
update_time = update_time.isoformat()
return { return {
ATTR_ATTRIBUTION: ATTRIBUTION, ATTR_STATION: self.coordinator.data["Name"],
ATTR_STATION: self.coordinator.data.get("Name"), ATTR_UPDATED: update_time,
CONF_STATION_ID: self.station_id,
ATTR_UPDATED: update_time.isoformat(),
} }

View file

@ -10,11 +10,13 @@
} }
}, },
"error": { "error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]" "cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"station_not_found": "Station ID not found at zamg"
}, },
"abort": { "abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]", "already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]" "cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"station_not_found": "Station ID not found at zamg"
} }
}, },
"issues": { "issues": {

View file

@ -47,10 +47,12 @@ async def async_setup_platform(
) -> None: ) -> None:
"""Set up the ZAMG weather platform.""" """Set up the ZAMG weather platform."""
# trigger import flow # trigger import flow
await hass.config_entries.flow.async_init( hass.async_create_task(
DOMAIN, hass.config_entries.flow.async_init(
context={"source": SOURCE_IMPORT}, DOMAIN,
data=config, context={"source": SOURCE_IMPORT},
data=config,
)
) )
@ -67,12 +69,14 @@ async def async_setup_entry(
class ZamgWeather(CoordinatorEntity, WeatherEntity): class ZamgWeather(CoordinatorEntity, WeatherEntity):
"""Representation of a weather condition.""" """Representation of a weather condition."""
_attr_attribution = ATTRIBUTION
def __init__( def __init__(
self, coordinator: ZamgDataUpdateCoordinator, name, station_id self, coordinator: ZamgDataUpdateCoordinator, name: str, station_id: str
) -> None: ) -> None:
"""Initialise the platform with a data instance and station name.""" """Initialise the platform with a data instance and station name."""
super().__init__(coordinator) super().__init__(coordinator)
self._attr_unique_id = f"{name}_{station_id}" self._attr_unique_id = station_id
self._attr_name = f"ZAMG {name}" self._attr_name = f"ZAMG {name}"
self.station_id = f"{station_id}" self.station_id = f"{station_id}"
self._attr_device_info = DeviceInfo( self._attr_device_info = DeviceInfo(
@ -93,47 +97,42 @@ class ZamgWeather(CoordinatorEntity, WeatherEntity):
"""Return the current condition.""" """Return the current condition."""
return None return None
@property
def attribution(self) -> str | None:
"""Return the attribution."""
return ATTRIBUTION
@property @property
def native_temperature(self) -> float | None: def native_temperature(self) -> float | None:
"""Return the platform temperature.""" """Return the platform temperature."""
try: try:
return float(self.coordinator.data[self.station_id].get("TL")["data"]) return float(self.coordinator.data[self.station_id]["TL"]["data"])
except (TypeError, ValueError): except (KeyError, ValueError):
return None return None
@property @property
def native_pressure(self) -> float | None: def native_pressure(self) -> float | None:
"""Return the pressure.""" """Return the pressure."""
try: try:
return float(self.coordinator.data[self.station_id].get("P")["data"]) return float(self.coordinator.data[self.station_id]["P"]["data"])
except (TypeError, ValueError): except (KeyError, ValueError):
return None return None
@property @property
def humidity(self) -> float | None: def humidity(self) -> float | None:
"""Return the humidity.""" """Return the humidity."""
try: try:
return float(self.coordinator.data[self.station_id].get("RFAM")["data"]) return float(self.coordinator.data[self.station_id]["RFAM"]["data"])
except (TypeError, ValueError): except (KeyError, ValueError):
return None return None
@property @property
def native_wind_speed(self) -> float | None: def native_wind_speed(self) -> float | None:
"""Return the wind speed.""" """Return the wind speed."""
try: try:
return float(self.coordinator.data[self.station_id].get("FF")["data"]) return float(self.coordinator.data[self.station_id]["FFAM"]["data"])
except (TypeError, ValueError): except (KeyError, ValueError):
return None return None
@property @property
def wind_bearing(self) -> float | str | None: def wind_bearing(self) -> float | str | None:
"""Return the wind bearing.""" """Return the wind bearing."""
try: try:
return self.coordinator.data[self.station_id].get("DD")["data"] return self.coordinator.data[self.station_id]["DD"]["data"]
except (TypeError, ValueError): except (KeyError, ValueError):
return None return None

View file

@ -2635,7 +2635,7 @@ youless-api==0.16
youtube_dl==2021.12.17 youtube_dl==2021.12.17
# homeassistant.components.zamg # homeassistant.components.zamg
zamg==0.1.1 zamg==0.2.1
# homeassistant.components.zengge # homeassistant.components.zengge
zengge==0.2 zengge==0.2

View file

@ -1842,7 +1842,7 @@ yolink-api==0.1.5
youless-api==0.16 youless-api==0.16
# homeassistant.components.zamg # homeassistant.components.zamg
zamg==0.1.1 zamg==0.2.1
# homeassistant.components.zeroconf # homeassistant.components.zeroconf
zeroconf==0.44.0 zeroconf==0.44.0

View file

@ -1 +1,18 @@
"""Tests for the ZAMG component.""" """Tests for the ZAMG component."""
from homeassistant import config_entries
from homeassistant.components.zamg.const import CONF_STATION_ID, DOMAIN as ZAMG_DOMAIN
from .conftest import TEST_STATION_ID, TEST_STATION_NAME
FIXTURE_CONFIG_ENTRY = {
"entry_id": "1",
"domain": ZAMG_DOMAIN,
"title": TEST_STATION_NAME,
"data": {
CONF_STATION_ID: TEST_STATION_ID,
},
"options": None,
"source": config_entries.SOURCE_USER,
"unique_id": TEST_STATION_ID,
}

View file

@ -14,6 +14,9 @@ from tests.common import MockConfigEntry, load_fixture
TEST_STATION_ID = "11240" TEST_STATION_ID = "11240"
TEST_STATION_NAME = "Graz/Flughafen" TEST_STATION_NAME = "Graz/Flughafen"
TEST_STATION_ID_2 = "11035"
TEST_STATION_NAME_2 = "WIEN/HOHE WARTE"
@pytest.fixture @pytest.fixture
def mock_config_entry() -> MockConfigEntry: def mock_config_entry() -> MockConfigEntry:
@ -67,6 +70,27 @@ def mock_zamg(request: pytest.FixtureRequest) -> Generator[None, MagicMock, None
yield zamg yield zamg
@pytest.fixture
def mock_zamg_coordinator(
request: pytest.FixtureRequest,
) -> Generator[None, MagicMock, None]:
"""Return a mocked Zamg client."""
with patch(
"homeassistant.components.zamg.coordinator.ZamgDevice", autospec=True
) as zamg_mock:
zamg = zamg_mock.return_value
zamg.update.return_value = {TEST_STATION_ID: {"Name": TEST_STATION_NAME}}
zamg.zamg_stations.return_value = {
TEST_STATION_ID: (46.99305556, 15.43916667, TEST_STATION_NAME),
"11244": (46.8722229, 15.90361118, "BAD GLEICHENBERG"),
}
zamg.closest_station.return_value = TEST_STATION_ID
zamg.get_data.return_value = TEST_STATION_ID
zamg.get_station_name = TEST_STATION_NAME
yield zamg
@pytest.fixture @pytest.fixture
def mock_zamg_stations( def mock_zamg_stations(
request: pytest.FixtureRequest, request: pytest.FixtureRequest,

View file

@ -1,6 +1,8 @@
"""Tests for the Zamg config flow.""" """Tests for the Zamg config flow."""
from unittest.mock import MagicMock from unittest.mock import MagicMock
from zamg.exceptions import ZamgApiError, ZamgStationNotFoundError
from homeassistant.components.zamg.const import CONF_STATION_ID, DOMAIN, LOGGER from homeassistant.components.zamg.const import CONF_STATION_ID, DOMAIN, LOGGER
from homeassistant.config_entries import SOURCE_IMPORT, SOURCE_USER from homeassistant.config_entries import SOURCE_IMPORT, SOURCE_USER
from homeassistant.const import CONF_NAME from homeassistant.const import CONF_NAME
@ -27,7 +29,7 @@ async def test_full_user_flow_implementation(
assert "flow_id" in result assert "flow_id" in result
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], result["flow_id"],
user_input={CONF_STATION_ID: int(TEST_STATION_ID)}, user_input={CONF_STATION_ID: TEST_STATION_ID},
) )
assert result.get("type") == FlowResultType.CREATE_ENTRY assert result.get("type") == FlowResultType.CREATE_ENTRY
assert "data" in result assert "data" in result
@ -36,6 +38,21 @@ async def test_full_user_flow_implementation(
assert result["result"].unique_id == TEST_STATION_ID assert result["result"].unique_id == TEST_STATION_ID
async def test_error_closest_station(
hass: HomeAssistant,
mock_zamg: MagicMock,
mock_setup_entry: None,
) -> None:
"""Test with error of reading from Zamg."""
mock_zamg.closest_station.side_effect = ZamgApiError
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
assert result.get("type") == FlowResultType.ABORT
assert result.get("reason") == "cannot_connect"
async def test_error_update( async def test_error_update(
hass: HomeAssistant, hass: HomeAssistant,
mock_zamg: MagicMock, mock_zamg: MagicMock,
@ -50,11 +67,11 @@ async def test_error_update(
assert result.get("type") == FlowResultType.FORM assert result.get("type") == FlowResultType.FORM
LOGGER.debug(result) LOGGER.debug(result)
assert result.get("data_schema") != "" assert result.get("data_schema") != ""
mock_zamg.update.side_effect = ValueError mock_zamg.update.side_effect = ZamgApiError
assert "flow_id" in result assert "flow_id" in result
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], result["flow_id"],
user_input={CONF_STATION_ID: int(TEST_STATION_ID)}, user_input={CONF_STATION_ID: TEST_STATION_ID},
) )
assert result.get("type") == FlowResultType.ABORT assert result.get("type") == FlowResultType.ABORT
assert result.get("reason") == "cannot_connect" assert result.get("reason") == "cannot_connect"
@ -91,7 +108,7 @@ async def test_user_flow_duplicate(
assert "flow_id" in result assert "flow_id" in result
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], result["flow_id"],
user_input={CONF_STATION_ID: int(TEST_STATION_ID)}, user_input={CONF_STATION_ID: TEST_STATION_ID},
) )
assert result.get("type") == FlowResultType.CREATE_ENTRY assert result.get("type") == FlowResultType.CREATE_ENTRY
assert "data" in result assert "data" in result
@ -107,7 +124,7 @@ async def test_user_flow_duplicate(
assert result.get("type") == FlowResultType.FORM assert result.get("type") == FlowResultType.FORM
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], result["flow_id"],
user_input={CONF_STATION_ID: int(TEST_STATION_ID)}, user_input={CONF_STATION_ID: TEST_STATION_ID},
) )
assert result.get("type") == FlowResultType.ABORT assert result.get("type") == FlowResultType.ABORT
assert result.get("reason") == "already_configured" assert result.get("reason") == "already_configured"
@ -129,7 +146,7 @@ async def test_import_flow_duplicate(
assert "flow_id" in result assert "flow_id" in result
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], result["flow_id"],
user_input={CONF_STATION_ID: int(TEST_STATION_ID)}, user_input={CONF_STATION_ID: TEST_STATION_ID},
) )
assert result.get("type") == FlowResultType.CREATE_ENTRY assert result.get("type") == FlowResultType.CREATE_ENTRY
assert "data" in result assert "data" in result
@ -162,7 +179,7 @@ async def test_import_flow_duplicate_after_position(
assert "flow_id" in result assert "flow_id" in result
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], result["flow_id"],
user_input={CONF_STATION_ID: int(TEST_STATION_ID)}, user_input={CONF_STATION_ID: TEST_STATION_ID},
) )
assert result.get("type") == FlowResultType.CREATE_ENTRY assert result.get("type") == FlowResultType.CREATE_ENTRY
assert "data" in result assert "data" in result
@ -184,7 +201,7 @@ async def test_import_flow_no_name(
mock_zamg: MagicMock, mock_zamg: MagicMock,
mock_setup_entry: None, mock_setup_entry: None,
) -> None: ) -> None:
"""Test the full import flow from start to finish.""" """Test import flow without any name."""
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": SOURCE_IMPORT}, context={"source": SOURCE_IMPORT},
@ -192,3 +209,35 @@ async def test_import_flow_no_name(
) )
assert result.get("type") == FlowResultType.CREATE_ENTRY assert result.get("type") == FlowResultType.CREATE_ENTRY
assert result.get("data") == {CONF_STATION_ID: TEST_STATION_ID} assert result.get("data") == {CONF_STATION_ID: TEST_STATION_ID}
async def test_import_flow_invalid_station(
hass: HomeAssistant,
mock_zamg: MagicMock,
mock_setup_entry: None,
) -> None:
"""Test import flow with invalid station."""
mock_zamg.closest_station.side_effect = ZamgStationNotFoundError
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={CONF_STATION_ID: ""},
)
assert result.get("type") == FlowResultType.ABORT
assert result.get("reason") == "station_not_found"
async def test_import_flow_zamg_error(
hass: HomeAssistant,
mock_zamg: MagicMock,
mock_setup_entry: None,
) -> None:
"""Test import flow with error on getting zamg stations."""
mock_zamg.zamg_stations.side_effect = ZamgApiError
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={CONF_STATION_ID: ""},
)
assert result.get("type") == FlowResultType.ABORT
assert result.get("reason") == "cannot_connect"

View file

@ -0,0 +1,203 @@
"""Test Zamg component init."""
from unittest.mock import MagicMock
import pytest
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.components.weather import DOMAIN as WEATHER_DOMAIN
from homeassistant.components.zamg.const import CONF_STATION_ID, DOMAIN as ZAMG_DOMAIN
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from . import FIXTURE_CONFIG_ENTRY
from .conftest import (
TEST_STATION_ID,
TEST_STATION_ID_2,
TEST_STATION_NAME,
TEST_STATION_NAME_2,
)
from tests.common import MockConfigEntry
@pytest.mark.parametrize(
"entitydata,old_unique_id,new_unique_id,station_id",
[
(
{
"domain": WEATHER_DOMAIN,
"platform": ZAMG_DOMAIN,
"unique_id": f"{TEST_STATION_NAME}_{TEST_STATION_ID}",
"suggested_object_id": f"Zamg {TEST_STATION_NAME}",
"disabled_by": None,
},
f"{TEST_STATION_NAME}_{TEST_STATION_ID}",
TEST_STATION_ID,
TEST_STATION_ID,
),
(
{
"domain": WEATHER_DOMAIN,
"platform": ZAMG_DOMAIN,
"unique_id": f"{TEST_STATION_NAME_2}_{TEST_STATION_ID_2}",
"suggested_object_id": f"Zamg {TEST_STATION_NAME_2}",
"disabled_by": None,
},
f"{TEST_STATION_NAME_2}_{TEST_STATION_ID_2}",
TEST_STATION_ID_2,
TEST_STATION_ID_2,
),
(
{
"domain": SENSOR_DOMAIN,
"platform": ZAMG_DOMAIN,
"unique_id": f"{TEST_STATION_NAME_2}_{TEST_STATION_ID_2}_temperature",
"suggested_object_id": f"Zamg {TEST_STATION_NAME_2}",
"disabled_by": None,
},
f"{TEST_STATION_NAME_2}_{TEST_STATION_ID_2}_temperature",
f"{TEST_STATION_NAME_2}_{TEST_STATION_ID_2}_temperature",
TEST_STATION_ID_2,
),
],
)
async def test_migrate_unique_ids(
hass: HomeAssistant,
mock_zamg_coordinator: MagicMock,
entitydata: dict,
old_unique_id: str,
new_unique_id: str,
station_id: str,
) -> None:
"""Test successful migration of entity unique_ids."""
FIXTURE_CONFIG_ENTRY["data"][CONF_STATION_ID] = station_id
mock_config_entry = MockConfigEntry(**FIXTURE_CONFIG_ENTRY)
mock_config_entry.add_to_hass(hass)
entity_registry = er.async_get(hass)
entity: er.RegistryEntry = entity_registry.async_get_or_create(
**entitydata,
config_entry=mock_config_entry,
)
assert entity.unique_id == old_unique_id
assert await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
entity_migrated = entity_registry.async_get(entity.entity_id)
assert entity_migrated
assert entity_migrated.unique_id == new_unique_id
@pytest.mark.parametrize(
"entitydata,old_unique_id,new_unique_id,station_id",
[
(
{
"domain": WEATHER_DOMAIN,
"platform": ZAMG_DOMAIN,
"unique_id": f"{TEST_STATION_NAME}_{TEST_STATION_ID}",
"suggested_object_id": f"Zamg {TEST_STATION_NAME}",
"disabled_by": None,
},
f"{TEST_STATION_NAME}_{TEST_STATION_ID}",
TEST_STATION_ID,
TEST_STATION_ID,
),
],
)
async def test_dont_migrate_unique_ids(
hass: HomeAssistant,
mock_zamg_coordinator: MagicMock,
entitydata: dict,
old_unique_id: str,
new_unique_id: str,
station_id: str,
) -> None:
"""Test successful migration of entity unique_ids."""
FIXTURE_CONFIG_ENTRY["data"][CONF_STATION_ID] = station_id
mock_config_entry = MockConfigEntry(**FIXTURE_CONFIG_ENTRY)
mock_config_entry.add_to_hass(hass)
entity_registry = er.async_get(hass)
# create existing entry with new_unique_id
existing_entity = entity_registry.async_get_or_create(
WEATHER_DOMAIN,
ZAMG_DOMAIN,
unique_id=TEST_STATION_ID,
suggested_object_id=f"Zamg {TEST_STATION_NAME}",
config_entry=mock_config_entry,
)
entity: er.RegistryEntry = entity_registry.async_get_or_create(
**entitydata,
config_entry=mock_config_entry,
)
assert entity.unique_id == old_unique_id
assert await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
entity_migrated = entity_registry.async_get(entity.entity_id)
assert entity_migrated
assert entity_migrated.unique_id == old_unique_id
entity_not_changed = entity_registry.async_get(existing_entity.entity_id)
assert entity_not_changed
assert entity_not_changed.unique_id == new_unique_id
assert entity_migrated != entity_not_changed
@pytest.mark.parametrize(
"entitydata,unique_id",
[
(
{
"domain": WEATHER_DOMAIN,
"platform": ZAMG_DOMAIN,
"unique_id": TEST_STATION_ID,
"suggested_object_id": f"Zamg {TEST_STATION_NAME}",
"disabled_by": None,
},
TEST_STATION_ID,
),
],
)
async def test_unload_entry(
hass: HomeAssistant,
mock_zamg_coordinator: MagicMock,
entitydata: dict,
unique_id: str,
) -> None:
"""Test unload entity unique_ids."""
mock_config_entry = MockConfigEntry(**FIXTURE_CONFIG_ENTRY)
mock_config_entry.add_to_hass(hass)
entity_registry = er.async_get(hass)
entity_registry.async_get_or_create(
WEATHER_DOMAIN,
ZAMG_DOMAIN,
unique_id=TEST_STATION_ID,
suggested_object_id=f"Zamg {TEST_STATION_NAME}",
config_entry=mock_config_entry,
)
entity: er.RegistryEntry = entity_registry.async_get_or_create(
**entitydata,
config_entry=mock_config_entry,
)
assert entity.unique_id == unique_id
assert await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
assert await hass.config_entries.async_remove(mock_config_entry.entry_id)
await hass.async_block_till_done()
assert hass.config_entries.async_get_entry(unique_id) is None