Split out coordinator and add tests for nibe_heatpump (#103452)

* Separate coordinator in nibe heatpump

* Add tests for coordinator in nibe

* Correct errors in coordinator found during testing

* If coil is missing we should still write state
* async_shutdown did not call base class

* Add more tests for coordinator

* Add minimal test to climate
This commit is contained in:
Joakim Plate 2023-11-06 11:15:00 +01:00 committed by GitHub
parent cee8379628
commit a35f5dc6f5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 736 additions and 307 deletions

View file

@ -1,20 +1,11 @@
"""The Nibe Heat Pump integration."""
from __future__ import annotations
import asyncio
from collections import defaultdict
from collections.abc import Callable, Iterable
from datetime import timedelta
from typing import Any, Generic, TypeVar
from nibe.coil import Coil, CoilData
from nibe.connection import Connection
from nibe.connection.modbus import Modbus
from nibe.connection.nibegw import NibeGW, ProductInfo
from nibe.exceptions import CoilNotFoundException, ReadException
from nibe.heatpump import HeatPump, Model, Series
from nibe.heatpump import HeatPump, Model
from homeassistant.backports.functools import cached_property
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_IP_ADDRESS,
@ -22,16 +13,9 @@ from homeassistant.const import (
EVENT_HOMEASSISTANT_STOP,
Platform,
)
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity import async_generate_entity_id
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
UpdateFailed,
)
from .const import (
CONF_CONNECTION_TYPE,
@ -44,8 +28,8 @@ from .const import (
CONF_REMOTE_WRITE_PORT,
CONF_WORD_SWAP,
DOMAIN,
LOGGER,
)
from .coordinator import Coordinator
PLATFORMS: list[Platform] = [
Platform.BINARY_SENSOR,
@ -131,218 +115,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
coordinator: Coordinator = hass.data[DOMAIN].pop(entry.entry_id)
await coordinator.async_shutdown()
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok
_DataTypeT = TypeVar("_DataTypeT")
_ContextTypeT = TypeVar("_ContextTypeT")
class ContextCoordinator(
Generic[_DataTypeT, _ContextTypeT], DataUpdateCoordinator[_DataTypeT]
):
"""Update coordinator with context adjustments."""
@cached_property
def context_callbacks(self) -> dict[_ContextTypeT, list[CALLBACK_TYPE]]:
"""Return a dict of all callbacks registered for a given context."""
callbacks: dict[_ContextTypeT, list[CALLBACK_TYPE]] = defaultdict(list)
for update_callback, context in list(self._listeners.values()):
assert isinstance(context, set)
for address in context:
callbacks[address].append(update_callback)
return callbacks
@callback
def async_update_context_listeners(self, contexts: Iterable[_ContextTypeT]) -> None:
"""Update all listeners given a set of contexts."""
update_callbacks: set[CALLBACK_TYPE] = set()
for context in contexts:
update_callbacks.update(self.context_callbacks.get(context, []))
for update_callback in update_callbacks:
update_callback()
@callback
def async_add_listener(
self, update_callback: CALLBACK_TYPE, context: Any = None
) -> Callable[[], None]:
"""Wrap standard function to prune cached callback database."""
assert isinstance(context, set)
context -= {None}
release = super().async_add_listener(update_callback, context)
self.__dict__.pop("context_callbacks", None)
@callback
def release_update():
release()
self.__dict__.pop("context_callbacks", None)
return release_update
class Coordinator(ContextCoordinator[dict[int, CoilData], int]):
"""Update coordinator for nibe heat pumps."""
config_entry: ConfigEntry
def __init__(
self,
hass: HomeAssistant,
heatpump: HeatPump,
connection: Connection,
) -> None:
"""Initialize coordinator."""
super().__init__(
hass, LOGGER, name="Nibe Heat Pump", update_interval=timedelta(seconds=60)
)
self.data = {}
self.seed: dict[int, CoilData] = {}
self.connection = connection
self.heatpump = heatpump
self.task: asyncio.Task | None = None
heatpump.subscribe(heatpump.COIL_UPDATE_EVENT, self._on_coil_update)
def _on_coil_update(self, data: CoilData):
"""Handle callback on coil updates."""
coil = data.coil
self.data[coil.address] = data
self.seed[coil.address] = data
self.async_update_context_listeners([coil.address])
@property
def series(self) -> Series:
"""Return which series of pump we are connected to."""
return self.heatpump.series
@property
def coils(self) -> list[Coil]:
"""Return the full coil database."""
return self.heatpump.get_coils()
@property
def unique_id(self) -> str:
"""Return unique id for this coordinator."""
return self.config_entry.unique_id or self.config_entry.entry_id
@property
def device_info(self) -> DeviceInfo:
"""Return device information for the main device."""
return DeviceInfo(identifiers={(DOMAIN, self.unique_id)})
def get_coil_value(self, coil: Coil) -> int | str | float | None:
"""Return a coil with data and check for validity."""
if coil_with_data := self.data.get(coil.address):
return coil_with_data.value
return None
def get_coil_float(self, coil: Coil) -> float | None:
"""Return a coil with float and check for validity."""
if value := self.get_coil_value(coil):
return float(value)
return None
async def async_write_coil(self, coil: Coil, value: int | float | str) -> None:
"""Write coil and update state."""
data = CoilData(coil, value)
await self.connection.write_coil(data)
self.data[coil.address] = data
self.async_update_context_listeners([coil.address])
async def async_read_coil(self, coil: Coil) -> CoilData:
"""Read coil and update state using callbacks."""
return await self.connection.read_coil(coil)
async def _async_update_data(self) -> dict[int, CoilData]:
self.task = asyncio.current_task()
try:
return await self._async_update_data_internal()
finally:
self.task = None
async def _async_update_data_internal(self) -> dict[int, CoilData]:
result: dict[int, CoilData] = {}
def _get_coils() -> Iterable[Coil]:
for address in sorted(self.context_callbacks.keys()):
if seed := self.seed.pop(address, None):
self.logger.debug("Skipping seeded coil: %d", address)
result[address] = seed
continue
try:
coil = self.heatpump.get_coil_by_address(address)
except CoilNotFoundException as exception:
self.logger.debug("Skipping missing coil: %s", exception)
continue
yield coil
try:
async for data in self.connection.read_coils(_get_coils()):
result[data.coil.address] = data
self.seed.pop(data.coil.address, None)
except ReadException as exception:
if not result:
raise UpdateFailed(f"Failed to update: {exception}") from exception
self.logger.debug(
"Some coils failed to update, and may be unsupported: %s", exception
)
return result
async def async_shutdown(self):
"""Make sure a coordinator is shut down as well as it's connection."""
if self.task:
self.task.cancel()
await asyncio.wait((self.task,))
self._unschedule_refresh()
await self.connection.stop()
class CoilEntity(CoordinatorEntity[Coordinator]):
"""Base for coil based entities."""
_attr_has_entity_name = True
_attr_entity_registry_enabled_default = False
def __init__(
self, coordinator: Coordinator, coil: Coil, entity_format: str
) -> None:
"""Initialize base entity."""
super().__init__(coordinator, {coil.address})
self.entity_id = async_generate_entity_id(
entity_format, coil.name, hass=coordinator.hass
)
self._attr_name = coil.title
self._attr_unique_id = f"{coordinator.unique_id}-{coil.address}"
self._attr_device_info = coordinator.device_info
self._coil = coil
@property
def available(self) -> bool:
"""Return if entity is available."""
return self.coordinator.last_update_success and self._coil.address in (
self.coordinator.data or {}
)
def _async_read_coil(self, data: CoilData):
"""Update state of entity based on coil data."""
async def _async_write_coil(self, value: int | float | str):
"""Write coil and update state."""
await self.coordinator.async_write_coil(self._coil, value)
def _handle_coordinator_update(self) -> None:
data = self.coordinator.data.get(self._coil.address)
if data is None:
return
self._async_read_coil(data)
self.async_write_ha_state()

View file

@ -9,7 +9,8 @@ from homeassistant.const import EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import DOMAIN, CoilEntity, Coordinator
from .const import DOMAIN
from .coordinator import CoilEntity, Coordinator
async def async_setup_entry(

View file

@ -11,7 +11,8 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import DOMAIN, LOGGER, Coordinator
from .const import DOMAIN, LOGGER
from .coordinator import Coordinator
async def async_setup_entry(

View file

@ -28,7 +28,6 @@ from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import Coordinator
from .const import (
DOMAIN,
LOGGER,
@ -37,6 +36,7 @@ from .const import (
VALUES_PRIORITY_COOLING,
VALUES_PRIORITY_HEATING,
)
from .coordinator import Coordinator
async def async_setup_entry(

View file

@ -0,0 +1,234 @@
"""The Nibe Heat Pump coordinator."""
from __future__ import annotations
import asyncio
from collections import defaultdict
from collections.abc import Callable, Iterable
from datetime import timedelta
from typing import Any, Generic, TypeVar
from nibe.coil import Coil, CoilData
from nibe.connection import Connection
from nibe.exceptions import CoilNotFoundException, ReadException
from nibe.heatpump import HeatPump, Series
from homeassistant.backports.functools import cached_property
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity import async_generate_entity_id
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
UpdateFailed,
)
from .const import DOMAIN, LOGGER
_DataTypeT = TypeVar("_DataTypeT")
_ContextTypeT = TypeVar("_ContextTypeT")
class ContextCoordinator(
Generic[_DataTypeT, _ContextTypeT], DataUpdateCoordinator[_DataTypeT]
):
"""Update coordinator with context adjustments."""
@cached_property
def context_callbacks(self) -> dict[_ContextTypeT, list[CALLBACK_TYPE]]:
"""Return a dict of all callbacks registered for a given context."""
callbacks: dict[_ContextTypeT, list[CALLBACK_TYPE]] = defaultdict(list)
for update_callback, context in list(self._listeners.values()):
assert isinstance(context, set)
for address in context:
callbacks[address].append(update_callback)
return callbacks
@callback
def async_update_context_listeners(self, contexts: Iterable[_ContextTypeT]) -> None:
"""Update all listeners given a set of contexts."""
update_callbacks: set[CALLBACK_TYPE] = set()
for context in contexts:
update_callbacks.update(self.context_callbacks.get(context, []))
for update_callback in update_callbacks:
update_callback()
@callback
def async_add_listener(
self, update_callback: CALLBACK_TYPE, context: Any = None
) -> Callable[[], None]:
"""Wrap standard function to prune cached callback database."""
assert isinstance(context, set)
context -= {None}
release = super().async_add_listener(update_callback, context)
self.__dict__.pop("context_callbacks", None)
@callback
def release_update():
release()
self.__dict__.pop("context_callbacks", None)
return release_update
class Coordinator(ContextCoordinator[dict[int, CoilData], int]):
"""Update coordinator for nibe heat pumps."""
config_entry: ConfigEntry
def __init__(
self,
hass: HomeAssistant,
heatpump: HeatPump,
connection: Connection,
) -> None:
"""Initialize coordinator."""
super().__init__(
hass, LOGGER, name="Nibe Heat Pump", update_interval=timedelta(seconds=60)
)
self.data = {}
self.seed: dict[int, CoilData] = {}
self.connection = connection
self.heatpump = heatpump
self.task: asyncio.Task | None = None
heatpump.subscribe(heatpump.COIL_UPDATE_EVENT, self._on_coil_update)
def _on_coil_update(self, data: CoilData):
"""Handle callback on coil updates."""
coil = data.coil
self.data[coil.address] = data
self.seed[coil.address] = data
self.async_update_context_listeners([coil.address])
@property
def series(self) -> Series:
"""Return which series of pump we are connected to."""
return self.heatpump.series
@property
def coils(self) -> list[Coil]:
"""Return the full coil database."""
return self.heatpump.get_coils()
@property
def unique_id(self) -> str:
"""Return unique id for this coordinator."""
return self.config_entry.unique_id or self.config_entry.entry_id
@property
def device_info(self) -> DeviceInfo:
"""Return device information for the main device."""
return DeviceInfo(identifiers={(DOMAIN, self.unique_id)})
def get_coil_value(self, coil: Coil) -> int | str | float | None:
"""Return a coil with data and check for validity."""
if coil_with_data := self.data.get(coil.address):
return coil_with_data.value
return None
def get_coil_float(self, coil: Coil) -> float | None:
"""Return a coil with float and check for validity."""
if value := self.get_coil_value(coil):
return float(value)
return None
async def async_write_coil(self, coil: Coil, value: int | float | str) -> None:
"""Write coil and update state."""
data = CoilData(coil, value)
await self.connection.write_coil(data)
self.data[coil.address] = data
self.async_update_context_listeners([coil.address])
async def async_read_coil(self, coil: Coil) -> CoilData:
"""Read coil and update state using callbacks."""
return await self.connection.read_coil(coil)
async def _async_update_data(self) -> dict[int, CoilData]:
self.task = asyncio.current_task()
try:
return await self._async_update_data_internal()
finally:
self.task = None
async def _async_update_data_internal(self) -> dict[int, CoilData]:
result: dict[int, CoilData] = {}
def _get_coils() -> Iterable[Coil]:
for address in sorted(self.context_callbacks.keys()):
if seed := self.seed.pop(address, None):
self.logger.debug("Skipping seeded coil: %d", address)
result[address] = seed
continue
try:
coil = self.heatpump.get_coil_by_address(address)
except CoilNotFoundException as exception:
self.logger.debug("Skipping missing coil: %s", exception)
continue
yield coil
try:
async for data in self.connection.read_coils(_get_coils()):
result[data.coil.address] = data
self.seed.pop(data.coil.address, None)
except ReadException as exception:
if not result:
raise UpdateFailed(f"Failed to update: {exception}") from exception
self.logger.debug(
"Some coils failed to update, and may be unsupported: %s", exception
)
return result
async def async_shutdown(self):
"""Make sure a coordinator is shut down as well as it's connection."""
await super().async_shutdown()
if self.task:
self.task.cancel()
await asyncio.wait((self.task,))
await self.connection.stop()
class CoilEntity(CoordinatorEntity[Coordinator]):
"""Base for coil based entities."""
_attr_has_entity_name = True
_attr_entity_registry_enabled_default = False
def __init__(
self, coordinator: Coordinator, coil: Coil, entity_format: str
) -> None:
"""Initialize base entity."""
super().__init__(coordinator, {coil.address})
self.entity_id = async_generate_entity_id(
entity_format, coil.name, hass=coordinator.hass
)
self._attr_name = coil.title
self._attr_unique_id = f"{coordinator.unique_id}-{coil.address}"
self._attr_device_info = coordinator.device_info
self._coil = coil
@property
def available(self) -> bool:
"""Return if entity is available."""
return self.coordinator.last_update_success and self._coil.address in (
self.coordinator.data or {}
)
def _async_read_coil(self, data: CoilData):
"""Update state of entity based on coil data."""
async def _async_write_coil(self, value: int | float | str):
"""Write coil and update state."""
await self.coordinator.async_write_coil(self._coil, value)
def _handle_coordinator_update(self) -> None:
data = self.coordinator.data.get(self._coil.address)
if data is not None:
self._async_read_coil(data)
self.async_write_ha_state()

View file

@ -9,7 +9,8 @@ from homeassistant.const import EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import DOMAIN, CoilEntity, Coordinator
from .const import DOMAIN
from .coordinator import CoilEntity, Coordinator
async def async_setup_entry(

View file

@ -9,7 +9,8 @@ from homeassistant.const import EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import DOMAIN, CoilEntity, Coordinator
from .const import DOMAIN
from .coordinator import CoilEntity, Coordinator
async def async_setup_entry(

View file

@ -24,7 +24,8 @@ from homeassistant.const import (
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import DOMAIN, CoilEntity, Coordinator
from .const import DOMAIN
from .coordinator import CoilEntity, Coordinator
UNIT_DESCRIPTIONS = {
"°C": SensorEntityDescription(

View file

@ -11,7 +11,8 @@ from homeassistant.const import EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import DOMAIN, CoilEntity, Coordinator
from .const import DOMAIN
from .coordinator import CoilEntity, Coordinator
async def async_setup_entry(

View file

@ -17,8 +17,13 @@ from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import DOMAIN, LOGGER, Coordinator
from .const import VALUES_TEMPORARY_LUX_INACTIVE, VALUES_TEMPORARY_LUX_ONE_TIME_INCREASE
from .const import (
DOMAIN,
LOGGER,
VALUES_TEMPORARY_LUX_INACTIVE,
VALUES_TEMPORARY_LUX_ONE_TIME_INCREASE,
)
from .coordinator import Coordinator
async def async_setup_entry(

View file

@ -1,8 +1,12 @@
"""Tests for the Nibe Heat Pump integration."""
from typing import Any
from unittest.mock import AsyncMock
from nibe.heatpump import Model
from nibe.coil import Coil, CoilData
from nibe.connection import Connection
from nibe.exceptions import ReadException
from nibe.heatpump import HeatPump, Model
from homeassistant.components.nibe_heatpump import DOMAIN
from homeassistant.config_entries import ConfigEntryState
@ -21,7 +25,39 @@ MOCK_ENTRY_DATA = {
}
async def async_add_entry(hass: HomeAssistant, data: dict[str, Any]) -> None:
class MockConnection(Connection):
"""A mock connection class."""
def __init__(self) -> None:
"""Initialize the mock connection."""
self.coils: dict[int, Any] = {}
self.heatpump: HeatPump
self.start = AsyncMock()
self.stop = AsyncMock()
self.write_coil = AsyncMock()
self.verify_connectivity = AsyncMock()
self.read_product_info = AsyncMock()
async def read_coil(self, coil: Coil, timeout: float = 0) -> CoilData:
"""Read of coils."""
if (data := self.coils.get(coil.address, None)) is None:
raise ReadException()
return CoilData(coil, data)
async def write_coil(self, coil_data: CoilData, timeout: float = 10.0) -> None:
"""Write a coil data to the heatpump."""
async def verify_connectivity(self):
"""Verify that we have functioning communication."""
def mock_coil_update(self, coil_id: int, value: int | float | str | None):
"""Trigger an out of band coil update."""
coil = self.heatpump.get_coil_by_address(coil_id)
self.coils[coil_id] = value
self.heatpump.notify_coil_update(CoilData(coil, value))
async def async_add_entry(hass: HomeAssistant, data: dict[str, Any]) -> MockConfigEntry:
"""Add entry and get the coordinator."""
entry = MockConfigEntry(domain=DOMAIN, title="Dummy", data=data)
@ -29,8 +65,9 @@ async def async_add_entry(hass: HomeAssistant, data: dict[str, Any]) -> None:
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
assert entry.state == ConfigEntryState.LOADED
return entry
async def async_add_model(hass: HomeAssistant, model: Model):
async def async_add_model(hass: HomeAssistant, model: Model) -> MockConfigEntry:
"""Add entry of specific model."""
await async_add_entry(hass, {**MOCK_ENTRY_DATA, "model": model.name})
return await async_add_entry(hass, {**MOCK_ENTRY_DATA, "model": model.name})

View file

@ -1,14 +1,18 @@
"""Test configuration for Nibe Heat Pump."""
from collections.abc import AsyncIterator, Generator, Iterable
from collections.abc import Generator
from contextlib import ExitStack
from typing import Any
from unittest.mock import AsyncMock, Mock, patch
from nibe.coil import Coil, CoilData
from nibe.connection import Connection
from nibe.exceptions import ReadException
from freezegun.api import FrozenDateTimeFactory
from nibe.exceptions import CoilNotFoundException
import pytest
from homeassistant.core import HomeAssistant
from . import MockConnection
from tests.common import async_fire_time_changed
@pytest.fixture
def mock_setup_entry() -> Generator[AsyncMock, None, None]:
@ -19,10 +23,22 @@ def mock_setup_entry() -> Generator[AsyncMock, None, None]:
yield mock_setup_entry
@pytest.fixture(autouse=True, name="mock_connection_constructor")
async def fixture_mock_connection_constructor():
@pytest.fixture(autouse=True, name="mock_connection_construct")
async def fixture_mock_connection_construct():
"""Fixture to catch constructor calls."""
return Mock()
@pytest.fixture(autouse=True, name="mock_connection")
async def fixture_mock_connection(mock_connection_construct):
"""Make sure we have a dummy connection."""
mock_constructor = Mock()
mock_connection = MockConnection()
def construct(heatpump, *args, **kwargs):
mock_connection_construct(heatpump, *args, **kwargs)
mock_connection.heatpump = heatpump
return mock_connection
with ExitStack() as stack:
places = [
"homeassistant.components.nibe_heatpump.config_flow.NibeGW",
@ -31,46 +47,43 @@ async def fixture_mock_connection_constructor():
"homeassistant.components.nibe_heatpump.Modbus",
]
for place in places:
stack.enter_context(patch(place, new=mock_constructor))
yield mock_constructor
@pytest.fixture(name="mock_connection")
def fixture_mock_connection(mock_connection_constructor: Mock):
"""Make sure we have a dummy connection."""
mock_connection = AsyncMock(spec=Connection)
mock_connection_constructor.return_value = mock_connection
return mock_connection
stack.enter_context(patch(place, new=construct))
yield mock_connection
@pytest.fixture(name="coils")
async def fixture_coils(mock_connection):
async def fixture_coils(mock_connection: MockConnection):
"""Return a dict with coil data."""
coils: dict[int, Any] = {}
async def read_coil(coil: Coil, timeout: float = 0) -> CoilData:
nonlocal coils
if (data := coils.get(coil.address, None)) is None:
raise ReadException()
return CoilData(coil, data)
async def read_coils(
coils: Iterable[Coil], timeout: float = 0
) -> AsyncIterator[Coil]:
for coil in coils:
yield await read_coil(coil, timeout)
mock_connection.read_coil = read_coil
mock_connection.read_coils = read_coils
# pylint: disable-next=import-outside-toplevel
from homeassistant.components.nibe_heatpump import HeatPump
get_coils_original = HeatPump.get_coils
get_coil_by_address_original = HeatPump.get_coil_by_address
def get_coils(x):
coils_data = get_coils_original(x)
return [coil for coil in coils_data if coil.address in coils]
return [coil for coil in coils_data if coil.address in mock_connection.coils]
with patch.object(HeatPump, "get_coils", new=get_coils):
yield coils
def get_coil_by_address(self, address):
coils_data = get_coil_by_address_original(self, address)
if coils_data.address not in mock_connection.coils:
raise CoilNotFoundException()
return coils_data
with patch.object(HeatPump, "get_coils", new=get_coils), patch.object(
HeatPump, "get_coil_by_address", new=get_coil_by_address
):
yield mock_connection.coils
@pytest.fixture(name="freezer_ticker")
async def fixture_freezer_ticker(hass: HomeAssistant, freezer: FrozenDateTimeFactory):
"""Tick time and perform actions."""
async def ticker(delay, block=True):
freezer.tick(delay)
async_fire_time_changed(hass)
if block:
await hass.async_block_till_done()
return ticker

View file

@ -0,0 +1,53 @@
# serializer version: 1
# name: test_basic[Model.S320-s1-climate.climate_system_s1][1. initial]
StateSnapshot({
'attributes': ReadOnlyDict({
'current_temperature': 20.5,
'friendly_name': 'Climate System S1',
'hvac_action': <HVACAction.HEATING: 'heating'>,
'hvac_modes': list([
<HVACMode.AUTO: 'auto'>,
<HVACMode.HEAT: 'heat'>,
<HVACMode.HEAT_COOL: 'heat_cool'>,
]),
'max_temp': 35.0,
'min_temp': 5.0,
'supported_features': <ClimateEntityFeature: 3>,
'target_temp_high': 30.0,
'target_temp_low': 21.0,
'target_temp_step': 0.5,
'temperature': None,
}),
'context': <ANY>,
'entity_id': 'climate.climate_system_s1',
'last_changed': <ANY>,
'last_updated': <ANY>,
'state': 'heat_cool',
})
# ---
# name: test_basic[Model.S320-s1-climate.climate_system_s1][2. idle]
StateSnapshot({
'attributes': ReadOnlyDict({
'current_temperature': 20.5,
'friendly_name': 'Climate System S1',
'hvac_action': <HVACAction.IDLE: 'idle'>,
'hvac_modes': list([
<HVACMode.AUTO: 'auto'>,
<HVACMode.HEAT: 'heat'>,
<HVACMode.HEAT_COOL: 'heat_cool'>,
]),
'max_temp': 35.0,
'min_temp': 5.0,
'supported_features': <ClimateEntityFeature: 3>,
'target_temp_high': 30.0,
'target_temp_low': 21.0,
'target_temp_step': 0.5,
'temperature': None,
}),
'context': <ANY>,
'entity_id': 'climate.climate_system_s1',
'last_changed': <ANY>,
'last_updated': <ANY>,
'state': 'heat_cool',
})
# ---

View file

@ -0,0 +1,133 @@
# serializer version: 1
# name: test_invalid_coil[Sensor is available]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'S320 Heating offset climate system 1',
'max': 10.0,
'min': -10.0,
'mode': <NumberMode.AUTO: 'auto'>,
'step': 1.0,
}),
'context': <ANY>,
'entity_id': 'number.heating_offset_climate_system_1_40031',
'last_changed': <ANY>,
'last_updated': <ANY>,
'state': '10.0',
})
# ---
# name: test_invalid_coil[Sensor is not available]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'S320 Heating offset climate system 1',
'max': 10.0,
'min': -10.0,
'mode': <NumberMode.AUTO: 'auto'>,
'step': 1.0,
}),
'context': <ANY>,
'entity_id': 'number.heating_offset_climate_system_1_40031',
'last_changed': <ANY>,
'last_updated': <ANY>,
'state': 'unavailable',
})
# ---
# name: test_partial_refresh[1. Sensor is available]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'S320 Heating offset climate system 1',
'max': 10.0,
'min': -10.0,
'mode': <NumberMode.AUTO: 'auto'>,
'step': 1.0,
}),
'context': <ANY>,
'entity_id': 'number.heating_offset_climate_system_1_40031',
'last_changed': <ANY>,
'last_updated': <ANY>,
'state': '10.0',
})
# ---
# name: test_partial_refresh[2. Sensor is not available]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'S320 Min supply climate system 1',
'max': 80.0,
'min': 5.0,
'mode': <NumberMode.AUTO: 'auto'>,
'step': 0.1,
'unit_of_measurement': '°C',
}),
'context': <ANY>,
'entity_id': 'number.min_supply_climate_system_1_40035',
'last_changed': <ANY>,
'last_updated': <ANY>,
'state': 'unavailable',
})
# ---
# name: test_partial_refresh[3. Sensor is available]
None
# ---
# name: test_pushed_update[1. initial values]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'S320 Heating offset climate system 1',
'max': 10.0,
'min': -10.0,
'mode': <NumberMode.AUTO: 'auto'>,
'step': 1.0,
}),
'context': <ANY>,
'entity_id': 'number.heating_offset_climate_system_1_40031',
'last_changed': <ANY>,
'last_updated': <ANY>,
'state': '10.0',
})
# ---
# name: test_pushed_update[2. pushed values]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'S320 Heating offset climate system 1',
'max': 10.0,
'min': -10.0,
'mode': <NumberMode.AUTO: 'auto'>,
'step': 1.0,
}),
'context': <ANY>,
'entity_id': 'number.heating_offset_climate_system_1_40031',
'last_changed': <ANY>,
'last_updated': <ANY>,
'state': '20.0',
})
# ---
# name: test_pushed_update[3. seeded values]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'S320 Heating offset climate system 1',
'max': 10.0,
'min': -10.0,
'mode': <NumberMode.AUTO: 'auto'>,
'step': 1.0,
}),
'context': <ANY>,
'entity_id': 'number.heating_offset_climate_system_1_40031',
'last_changed': <ANY>,
'last_updated': <ANY>,
'state': '20.0',
})
# ---
# name: test_pushed_update[4. final values]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'S320 Heating offset climate system 1',
'max': 10.0,
'min': -10.0,
'mode': <NumberMode.AUTO: 'auto'>,
'step': 1.0,
}),
'context': <ANY>,
'entity_id': 'number.heating_offset_climate_system_1_40031',
'last_changed': <ANY>,
'last_updated': <ANY>,
'state': '30.0',
})
# ---

View file

@ -2,7 +2,6 @@
from typing import Any
from unittest.mock import AsyncMock, patch
from freezegun.api import FrozenDateTimeFactory
from nibe.coil import CoilData
from nibe.coil_groups import UNIT_COILGROUPS
from nibe.heatpump import Model
@ -19,8 +18,6 @@ from homeassistant.core import HomeAssistant
from . import async_add_model
from tests.common import async_fire_time_changed
@pytest.fixture(autouse=True)
async def fixture_single_platform():
@ -42,7 +39,7 @@ async def test_reset_button(
model: Model,
entity_id: str,
coils: dict[int, Any],
freezer: FrozenDateTimeFactory,
freezer_ticker: Any,
):
"""Test reset button."""
@ -61,9 +58,7 @@ async def test_reset_button(
# Signal alarm
coils[unit.alarm] = 100
freezer.tick(60)
async_fire_time_changed(hass)
await hass.async_block_till_done()
await freezer_ticker(60)
state = hass.states.get(entity_id)
assert state

View file

@ -0,0 +1,58 @@
"""Test the Nibe Heat Pump config flow."""
from typing import Any
from unittest.mock import patch
from nibe.coil_groups import CLIMATE_COILGROUPS, UNIT_COILGROUPS
from nibe.heatpump import Model
import pytest
from syrupy import SnapshotAssertion
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from . import MockConnection, async_add_model
@pytest.fixture(autouse=True)
async def fixture_single_platform():
"""Only allow this platform to load."""
with patch("homeassistant.components.nibe_heatpump.PLATFORMS", [Platform.CLIMATE]):
yield
@pytest.mark.parametrize(
("model", "climate_id", "entity_id"),
[
(Model.S320, "s1", "climate.climate_system_s1"),
],
)
async def test_basic(
hass: HomeAssistant,
mock_connection: MockConnection,
model: Model,
climate_id: str,
entity_id: str,
coils: dict[int, Any],
entity_registry_enabled_by_default: None,
snapshot: SnapshotAssertion,
) -> None:
"""Test setting of value."""
climate = CLIMATE_COILGROUPS[model.series][climate_id]
unit = UNIT_COILGROUPS[model.series]["main"]
if climate.active_accessory is not None:
coils[climate.active_accessory] = "ON"
coils[climate.current] = 20.5
coils[climate.setpoint_heat] = 21.0
coils[climate.setpoint_cool] = 30.0
coils[climate.mixing_valve_state] = "ON"
coils[climate.use_room_sensor] = "ON"
coils[unit.prio] = "HEAT"
coils[unit.cooling_with_room_sensor] = "ON"
await async_add_model(hass, model)
assert hass.states.get(entity_id) == snapshot(name="1. initial")
mock_connection.mock_coil_update(unit.prio, "OFF")
assert hass.states.get(entity_id) == snapshot(name="2. idle")

View file

@ -1,7 +1,7 @@
"""Test the Nibe Heat Pump config flow."""
from unittest.mock import Mock
from typing import Any
from unittest.mock import AsyncMock, Mock
from nibe.coil import Coil
from nibe.exceptions import (
AddressInUseException,
CoilNotFoundException,
@ -54,16 +54,12 @@ async def _get_connection_form(
async def test_nibegw_form(
hass: HomeAssistant, mock_connection: Mock, mock_setup_entry: Mock
hass: HomeAssistant, coils: dict[int, Any], mock_setup_entry: Mock
) -> None:
"""Test we get the form."""
result = await _get_connection_form(hass, "nibegw")
coil_wordswap = Coil(
48852, "modbus40-word-swap-48852", "Modbus40 Word Swap", "u8", min=0, max=1
)
coil_wordswap.value = "ON"
mock_connection.read_coil.return_value = coil_wordswap
coils[48852] = 1
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], MOCK_FLOW_NIBEGW_USERDATA
@ -85,16 +81,12 @@ async def test_nibegw_form(
async def test_modbus_form(
hass: HomeAssistant, mock_connection: Mock, mock_setup_entry: Mock
hass: HomeAssistant, coils: dict[int, Any], mock_setup_entry: Mock
) -> None:
"""Test we get the form."""
result = await _get_connection_form(hass, "modbus")
coil = Coil(
40022, "reset-alarm-40022", "Reset Alarm", "u8", min=0, max=1, write=True
)
coil.value = "ON"
mock_connection.read_coil.return_value = coil
coils[40022] = 1
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], MOCK_FLOW_MODBUS_USERDATA
@ -113,12 +105,12 @@ async def test_modbus_form(
async def test_modbus_invalid_url(
hass: HomeAssistant, mock_connection_constructor: Mock
hass: HomeAssistant, mock_connection_construct: Mock
) -> None:
"""Test we handle invalid auth."""
result = await _get_connection_form(hass, "modbus")
mock_connection_constructor.side_effect = ValueError()
mock_connection_construct.side_effect = ValueError()
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], {**MOCK_FLOW_MODBUS_USERDATA, "modbus_url": "invalid://url"}
)
@ -131,6 +123,7 @@ async def test_nibegw_address_inuse(hass: HomeAssistant, mock_connection: Mock)
"""Test we handle invalid auth."""
result = await _get_connection_form(hass, "nibegw")
mock_connection.start = AsyncMock()
mock_connection.start.side_effect = AddressInUseException()
result2 = await hass.config_entries.flow.async_configure(

View file

@ -0,0 +1,130 @@
"""Test the Nibe Heat Pump config flow."""
import asyncio
from typing import Any
from unittest.mock import patch
from nibe.coil import Coil, CoilData
from nibe.heatpump import Model
import pytest
from syrupy import SnapshotAssertion
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from . import MockConnection, async_add_model
@pytest.fixture(autouse=True)
async def fixture_single_platform():
"""Only allow this platform to load."""
with patch("homeassistant.components.nibe_heatpump.PLATFORMS", [Platform.NUMBER]):
yield
async def test_partial_refresh(
hass: HomeAssistant,
coils: dict[int, Any],
entity_registry_enabled_by_default: None,
snapshot: SnapshotAssertion,
) -> None:
"""Test that coordinator can handle partial fields."""
coils[40031] = 10
coils[40035] = None
coils[40039] = 10
await async_add_model(hass, Model.S320)
data = hass.states.get("number.heating_offset_climate_system_1_40031")
assert data == snapshot(name="1. Sensor is available")
data = hass.states.get("number.min_supply_climate_system_1_40035")
assert data == snapshot(name="2. Sensor is not available")
data = hass.states.get("number.max_supply_climate_system_1_40035")
assert data == snapshot(name="3. Sensor is available")
async def test_invalid_coil(
hass: HomeAssistant,
coils: dict[int, Any],
entity_registry_enabled_by_default: None,
snapshot: SnapshotAssertion,
freezer_ticker: Any,
) -> None:
"""That update coordinator correctly marks entities unavailable with missing coils."""
entity_id = "number.heating_offset_climate_system_1_40031"
coil_id = 40031
coils[coil_id] = 10
await async_add_model(hass, Model.S320)
assert hass.states.get(entity_id) == snapshot(name="Sensor is available")
coils.pop(coil_id)
await freezer_ticker(60)
assert hass.states.get(entity_id) == snapshot(name="Sensor is not available")
async def test_pushed_update(
hass: HomeAssistant,
coils: dict[int, Any],
entity_registry_enabled_by_default: None,
snapshot: SnapshotAssertion,
mock_connection: MockConnection,
freezer_ticker: Any,
) -> None:
"""Test out of band pushed value, update directly and seed the next update."""
entity_id = "number.heating_offset_climate_system_1_40031"
coil_id = 40031
coils[coil_id] = 10
await async_add_model(hass, Model.S320)
assert hass.states.get(entity_id) == snapshot(name="1. initial values")
mock_connection.mock_coil_update(coil_id, 20)
assert hass.states.get(entity_id) == snapshot(name="2. pushed values")
coils[coil_id] = 30
await freezer_ticker(60)
assert hass.states.get(entity_id) == snapshot(name="3. seeded values")
await freezer_ticker(60)
assert hass.states.get(entity_id) == snapshot(name="4. final values")
async def test_shutdown(
hass: HomeAssistant,
coils: dict[int, Any],
entity_registry_enabled_by_default: None,
mock_connection: MockConnection,
freezer_ticker: Any,
) -> None:
"""Check that shutdown, cancel a long running update."""
coils[40031] = 10
entry = await async_add_model(hass, Model.S320)
mock_connection.start.assert_called_once()
done = asyncio.Event()
hang = asyncio.Event()
async def _read_coil_hang(coil: Coil, timeout: float = 0) -> CoilData:
try:
hang.set()
await done.wait() # infinite wait
except asyncio.CancelledError:
done.set()
mock_connection.read_coil = _read_coil_hang
await freezer_ticker(60, block=False)
await hang.wait()
await hass.config_entries.async_unload(entry.entry_id)
assert done.is_set()
mock_connection.stop.assert_called_once()