Add Nord Pool integration (#129983)

This commit is contained in:
G Johansson 2024-11-08 15:10:51 +01:00 committed by GitHub
parent 074418f8f7
commit 1f32e02ba2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 3628 additions and 0 deletions

View file

@ -340,6 +340,7 @@ homeassistant.components.nfandroidtv.*
homeassistant.components.nightscout.*
homeassistant.components.nissan_leaf.*
homeassistant.components.no_ip.*
homeassistant.components.nordpool.*
homeassistant.components.notify.*
homeassistant.components.notion.*
homeassistant.components.number.*

View file

@ -1012,6 +1012,8 @@ build.json @home-assistant/supervisor
/homeassistant/components/noaa_tides/ @jdelaney72
/homeassistant/components/nobo_hub/ @echoromeo @oyvindwe
/tests/components/nobo_hub/ @echoromeo @oyvindwe
/homeassistant/components/nordpool/ @gjohansson-ST
/tests/components/nordpool/ @gjohansson-ST
/homeassistant/components/notify/ @home-assistant/core
/tests/components/notify/ @home-assistant/core
/homeassistant/components/notify_events/ @matrozov @papajojo

View file

@ -0,0 +1,29 @@
"""The Nord Pool component."""
from __future__ import annotations
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.util import dt as dt_util
from .const import PLATFORMS
from .coordinator import NordPoolDataUpdateCoordinator
type NordPoolConfigEntry = ConfigEntry[NordPoolDataUpdateCoordinator]
async def async_setup_entry(hass: HomeAssistant, entry: NordPoolConfigEntry) -> bool:
"""Set up Nord Pool from a config entry."""
coordinator = NordPoolDataUpdateCoordinator(hass, entry)
await coordinator.fetch_data(dt_util.utcnow())
entry.runtime_data = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: NordPoolConfigEntry) -> bool:
"""Unload Nord Pool config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View file

@ -0,0 +1,92 @@
"""Adds config flow for Nord Pool integration."""
from __future__ import annotations
from typing import Any
from pynordpool import Currency, NordPoolClient, NordPoolError
from pynordpool.const import AREAS
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_CURRENCY
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.selector import (
SelectOptionDict,
SelectSelector,
SelectSelectorConfig,
SelectSelectorMode,
)
from homeassistant.util import dt as dt_util
from .const import CONF_AREAS, DEFAULT_NAME, DOMAIN
SELECT_AREAS = [
SelectOptionDict(value=area, label=name) for area, name in AREAS.items()
]
SELECT_CURRENCY = [currency.value for currency in Currency]
DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_AREAS, default=[]): SelectSelector(
SelectSelectorConfig(
options=SELECT_AREAS,
multiple=True,
mode=SelectSelectorMode.DROPDOWN,
sort=True,
)
),
vol.Required(CONF_CURRENCY, default="SEK"): SelectSelector(
SelectSelectorConfig(
options=SELECT_CURRENCY,
multiple=False,
mode=SelectSelectorMode.DROPDOWN,
sort=True,
)
),
}
)
async def test_api(hass: HomeAssistant, user_input: dict[str, Any]) -> dict[str, str]:
"""Test fetch data from Nord Pool."""
client = NordPoolClient(async_get_clientsession(hass))
try:
data = await client.async_get_delivery_period(
dt_util.now(),
Currency(user_input[CONF_CURRENCY]),
user_input[CONF_AREAS],
)
except NordPoolError:
return {"base": "cannot_connect"}
if not data.raw:
return {"base": "no_data"}
return {}
class NordpoolConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Nord Pool integration."""
VERSION = 1
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step."""
errors: dict[str, str] = {}
if user_input:
errors = await test_api(self.hass, user_input)
if not errors:
return self.async_create_entry(
title=DEFAULT_NAME,
data=user_input,
)
return self.async_show_form(
step_id="user",
data_schema=DATA_SCHEMA,
errors=errors,
)

View file

@ -0,0 +1,14 @@
"""Constants for Nord Pool."""
import logging
from homeassistant.const import Platform
LOGGER = logging.getLogger(__package__)
DEFAULT_SCAN_INTERVAL = 60
DOMAIN = "nordpool"
PLATFORMS = [Platform.SENSOR]
DEFAULT_NAME = "Nord Pool"
CONF_AREAS = "areas"

View file

@ -0,0 +1,95 @@
"""DataUpdateCoordinator for the Nord Pool integration."""
from __future__ import annotations
from collections.abc import Callable
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
from pynordpool import (
Currency,
DeliveryPeriodData,
NordPoolAuthenticationError,
NordPoolClient,
NordPoolError,
NordPoolResponseError,
)
from homeassistant.const import CONF_CURRENCY
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.event import async_track_point_in_utc_time
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util import dt as dt_util
from .const import CONF_AREAS, DOMAIN, LOGGER
if TYPE_CHECKING:
from . import NordPoolConfigEntry
class NordPoolDataUpdateCoordinator(DataUpdateCoordinator[DeliveryPeriodData]):
"""A Nord Pool Data Update Coordinator."""
config_entry: NordPoolConfigEntry
def __init__(self, hass: HomeAssistant, config_entry: NordPoolConfigEntry) -> None:
"""Initialize the Nord Pool coordinator."""
super().__init__(
hass,
LOGGER,
config_entry=config_entry,
name=DOMAIN,
)
self.client = NordPoolClient(session=async_get_clientsession(hass))
self.unsub: Callable[[], None] | None = None
def get_next_interval(self, now: datetime) -> datetime:
"""Compute next time an update should occur."""
next_hour = dt_util.utcnow() + timedelta(hours=1)
next_run = datetime(
next_hour.year,
next_hour.month,
next_hour.day,
next_hour.hour,
tzinfo=dt_util.UTC,
)
LOGGER.debug("Next update at %s", next_run)
return next_run
async def async_shutdown(self) -> None:
"""Cancel any scheduled call, and ignore new runs."""
await super().async_shutdown()
if self.unsub:
self.unsub()
self.unsub = None
async def fetch_data(self, now: datetime) -> None:
"""Fetch data from Nord Pool."""
self.unsub = async_track_point_in_utc_time(
self.hass, self.fetch_data, self.get_next_interval(dt_util.utcnow())
)
try:
data = await self.client.async_get_delivery_period(
dt_util.now(),
Currency(self.config_entry.data[CONF_CURRENCY]),
self.config_entry.data[CONF_AREAS],
)
except NordPoolAuthenticationError as error:
LOGGER.error("Authentication error: %s", error)
self.async_set_update_error(error)
return
except NordPoolResponseError as error:
LOGGER.debug("Response error: %s", error)
self.async_set_update_error(error)
return
except NordPoolError as error:
LOGGER.debug("Connection error: %s", error)
self.async_set_update_error(error)
return
if not data.raw:
self.async_set_update_error(UpdateFailed("No data"))
return
self.async_set_updated_data(data)

View file

@ -0,0 +1,32 @@
"""Base entity for Nord Pool."""
from __future__ import annotations
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity import EntityDescription
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN
from .coordinator import NordPoolDataUpdateCoordinator
class NordpoolBaseEntity(CoordinatorEntity[NordPoolDataUpdateCoordinator]):
"""Representation of a Nord Pool base entity."""
_attr_has_entity_name = True
def __init__(
self,
coordinator: NordPoolDataUpdateCoordinator,
entity_description: EntityDescription,
area: str,
) -> None:
"""Initiate Nord Pool base entity."""
super().__init__(coordinator)
self.entity_description = entity_description
self._attr_unique_id = f"{area}-{entity_description.key}"
self.area = area
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, area)},
name=f"Nord Pool {area}",
)

View file

@ -0,0 +1,42 @@
{
"entity": {
"sensor": {
"updated_at": {
"default": "mdi:clock-outline"
},
"currency": {
"default": "mdi:currency-usd"
},
"exchange_rate": {
"default": "mdi:currency-usd"
},
"current_price": {
"default": "mdi:cash"
},
"last_price": {
"default": "mdi:cash"
},
"next_price": {
"default": "mdi:cash"
},
"block_average": {
"default": "mdi:cash-multiple"
},
"block_min": {
"default": "mdi:cash-multiple"
},
"block_max": {
"default": "mdi:cash-multiple"
},
"block_start_time": {
"default": "mdi:clock-time-twelve-outline"
},
"block_end_time": {
"default": "mdi:clock-time-two-outline"
},
"daily_average": {
"default": "mdi:cash-multiple"
}
}
}
}

View file

@ -0,0 +1,12 @@
{
"domain": "nordpool",
"name": "Nord Pool",
"codeowners": ["@gjohansson-ST"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/nordpool",
"integration_type": "hub",
"iot_class": "cloud_polling",
"loggers": ["pynordpool"],
"requirements": ["pynordpool==0.2.1"],
"single_config_entry": true
}

View file

@ -0,0 +1,328 @@
"""Sensor platform for Nord Pool integration."""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime, timedelta
from pynordpool import DeliveryPeriodData
from homeassistant.components.sensor import (
EntityCategory,
SensorDeviceClass,
SensorEntity,
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.util import dt as dt_util, slugify
from . import NordPoolConfigEntry
from .const import LOGGER
from .coordinator import NordPoolDataUpdateCoordinator
from .entity import NordpoolBaseEntity
PARALLEL_UPDATES = 0
def get_prices(data: DeliveryPeriodData) -> dict[str, tuple[float, float, float]]:
"""Return previous, current and next prices.
Output: {"SE3": (10.0, 10.5, 12.1)}
"""
last_price_entries: dict[str, float] = {}
current_price_entries: dict[str, float] = {}
next_price_entries: dict[str, float] = {}
current_time = dt_util.utcnow()
previous_time = current_time - timedelta(hours=1)
next_time = current_time + timedelta(hours=1)
price_data = data.entries
for entry in price_data:
if entry.start <= current_time <= entry.end:
current_price_entries = entry.entry
if entry.start <= previous_time <= entry.end:
last_price_entries = entry.entry
if entry.start <= next_time <= entry.end:
next_price_entries = entry.entry
result = {}
for area, price in current_price_entries.items():
result[area] = (last_price_entries[area], price, next_price_entries[area])
LOGGER.debug("Prices: %s", result)
return result
def get_blockprices(
data: DeliveryPeriodData,
) -> dict[str, dict[str, tuple[datetime, datetime, float, float, float]]]:
"""Return average, min and max for block prices.
Output: {"SE3": {"Off-peak 1": (_datetime_, _datetime_, 9.3, 10.5, 12.1)}}
"""
result: dict[str, dict[str, tuple[datetime, datetime, float, float, float]]] = {}
block_prices = data.block_prices
for entry in block_prices:
for _area in entry.average:
if _area not in result:
result[_area] = {}
result[_area][entry.name] = (
entry.start,
entry.end,
entry.average[_area]["average"],
entry.average[_area]["min"],
entry.average[_area]["max"],
)
LOGGER.debug("Block prices: %s", result)
return result
@dataclass(frozen=True, kw_only=True)
class NordpoolDefaultSensorEntityDescription(SensorEntityDescription):
"""Describes Nord Pool default sensor entity."""
value_fn: Callable[[DeliveryPeriodData], str | float | datetime | None]
@dataclass(frozen=True, kw_only=True)
class NordpoolPricesSensorEntityDescription(SensorEntityDescription):
"""Describes Nord Pool prices sensor entity."""
value_fn: Callable[[tuple[float, float, float]], float | None]
@dataclass(frozen=True, kw_only=True)
class NordpoolBlockPricesSensorEntityDescription(SensorEntityDescription):
"""Describes Nord Pool block prices sensor entity."""
value_fn: Callable[
[tuple[datetime, datetime, float, float, float]], float | datetime | None
]
DEFAULT_SENSOR_TYPES: tuple[NordpoolDefaultSensorEntityDescription, ...] = (
NordpoolDefaultSensorEntityDescription(
key="updated_at",
translation_key="updated_at",
device_class=SensorDeviceClass.TIMESTAMP,
value_fn=lambda data: data.updated_at,
entity_category=EntityCategory.DIAGNOSTIC,
),
NordpoolDefaultSensorEntityDescription(
key="currency",
translation_key="currency",
value_fn=lambda data: data.currency,
entity_category=EntityCategory.DIAGNOSTIC,
),
NordpoolDefaultSensorEntityDescription(
key="exchange_rate",
translation_key="exchange_rate",
value_fn=lambda data: data.exchange_rate,
state_class=SensorStateClass.MEASUREMENT,
entity_registry_enabled_default=False,
entity_category=EntityCategory.DIAGNOSTIC,
),
)
PRICES_SENSOR_TYPES: tuple[NordpoolPricesSensorEntityDescription, ...] = (
NordpoolPricesSensorEntityDescription(
key="current_price",
translation_key="current_price",
value_fn=lambda data: data[1] / 1000,
state_class=SensorStateClass.MEASUREMENT,
suggested_display_precision=2,
),
NordpoolPricesSensorEntityDescription(
key="last_price",
translation_key="last_price",
value_fn=lambda data: data[0] / 1000,
suggested_display_precision=2,
),
NordpoolPricesSensorEntityDescription(
key="next_price",
translation_key="next_price",
value_fn=lambda data: data[2] / 1000,
suggested_display_precision=2,
),
)
BLOCK_PRICES_SENSOR_TYPES: tuple[NordpoolBlockPricesSensorEntityDescription, ...] = (
NordpoolBlockPricesSensorEntityDescription(
key="block_average",
translation_key="block_average",
value_fn=lambda data: data[2] / 1000,
state_class=SensorStateClass.MEASUREMENT,
suggested_display_precision=2,
entity_registry_enabled_default=False,
),
NordpoolBlockPricesSensorEntityDescription(
key="block_min",
translation_key="block_min",
value_fn=lambda data: data[3] / 1000,
state_class=SensorStateClass.MEASUREMENT,
suggested_display_precision=2,
entity_registry_enabled_default=False,
),
NordpoolBlockPricesSensorEntityDescription(
key="block_max",
translation_key="block_max",
value_fn=lambda data: data[4] / 1000,
state_class=SensorStateClass.MEASUREMENT,
suggested_display_precision=2,
entity_registry_enabled_default=False,
),
NordpoolBlockPricesSensorEntityDescription(
key="block_start_time",
translation_key="block_start_time",
value_fn=lambda data: data[0],
device_class=SensorDeviceClass.TIMESTAMP,
entity_registry_enabled_default=False,
),
NordpoolBlockPricesSensorEntityDescription(
key="block_end_time",
translation_key="block_end_time",
value_fn=lambda data: data[1],
device_class=SensorDeviceClass.TIMESTAMP,
entity_registry_enabled_default=False,
),
)
DAILY_AVERAGE_PRICES_SENSOR_TYPES: tuple[SensorEntityDescription, ...] = (
SensorEntityDescription(
key="daily_average",
translation_key="daily_average",
state_class=SensorStateClass.MEASUREMENT,
suggested_display_precision=2,
entity_registry_enabled_default=False,
),
)
async def async_setup_entry(
hass: HomeAssistant,
entry: NordPoolConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up Nord Pool sensor platform."""
coordinator = entry.runtime_data
entities: list[NordpoolBaseEntity] = []
currency = entry.runtime_data.data.currency
for area in get_prices(entry.runtime_data.data):
LOGGER.debug("Setting up base sensors for area %s", area)
entities.extend(
NordpoolSensor(coordinator, description, area)
for description in DEFAULT_SENSOR_TYPES
)
LOGGER.debug(
"Setting up price sensors for area %s with currency %s", area, currency
)
entities.extend(
NordpoolPriceSensor(coordinator, description, area, currency)
for description in PRICES_SENSOR_TYPES
)
entities.extend(
NordpoolDailyAveragePriceSensor(coordinator, description, area, currency)
for description in DAILY_AVERAGE_PRICES_SENSOR_TYPES
)
for block_name in get_blockprices(coordinator.data)[area]:
LOGGER.debug(
"Setting up block price sensors for area %s with currency %s in block %s",
area,
currency,
block_name,
)
entities.extend(
NordpoolBlockPriceSensor(
coordinator, description, area, currency, block_name
)
for description in BLOCK_PRICES_SENSOR_TYPES
)
async_add_entities(entities)
class NordpoolSensor(NordpoolBaseEntity, SensorEntity):
"""Representation of a Nord Pool sensor."""
entity_description: NordpoolDefaultSensorEntityDescription
@property
def native_value(self) -> str | float | datetime | None:
"""Return value of sensor."""
return self.entity_description.value_fn(self.coordinator.data)
class NordpoolPriceSensor(NordpoolBaseEntity, SensorEntity):
"""Representation of a Nord Pool price sensor."""
entity_description: NordpoolPricesSensorEntityDescription
def __init__(
self,
coordinator: NordPoolDataUpdateCoordinator,
entity_description: NordpoolPricesSensorEntityDescription,
area: str,
currency: str,
) -> None:
"""Initiate Nord Pool sensor."""
super().__init__(coordinator, entity_description, area)
self._attr_native_unit_of_measurement = f"{currency}/kWh"
@property
def native_value(self) -> float | None:
"""Return value of sensor."""
return self.entity_description.value_fn(
get_prices(self.coordinator.data)[self.area]
)
class NordpoolBlockPriceSensor(NordpoolBaseEntity, SensorEntity):
"""Representation of a Nord Pool block price sensor."""
entity_description: NordpoolBlockPricesSensorEntityDescription
def __init__(
self,
coordinator: NordPoolDataUpdateCoordinator,
entity_description: NordpoolBlockPricesSensorEntityDescription,
area: str,
currency: str,
block_name: str,
) -> None:
"""Initiate Nord Pool sensor."""
super().__init__(coordinator, entity_description, area)
if entity_description.device_class is not SensorDeviceClass.TIMESTAMP:
self._attr_native_unit_of_measurement = f"{currency}/kWh"
self._attr_unique_id = f"{slugify(block_name)}-{area}-{entity_description.key}"
self.block_name = block_name
self._attr_translation_placeholders = {"block": block_name}
@property
def native_value(self) -> float | datetime | None:
"""Return value of sensor."""
return self.entity_description.value_fn(
get_blockprices(self.coordinator.data)[self.area][self.block_name]
)
class NordpoolDailyAveragePriceSensor(NordpoolBaseEntity, SensorEntity):
"""Representation of a Nord Pool daily average price sensor."""
entity_description: SensorEntityDescription
def __init__(
self,
coordinator: NordPoolDataUpdateCoordinator,
entity_description: SensorEntityDescription,
area: str,
currency: str,
) -> None:
"""Initiate Nord Pool sensor."""
super().__init__(coordinator, entity_description, area)
self._attr_native_unit_of_measurement = f"{currency}/kWh"
@property
def native_value(self) -> float | None:
"""Return value of sensor."""
return self.coordinator.data.area_average[self.area] / 1000

View file

@ -0,0 +1,56 @@
{
"config": {
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"no_data": "API connected but the response was empty"
},
"step": {
"user": {
"data": {
"currency": "Currency",
"areas": "Areas"
}
}
}
},
"entity": {
"sensor": {
"updated_at": {
"name": "Last updated"
},
"currency": {
"name": "Currency"
},
"exchange_rate": {
"name": "Exchange rate"
},
"current_price": {
"name": "Current price"
},
"last_price": {
"name": "Previous price"
},
"next_price": {
"name": "Next price"
},
"block_average": {
"name": "{block} average"
},
"block_min": {
"name": "{block} lowest price"
},
"block_max": {
"name": "{block} highest price"
},
"block_start_time": {
"name": "{block} time from"
},
"block_end_time": {
"name": "{block} time until"
},
"daily_average": {
"name": "Daily average"
}
}
}
}

View file

@ -408,6 +408,7 @@ FLOWS = {
"nina",
"nmap_tracker",
"nobo_hub",
"nordpool",
"notion",
"nuheat",
"nuki",

View file

@ -4187,6 +4187,13 @@
"config_flow": true,
"iot_class": "local_push"
},
"nordpool": {
"name": "Nord Pool",
"integration_type": "hub",
"config_flow": true,
"iot_class": "cloud_polling",
"single_config_entry": true
},
"norway_air": {
"name": "Om Luftkvalitet i Norge (Norway Air)",
"integration_type": "hub",

View file

@ -3156,6 +3156,16 @@ disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.nordpool.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.notify.*]
check_untyped_defs = true
disallow_incomplete_defs = true

View file

@ -2095,6 +2095,9 @@ pynetio==0.1.9.1
# homeassistant.components.nobo_hub
pynobo==1.8.1
# homeassistant.components.nordpool
pynordpool==0.2.1
# homeassistant.components.nuki
pynuki==1.6.3

View file

@ -1688,6 +1688,9 @@ pynetgear==0.10.10
# homeassistant.components.nobo_hub
pynobo==1.8.1
# homeassistant.components.nordpool
pynordpool==0.2.1
# homeassistant.components.nuki
pynuki==1.6.3

View file

@ -0,0 +1,9 @@
"""Tests for the Nord Pool integration."""
from homeassistant.components.nordpool.const import CONF_AREAS
from homeassistant.const import CONF_CURRENCY
ENTRY_CONFIG = {
CONF_AREAS: ["SE3", "SE4"],
CONF_CURRENCY: "SEK",
}

View file

@ -0,0 +1,76 @@
"""Fixtures for the Nord Pool integration."""
from __future__ import annotations
from datetime import datetime
import json
from typing import Any
from unittest.mock import patch
from pynordpool import NordPoolClient
from pynordpool.const import Currency
from pynordpool.model import DeliveryPeriodData
import pytest
from homeassistant.components.nordpool.const import DOMAIN
from homeassistant.config_entries import SOURCE_USER
from homeassistant.core import HomeAssistant
from homeassistant.util import dt as dt_util
from . import ENTRY_CONFIG
from tests.common import MockConfigEntry, load_fixture
from tests.test_util.aiohttp import AiohttpClientMocker
@pytest.mark.freeze_time("2024-11-05T18:00:00+00:00")
@pytest.fixture
async def load_int(
hass: HomeAssistant, get_data: DeliveryPeriodData
) -> MockConfigEntry:
"""Set up the Nord Pool integration in Home Assistant."""
config_entry = MockConfigEntry(
domain=DOMAIN,
source=SOURCE_USER,
data=ENTRY_CONFIG,
)
config_entry.add_to_hass(hass)
with (
patch(
"homeassistant.components.nordpool.coordinator.NordPoolClient.async_get_delivery_period",
return_value=get_data,
),
):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
return config_entry
@pytest.fixture(name="get_data")
async def get_data_from_library(
hass: HomeAssistant, aioclient_mock: AiohttpClientMocker, load_json: dict[str, Any]
) -> DeliveryPeriodData:
"""Retrieve data from Nord Pool library."""
client = NordPoolClient(aioclient_mock.create_session(hass.loop))
with patch("pynordpool.NordPoolClient._get", return_value=load_json):
output = await client.async_get_delivery_period(
datetime(2024, 11, 5, 13, tzinfo=dt_util.UTC), Currency.SEK, ["SE3", "SE4"]
)
await client._session.close()
return output
@pytest.fixture(name="load_json")
def load_json_from_fixture(load_data: str) -> dict[str, Any]:
"""Load fixture with json data and return."""
return json.loads(load_data)
@pytest.fixture(name="load_data", scope="package")
def load_data_from_fixture() -> str:
"""Load fixture with fixture data and return."""
return load_fixture("delivery_period.json", DOMAIN)

View file

@ -0,0 +1,272 @@
{
"deliveryDateCET": "2024-11-05",
"version": 3,
"updatedAt": "2024-11-04T12:15:03.9456464Z",
"deliveryAreas": ["SE3", "SE4"],
"market": "DayAhead",
"multiAreaEntries": [
{
"deliveryStart": "2024-11-04T23:00:00Z",
"deliveryEnd": "2024-11-05T00:00:00Z",
"entryPerArea": {
"SE3": 250.73,
"SE4": 283.79
}
},
{
"deliveryStart": "2024-11-05T00:00:00Z",
"deliveryEnd": "2024-11-05T01:00:00Z",
"entryPerArea": {
"SE3": 76.36,
"SE4": 81.36
}
},
{
"deliveryStart": "2024-11-05T01:00:00Z",
"deliveryEnd": "2024-11-05T02:00:00Z",
"entryPerArea": {
"SE3": 73.92,
"SE4": 79.15
}
},
{
"deliveryStart": "2024-11-05T02:00:00Z",
"deliveryEnd": "2024-11-05T03:00:00Z",
"entryPerArea": {
"SE3": 61.69,
"SE4": 65.19
}
},
{
"deliveryStart": "2024-11-05T03:00:00Z",
"deliveryEnd": "2024-11-05T04:00:00Z",
"entryPerArea": {
"SE3": 64.6,
"SE4": 68.44
}
},
{
"deliveryStart": "2024-11-05T04:00:00Z",
"deliveryEnd": "2024-11-05T05:00:00Z",
"entryPerArea": {
"SE3": 453.27,
"SE4": 516.71
}
},
{
"deliveryStart": "2024-11-05T05:00:00Z",
"deliveryEnd": "2024-11-05T06:00:00Z",
"entryPerArea": {
"SE3": 996.28,
"SE4": 1240.85
}
},
{
"deliveryStart": "2024-11-05T06:00:00Z",
"deliveryEnd": "2024-11-05T07:00:00Z",
"entryPerArea": {
"SE3": 1406.14,
"SE4": 1648.25
}
},
{
"deliveryStart": "2024-11-05T07:00:00Z",
"deliveryEnd": "2024-11-05T08:00:00Z",
"entryPerArea": {
"SE3": 1346.54,
"SE4": 1570.5
}
},
{
"deliveryStart": "2024-11-05T08:00:00Z",
"deliveryEnd": "2024-11-05T09:00:00Z",
"entryPerArea": {
"SE3": 1150.28,
"SE4": 1345.37
}
},
{
"deliveryStart": "2024-11-05T09:00:00Z",
"deliveryEnd": "2024-11-05T10:00:00Z",
"entryPerArea": {
"SE3": 1031.32,
"SE4": 1206.51
}
},
{
"deliveryStart": "2024-11-05T10:00:00Z",
"deliveryEnd": "2024-11-05T11:00:00Z",
"entryPerArea": {
"SE3": 927.37,
"SE4": 1085.8
}
},
{
"deliveryStart": "2024-11-05T11:00:00Z",
"deliveryEnd": "2024-11-05T12:00:00Z",
"entryPerArea": {
"SE3": 925.05,
"SE4": 1081.72
}
},
{
"deliveryStart": "2024-11-05T12:00:00Z",
"deliveryEnd": "2024-11-05T13:00:00Z",
"entryPerArea": {
"SE3": 949.49,
"SE4": 1130.38
}
},
{
"deliveryStart": "2024-11-05T13:00:00Z",
"deliveryEnd": "2024-11-05T14:00:00Z",
"entryPerArea": {
"SE3": 1042.03,
"SE4": 1256.91
}
},
{
"deliveryStart": "2024-11-05T14:00:00Z",
"deliveryEnd": "2024-11-05T15:00:00Z",
"entryPerArea": {
"SE3": 1258.89,
"SE4": 1765.82
}
},
{
"deliveryStart": "2024-11-05T15:00:00Z",
"deliveryEnd": "2024-11-05T16:00:00Z",
"entryPerArea": {
"SE3": 1816.45,
"SE4": 2522.55
}
},
{
"deliveryStart": "2024-11-05T16:00:00Z",
"deliveryEnd": "2024-11-05T17:00:00Z",
"entryPerArea": {
"SE3": 2512.65,
"SE4": 3533.03
}
},
{
"deliveryStart": "2024-11-05T17:00:00Z",
"deliveryEnd": "2024-11-05T18:00:00Z",
"entryPerArea": {
"SE3": 1819.83,
"SE4": 2524.06
}
},
{
"deliveryStart": "2024-11-05T18:00:00Z",
"deliveryEnd": "2024-11-05T19:00:00Z",
"entryPerArea": {
"SE3": 1011.77,
"SE4": 1804.46
}
},
{
"deliveryStart": "2024-11-05T19:00:00Z",
"deliveryEnd": "2024-11-05T20:00:00Z",
"entryPerArea": {
"SE3": 835.53,
"SE4": 1112.57
}
},
{
"deliveryStart": "2024-11-05T20:00:00Z",
"deliveryEnd": "2024-11-05T21:00:00Z",
"entryPerArea": {
"SE3": 796.19,
"SE4": 1051.69
}
},
{
"deliveryStart": "2024-11-05T21:00:00Z",
"deliveryEnd": "2024-11-05T22:00:00Z",
"entryPerArea": {
"SE3": 522.3,
"SE4": 662.44
}
},
{
"deliveryStart": "2024-11-05T22:00:00Z",
"deliveryEnd": "2024-11-05T23:00:00Z",
"entryPerArea": {
"SE3": 289.14,
"SE4": 349.21
}
}
],
"blockPriceAggregates": [
{
"blockName": "Off-peak 1",
"deliveryStart": "2024-11-04T23:00:00Z",
"deliveryEnd": "2024-11-05T07:00:00Z",
"averagePricePerArea": {
"SE3": {
"average": 422.87,
"min": 61.69,
"max": 1406.14
},
"SE4": {
"average": 497.97,
"min": 65.19,
"max": 1648.25
}
}
},
{
"blockName": "Peak",
"deliveryStart": "2024-11-05T07:00:00Z",
"deliveryEnd": "2024-11-05T19:00:00Z",
"averagePricePerArea": {
"SE3": {
"average": 1315.97,
"min": 925.05,
"max": 2512.65
},
"SE4": {
"average": 1735.59,
"min": 1081.72,
"max": 3533.03
}
}
},
{
"blockName": "Off-peak 2",
"deliveryStart": "2024-11-05T19:00:00Z",
"deliveryEnd": "2024-11-05T23:00:00Z",
"averagePricePerArea": {
"SE3": {
"average": 610.79,
"min": 289.14,
"max": 835.53
},
"SE4": {
"average": 793.98,
"min": 349.21,
"max": 1112.57
}
}
}
],
"currency": "SEK",
"exchangeRate": 11.6402,
"areaStates": [
{
"state": "Final",
"areas": ["SE3", "SE4"]
}
],
"areaAverages": [
{
"areaCode": "SE3",
"price": 900.74
},
{
"areaCode": "SE4",
"price": 1166.12
}
]
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,151 @@
"""Test the Nord Pool config flow."""
from __future__ import annotations
from dataclasses import replace
from unittest.mock import patch
from pynordpool import (
DeliveryPeriodData,
NordPoolAuthenticationError,
NordPoolConnectionError,
NordPoolError,
NordPoolResponseError,
)
import pytest
from homeassistant import config_entries
from homeassistant.components.nordpool.const import DOMAIN
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from . import ENTRY_CONFIG
@pytest.mark.freeze_time("2024-11-05T18:00:00+00:00")
async def test_form(hass: HomeAssistant, get_data: DeliveryPeriodData) -> None:
"""Test we get the form."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["step_id"] == "user"
assert result["type"] is FlowResultType.FORM
with (
patch(
"homeassistant.components.nordpool.coordinator.NordPoolClient.async_get_delivery_period",
return_value=get_data,
),
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
ENTRY_CONFIG,
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["version"] == 1
assert result["title"] == "Nord Pool"
assert result["data"] == {"areas": ["SE3", "SE4"], "currency": "SEK"}
@pytest.mark.freeze_time("2024-11-05T18:00:00+00:00")
async def test_single_config_entry(
hass: HomeAssistant, load_int: None, get_data: DeliveryPeriodData
) -> None:
"""Test abort for single config entry."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "single_instance_allowed"
@pytest.mark.freeze_time("2024-11-05T18:00:00+00:00")
@pytest.mark.parametrize(
("error_message", "p_error"),
[
(NordPoolConnectionError, "cannot_connect"),
(NordPoolAuthenticationError, "cannot_connect"),
(NordPoolError, "cannot_connect"),
(NordPoolResponseError, "cannot_connect"),
],
)
async def test_cannot_connect(
hass: HomeAssistant,
get_data: DeliveryPeriodData,
error_message: Exception,
p_error: str,
) -> None:
"""Test cannot connect error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == config_entries.SOURCE_USER
with patch(
"homeassistant.components.nordpool.coordinator.NordPoolClient.async_get_delivery_period",
side_effect=error_message,
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input=ENTRY_CONFIG,
)
assert result["errors"] == {"base": p_error}
with patch(
"homeassistant.components.nordpool.coordinator.NordPoolClient.async_get_delivery_period",
return_value=get_data,
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input=ENTRY_CONFIG,
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "Nord Pool"
assert result["data"] == {"areas": ["SE3", "SE4"], "currency": "SEK"}
@pytest.mark.freeze_time("2024-11-05T18:00:00+00:00")
async def test_empty_data(hass: HomeAssistant, get_data: DeliveryPeriodData) -> None:
"""Test empty data error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == config_entries.SOURCE_USER
invalid_data = replace(get_data, raw={})
with patch(
"homeassistant.components.nordpool.coordinator.NordPoolClient.async_get_delivery_period",
return_value=invalid_data,
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input=ENTRY_CONFIG,
)
assert result["errors"] == {"base": "no_data"}
with patch(
"homeassistant.components.nordpool.coordinator.NordPoolClient.async_get_delivery_period",
return_value=get_data,
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input=ENTRY_CONFIG,
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "Nord Pool"
assert result["data"] == {"areas": ["SE3", "SE4"], "currency": "SEK"}

View file

@ -0,0 +1,114 @@
"""The test for the Nord Pool coordinator."""
from __future__ import annotations
from datetime import timedelta
from unittest.mock import patch
from freezegun.api import FrozenDateTimeFactory
from pynordpool import (
DeliveryPeriodData,
NordPoolAuthenticationError,
NordPoolError,
NordPoolResponseError,
)
import pytest
from homeassistant.components.nordpool.const import DOMAIN
from homeassistant.config_entries import SOURCE_USER
from homeassistant.const import STATE_UNAVAILABLE
from homeassistant.core import HomeAssistant
from homeassistant.util import dt as dt_util
from . import ENTRY_CONFIG
from tests.common import MockConfigEntry, async_fire_time_changed
@pytest.mark.freeze_time("2024-11-05T12:00:00+00:00")
async def test_coordinator(
hass: HomeAssistant,
get_data: DeliveryPeriodData,
freezer: FrozenDateTimeFactory,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test the Nord Pool coordinator with errors."""
config_entry = MockConfigEntry(
domain=DOMAIN,
source=SOURCE_USER,
data=ENTRY_CONFIG,
)
config_entry.add_to_hass(hass)
with (
patch(
"homeassistant.components.nordpool.coordinator.NordPoolClient.async_get_delivery_period",
) as mock_data,
):
mock_data.return_value = get_data
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
mock_data.assert_called_once()
state = hass.states.get("sensor.nord_pool_se3_current_price")
assert state.state == "0.94949"
mock_data.reset_mock()
mock_data.side_effect = NordPoolError("error")
freezer.tick(timedelta(hours=1))
async_fire_time_changed(hass)
await hass.async_block_till_done(wait_background_tasks=True)
mock_data.assert_called_once()
state = hass.states.get("sensor.nord_pool_se3_current_price")
assert state.state == STATE_UNAVAILABLE
mock_data.reset_mock()
assert "Authentication error" not in caplog.text
mock_data.side_effect = NordPoolAuthenticationError("Authentication error")
freezer.tick(timedelta(hours=1))
async_fire_time_changed(hass)
await hass.async_block_till_done(wait_background_tasks=True)
mock_data.assert_called_once()
state = hass.states.get("sensor.nord_pool_se3_current_price")
assert state.state == STATE_UNAVAILABLE
assert "Authentication error" in caplog.text
mock_data.reset_mock()
assert "Response error" not in caplog.text
mock_data.side_effect = NordPoolResponseError("Response error")
freezer.tick(timedelta(hours=1))
async_fire_time_changed(hass)
await hass.async_block_till_done(wait_background_tasks=True)
mock_data.assert_called_once()
state = hass.states.get("sensor.nord_pool_se3_current_price")
assert state.state == STATE_UNAVAILABLE
assert "Response error" in caplog.text
mock_data.reset_mock()
mock_data.return_value = DeliveryPeriodData(
raw={},
requested_date="2024-11-05",
updated_at=dt_util.utcnow(),
entries=[],
block_prices=[],
currency="SEK",
exchange_rate=1,
area_average={},
)
mock_data.side_effect = None
freezer.tick(timedelta(hours=1))
async_fire_time_changed(hass)
await hass.async_block_till_done()
mock_data.assert_called_once()
state = hass.states.get("sensor.nord_pool_se3_current_price")
assert state.state == STATE_UNAVAILABLE
mock_data.reset_mock()
mock_data.return_value = get_data
mock_data.side_effect = None
freezer.tick(timedelta(hours=1))
async_fire_time_changed(hass)
await hass.async_block_till_done()
mock_data.assert_called_once()
state = hass.states.get("sensor.nord_pool_se3_current_price")
assert state.state == "1.81983"

View file

@ -0,0 +1,39 @@
"""Test for Nord Pool component Init."""
from __future__ import annotations
from unittest.mock import patch
from pynordpool import DeliveryPeriodData
from homeassistant.components.nordpool.const import DOMAIN
from homeassistant.config_entries import SOURCE_USER, ConfigEntryState
from homeassistant.core import HomeAssistant
from . import ENTRY_CONFIG
from tests.common import MockConfigEntry
async def test_unload_entry(hass: HomeAssistant, get_data: DeliveryPeriodData) -> None:
"""Test load and unload an entry."""
entry = MockConfigEntry(
domain=DOMAIN,
source=SOURCE_USER,
data=ENTRY_CONFIG,
)
entry.add_to_hass(hass)
with (
patch(
"homeassistant.components.nordpool.coordinator.NordPoolClient.async_get_delivery_period",
return_value=get_data,
),
):
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done(wait_background_tasks=True)
assert entry.state is ConfigEntryState.LOADED
assert await hass.config_entries.async_unload(entry.entry_id)
await hass.async_block_till_done()
assert entry.state is ConfigEntryState.NOT_LOADED

View file

@ -0,0 +1,25 @@
"""The test for the Nord Pool sensor platform."""
from __future__ import annotations
import pytest
from syrupy.assertion import SnapshotAssertion
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from tests.common import snapshot_platform
@pytest.mark.freeze_time("2024-11-05T18:00:00+00:00")
@pytest.mark.usefixtures("entity_registry_enabled_by_default")
async def test_sensor(
hass: HomeAssistant,
load_int: ConfigEntry,
entity_registry: er.EntityRegistry,
snapshot: SnapshotAssertion,
) -> None:
"""Test the Nord Pool sensor."""
await snapshot_platform(hass, entity_registry, snapshot, load_int.entry_id)