Support for nibe heat pumps with local access (#78542)

* Add nibe local integration

* Add sensor platform

* Enable sensor platform

* Fix manifest

* Correct domain after rename

* Adjust tests for rename

* Correct codeowners

* Add requirements for tests

* Grab coil by name

* Switch to home assistant error

* Config entry always exist

* Switch to create task

* Bump to 0.5.0

* Use new coils access

* Remove unneeded check

* Use single instance of logger

* Test invalid ip

* Don't allow coil to be None

* Remove sleep

* Initialize data in coordinator init

* Add utils to ignore

* Update homeassistant/components/nibe_heatpump/manifest.json

Co-authored-by: J. Nick Koston <nick@koston.org>

* Use generator instead

* Use tenacity as retry decorator

* Use package instead of name to get logger

* Skip broad exception handling

* Catch missing coil exception

* Add missing test

Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Joakim Plate 2022-09-22 08:43:30 +02:00 committed by GitHub
parent 39315b7fe3
commit f5120872aa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 696 additions and 0 deletions

View file

@ -833,6 +833,8 @@ omit =
homeassistant/components/nextcloud/*
homeassistant/components/nfandroidtv/__init__.py
homeassistant/components/nfandroidtv/notify.py
homeassistant/components/nibe_heatpump/__init__.py
homeassistant/components/nibe_heatpump/sensor.py
homeassistant/components/niko_home_control/light.py
homeassistant/components/nilu/air_quality.py
homeassistant/components/nissan_leaf/*

View file

@ -747,6 +747,8 @@ build.json @home-assistant/supervisor
/tests/components/nextdns/ @bieniu
/homeassistant/components/nfandroidtv/ @tkdrob
/tests/components/nfandroidtv/ @tkdrob
/homeassistant/components/nibe_heatpump/ @elupus
/tests/components/nibe_heatpump/ @elupus
/homeassistant/components/nightscout/ @marciogranzotto
/tests/components/nightscout/ @marciogranzotto
/homeassistant/components/nilu/ @hfurubotten

View file

@ -0,0 +1,226 @@
"""The Nibe Heat Pump integration."""
from __future__ import annotations
from datetime import timedelta
from nibe.coil import Coil
from nibe.connection import Connection
from nibe.connection.nibegw import NibeGW
from nibe.exceptions import CoilNotFoundException, CoilReadException
from nibe.heatpump import HeatPump, Model
from tenacity import RetryError, retry, retry_if_exception_type, stop_after_attempt
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_IP_ADDRESS, CONF_MODEL, Platform
from homeassistant.core import CALLBACK_TYPE, HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady, HomeAssistantError
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.entity import DeviceInfo, async_generate_entity_id
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
)
from .const import (
CONF_CONNECTION_TYPE,
CONF_CONNECTION_TYPE_NIBEGW,
CONF_LISTENING_PORT,
CONF_REMOTE_READ_PORT,
CONF_REMOTE_WRITE_PORT,
CONF_WORD_SWAP,
DOMAIN,
LOGGER,
)
PLATFORMS: list[Platform] = [Platform.SENSOR]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Nibe Heat Pump from a config entry."""
heatpump = HeatPump(Model[entry.data[CONF_MODEL]])
heatpump.word_swap = entry.data[CONF_WORD_SWAP]
heatpump.initialize()
connection_type = entry.data[CONF_CONNECTION_TYPE]
if connection_type == CONF_CONNECTION_TYPE_NIBEGW:
connection = NibeGW(
heatpump,
entry.data[CONF_IP_ADDRESS],
entry.data[CONF_REMOTE_READ_PORT],
entry.data[CONF_REMOTE_WRITE_PORT],
listening_port=entry.data[CONF_LISTENING_PORT],
)
else:
raise HomeAssistantError(f"Connection type {connection_type} is not supported.")
await connection.start()
coordinator = Coordinator(hass, heatpump, connection)
data = hass.data.setdefault(DOMAIN, {})
data[entry.entry_id] = coordinator
try:
await coordinator.async_config_entry_first_refresh()
except ConfigEntryNotReady:
await connection.stop()
raise
reg = dr.async_get(hass)
reg.async_get_or_create(
config_entry_id=entry.entry_id,
identifiers={(DOMAIN, entry.unique_id or entry.entry_id)},
manufacturer="NIBE Energy Systems",
model=heatpump.model.name,
name=heatpump.model.name,
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
# Trigger a refresh again now that all platforms have registered
hass.async_create_task(coordinator.async_refresh())
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
coordinator: Coordinator = hass.data[DOMAIN].pop(entry.entry_id)
await coordinator.connection.stop()
return unload_ok
class Coordinator(DataUpdateCoordinator[dict[int, Coil]]):
"""Update coordinator for nibe heat pumps."""
config_entry: ConfigEntry
def __init__(
self,
hass: HomeAssistant,
heatpump: HeatPump,
connection: Connection,
) -> None:
"""Initialize coordinator."""
super().__init__(
hass, LOGGER, name="Nibe Heat Pump", update_interval=timedelta(seconds=60)
)
self.data = {}
self.connection = connection
self.heatpump = heatpump
@property
def coils(self) -> list[Coil]:
"""Return the full coil database."""
return self.heatpump.get_coils()
@property
def unique_id(self) -> str:
"""Return unique id for this coordinator."""
return self.config_entry.unique_id or self.config_entry.entry_id
@property
def device_info(self) -> DeviceInfo:
"""Return device information for the main device."""
return DeviceInfo(identifiers={(DOMAIN, self.unique_id)})
def get_coil_value(self, coil: Coil) -> int | str | float | None:
"""Return a coil with data and check for validity."""
if coil := self.data.get(coil.address):
return coil.value
return None
def get_coil_float(self, coil: Coil) -> float | None:
"""Return a coil with float and check for validity."""
if value := self.get_coil_value(coil):
return float(value)
return None
async def async_write_coil(
self, coil: Coil | None, value: int | float | str
) -> None:
"""Write coil and update state."""
if not coil:
raise HomeAssistantError("No coil available")
coil.value = value
coil = await self.connection.write_coil(coil)
if self.data:
self.data[coil.address] = coil
self.async_update_listeners()
async def _async_update_data(self) -> dict[int, Coil]:
@retry(
retry=retry_if_exception_type(CoilReadException), stop=stop_after_attempt(2)
)
async def read_coil(coil: Coil):
return await self.connection.read_coil(coil)
callbacks: dict[int, list[CALLBACK_TYPE]] = {}
for update_callback, context in list(self._listeners.values()):
assert isinstance(context, set)
for address in context:
callbacks.setdefault(address, []).append(update_callback)
result: dict[int, Coil] = {}
for address, callback_list in callbacks.items():
try:
coil = self.heatpump.get_coil_by_address(address)
self.data[coil.address] = result[coil.address] = await read_coil(coil)
except (CoilReadException, RetryError) as exception:
self.logger.warning("Failed to update: %s", exception)
except CoilNotFoundException as exception:
self.logger.debug("Skipping missing coil: %s", exception)
for update_callback in callback_list:
update_callback()
return result
class CoilEntity(CoordinatorEntity[Coordinator]):
"""Base for coil based entities."""
_attr_has_entity_name = True
_attr_entity_registry_enabled_default = False
def __init__(
self, coordinator: Coordinator, coil: Coil, entity_format: str
) -> None:
"""Initialize base entity."""
super().__init__(coordinator, {coil.address})
self.entity_id = async_generate_entity_id(
entity_format, coil.name, hass=coordinator.hass
)
self._attr_name = coil.title
self._attr_unique_id = f"{coordinator.unique_id}-{coil.address}"
self._attr_device_info = coordinator.device_info
self._coil = coil
@property
def available(self) -> bool:
"""Return if entity is available."""
return self.coordinator.last_update_success and self._coil.address in (
self.coordinator.data or {}
)
def _async_read_coil(self, coil: Coil):
"""Update state of entity based on coil data."""
async def _async_write_coil(self, value: int | float | str):
"""Write coil and update state."""
await self.coordinator.async_write_coil(self._coil, value)
def _handle_coordinator_update(self) -> None:
coil = self.coordinator.data.get(self._coil.address)
if coil is None:
return
self._coil = coil
self._async_read_coil(coil)
self.async_write_ha_state()

View file

@ -0,0 +1,133 @@
"""Config flow for Nibe Heat Pump integration."""
from __future__ import annotations
import errno
from typing import Any
from nibe.connection.nibegw import NibeGW
from nibe.exceptions import CoilNotFoundException, CoilReadException, CoilWriteException
from nibe.heatpump import HeatPump, Model
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import CONF_IP_ADDRESS, CONF_MODEL
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers import config_validation as cv
from homeassistant.util.network import is_ipv4_address
from .const import (
CONF_CONNECTION_TYPE,
CONF_CONNECTION_TYPE_NIBEGW,
CONF_LISTENING_PORT,
CONF_REMOTE_READ_PORT,
CONF_REMOTE_WRITE_PORT,
CONF_WORD_SWAP,
DOMAIN,
LOGGER,
)
STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_MODEL): vol.In([e.name for e in Model]),
vol.Required(CONF_IP_ADDRESS): str,
vol.Required(CONF_LISTENING_PORT): cv.port,
vol.Required(CONF_REMOTE_READ_PORT): cv.port,
vol.Required(CONF_REMOTE_WRITE_PORT): cv.port,
}
)
class FieldError(Exception):
"""Field with invalid data."""
def __init__(self, message: str, field: str, error: str) -> None:
"""Set up error."""
super().__init__(message)
self.field = field
self.error = error
async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> dict[str, Any]:
"""Validate the user input allows us to connect."""
if not is_ipv4_address(data[CONF_IP_ADDRESS]):
raise FieldError("Not a valid ipv4 address", CONF_IP_ADDRESS, "address")
heatpump = HeatPump(Model[data[CONF_MODEL]])
heatpump.initialize()
connection = NibeGW(
heatpump,
data[CONF_IP_ADDRESS],
data[CONF_REMOTE_READ_PORT],
data[CONF_REMOTE_WRITE_PORT],
listening_port=data[CONF_LISTENING_PORT],
)
try:
await connection.start()
except OSError as exception:
if exception.errno == errno.EADDRINUSE:
raise FieldError(
"Address already in use", "listening_port", "address_in_use"
) from exception
raise
try:
coil = heatpump.get_coil_by_name("modbus40-word-swap-48852")
coil = await connection.read_coil(coil)
word_swap = coil.value == "ON"
coil = await connection.write_coil(coil)
except CoilNotFoundException as exception:
raise FieldError(
"Model selected doesn't seem to support expected coils", "base", "model"
) from exception
except CoilReadException as exception:
raise FieldError("Timeout on read from pump", "base", "read") from exception
except CoilWriteException as exception:
raise FieldError("Timeout on writing to pump", "base", "write") from exception
finally:
await connection.stop()
return {
"title": f"{data[CONF_MODEL]} at {data[CONF_IP_ADDRESS]}",
CONF_WORD_SWAP: word_swap,
}
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Nibe Heat Pump."""
VERSION = 1
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle the initial step."""
if user_input is None:
return self.async_show_form(
step_id="user", data_schema=STEP_USER_DATA_SCHEMA
)
errors = {}
try:
info = await validate_input(self.hass, user_input)
except FieldError as exception:
LOGGER.exception("Validation error")
errors[exception.field] = exception.error
except Exception: # pylint: disable=broad-except
LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
data = {
**user_input,
CONF_WORD_SWAP: info[CONF_WORD_SWAP],
CONF_CONNECTION_TYPE: CONF_CONNECTION_TYPE_NIBEGW,
}
return self.async_create_entry(title=info["title"], data=data)
return self.async_show_form(
step_id="user", data_schema=STEP_USER_DATA_SCHEMA, errors=errors
)

View file

@ -0,0 +1,12 @@
"""Constants for the Nibe Heat Pump integration."""
import logging
DOMAIN = "nibe_heatpump"
LOGGER = logging.getLogger(__package__)
CONF_LISTENING_PORT = "listening_port"
CONF_REMOTE_READ_PORT = "remote_read_port"
CONF_REMOTE_WRITE_PORT = "remote_write_port"
CONF_WORD_SWAP = "word_swap"
CONF_CONNECTION_TYPE = "connection_type"
CONF_CONNECTION_TYPE_NIBEGW = "nibegw"

View file

@ -0,0 +1,9 @@
{
"domain": "nibe_heatpump",
"name": "Nibe Heat Pump",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/nibe_heatpump",
"requirements": ["nibe==0.5.0", "tenacity==8.0.1"],
"codeowners": ["@elupus"],
"iot_class": "local_polling"
}

View file

@ -0,0 +1,77 @@
"""The Nibe Heat Pump sensors."""
from __future__ import annotations
from nibe.coil import Coil
from homeassistant.components.sensor import (
ENTITY_ID_FORMAT,
SensorDeviceClass,
SensorEntity,
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ELECTRIC_CURRENT_AMPERE,
ELECTRIC_CURRENT_MILLIAMPERE,
ELECTRIC_POTENTIAL_MILLIVOLT,
ELECTRIC_POTENTIAL_VOLT,
ENERGY_KILO_WATT_HOUR,
ENERGY_MEGA_WATT_HOUR,
ENERGY_WATT_HOUR,
TEMP_CELSIUS,
TEMP_FAHRENHEIT,
TEMP_KELVIN,
TIME_HOURS,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import DOMAIN, CoilEntity, Coordinator
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up platform."""
coordinator: Coordinator = hass.data[DOMAIN][config_entry.entry_id]
async_add_entities(
Sensor(coordinator, coil)
for coil in coordinator.coils
if not coil.is_writable and not coil.is_boolean
)
class Sensor(SensorEntity, CoilEntity):
"""Sensor entity."""
_attr_entity_category = EntityCategory.DIAGNOSTIC
def __init__(self, coordinator: Coordinator, coil: Coil) -> None:
"""Initialize entity."""
super().__init__(coordinator, coil, ENTITY_ID_FORMAT)
self._attr_native_unit_of_measurement = coil.unit
unit = self.native_unit_of_measurement
if unit in {TEMP_CELSIUS, TEMP_FAHRENHEIT, TEMP_KELVIN}:
self._attr_device_class = SensorDeviceClass.TEMPERATURE
elif unit in {ELECTRIC_CURRENT_AMPERE, ELECTRIC_CURRENT_MILLIAMPERE}:
self._attr_device_class = SensorDeviceClass.CURRENT
elif unit in {ELECTRIC_POTENTIAL_VOLT, ELECTRIC_POTENTIAL_MILLIVOLT}:
self._attr_device_class = SensorDeviceClass.VOLTAGE
elif unit in {ENERGY_WATT_HOUR, ENERGY_KILO_WATT_HOUR, ENERGY_MEGA_WATT_HOUR}:
self._attr_device_class = SensorDeviceClass.ENERGY
elif unit in {TIME_HOURS}:
self._attr_device_class = SensorDeviceClass.DURATION
else:
self._attr_device_class = None
if unit:
self._attr_state_class = SensorStateClass.MEASUREMENT
def _async_read_coil(self, coil: Coil):
self._attr_native_value = coil.value

View file

@ -0,0 +1,25 @@
{
"config": {
"step": {
"user": {
"data": {
"ip_address": "Remote IP address",
"remote_read_port": "Remote read port",
"remote_write_port": "Remote write port",
"listening_port": "Local listening port"
}
}
},
"error": {
"write": "Error on write request to pump. Verify your `Remote write port` or `Remote IP address`.",
"read": "Error on read request from pump. Verify your `Remote read port` or `Remote IP address`.",
"address": "Invalid remote IP address specified. Address must be a IPV4 address.",
"address_in_use": "The selected listening port is already in use on this system.",
"model": "The model selected doesn't seem to support modbus40",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
}
}
}

View file

@ -0,0 +1,24 @@
{
"config": {
"abort": {
"already_configured": "Device is already configured"
},
"error": {
"address": "Invalid remote IP address specified. Address must be a IPV4 address.",
"address_in_use": "The selected listening port is already in use on this system. Reconfigure your gateway device to use a different address if the conflict can not be resolved.",
"read": "Error on read request from pump. Verify your `Remote read port` or `Remote IP address`.",
"unknown": "Unexpected error",
"write": "Error on write request to pump. Verify your `Remote write port` or `Remote IP address`."
},
"step": {
"user": {
"data": {
"ip_address": "Remote IP address",
"listening_port": "Local listening port",
"remote_read_port": "Remote read port",
"remote_write_port": "Remote write port"
}
}
}
}
}

View file

@ -254,6 +254,7 @@ FLOWS = {
"nexia",
"nextdns",
"nfandroidtv",
"nibe_heatpump",
"nightscout",
"nina",
"nmap_tracker",

View file

@ -1142,6 +1142,9 @@ nextcord==2.0.0a8
# homeassistant.components.nextdns
nextdns==1.1.1
# homeassistant.components.nibe_heatpump
nibe==0.5.0
# homeassistant.components.niko_home_control
niko-home-control==0.2.1
@ -2369,6 +2372,9 @@ temescal==0.5
# homeassistant.components.temper
temperusb==1.5.3
# homeassistant.components.nibe_heatpump
tenacity==8.0.1
# homeassistant.components.tensorflow
# tensorflow==2.5.0

View file

@ -823,6 +823,9 @@ nextcord==2.0.0a8
# homeassistant.components.nextdns
nextdns==1.1.1
# homeassistant.components.nibe_heatpump
nibe==0.5.0
# homeassistant.components.nfandroidtv
notifications-android-tv==0.1.5
@ -1621,6 +1624,9 @@ tellduslive==0.10.11
# homeassistant.components.lg_soundbar
temescal==0.5
# homeassistant.components.nibe_heatpump
tenacity==8.0.1
# homeassistant.components.powerwall
tesla-powerwall==0.3.18

View file

@ -0,0 +1 @@
"""Tests for the Nibe Heat Pump integration."""

View file

@ -0,0 +1,172 @@
"""Test the Nibe Heat Pump config flow."""
import errno
from unittest.mock import Mock, patch
from nibe.coil import Coil
from nibe.connection import Connection
from nibe.exceptions import CoilNotFoundException, CoilReadException, CoilWriteException
from pytest import fixture
from homeassistant import config_entries
from homeassistant.components.nibe_heatpump import DOMAIN
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
MOCK_FLOW_USERDATA = {
"model": "F1155",
"ip_address": "127.0.0.1",
"listening_port": 9999,
"remote_read_port": 10000,
"remote_write_port": 10001,
}
@fixture(autouse=True, name="mock_connection")
async def fixture_mock_connection():
"""Make sure we have a dummy connection."""
with patch(
"homeassistant.components.nibe_heatpump.config_flow.NibeGW", spec=Connection
) as mock_connection:
yield mock_connection
@fixture(autouse=True, name="mock_setup_entry")
async def fixture_mock_setup():
"""Make sure we never actually run setup."""
with patch(
"homeassistant.components.nibe_heatpump.async_setup_entry", return_value=True
) as mock_setup_entry:
yield mock_setup_entry
async def test_form(
hass: HomeAssistant, mock_connection: Mock, mock_setup_entry: Mock
) -> None:
"""Test we get the form."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == FlowResultType.FORM
assert result["errors"] is None
coil_wordswap = Coil(
48852, "modbus40-word-swap-48852", "Modbus40 Word Swap", "u8", min=0, max=1
)
coil_wordswap.value = "ON"
mock_connection.return_value.read_coil.return_value = coil_wordswap
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], MOCK_FLOW_USERDATA
)
await hass.async_block_till_done()
assert result2["type"] == FlowResultType.CREATE_ENTRY
assert result2["title"] == "F1155 at 127.0.0.1"
assert result2["data"] == {
"model": "F1155",
"ip_address": "127.0.0.1",
"listening_port": 9999,
"remote_read_port": 10000,
"remote_write_port": 10001,
"word_swap": True,
"connection_type": "nibegw",
}
assert len(mock_setup_entry.mock_calls) == 1
async def test_address_inuse(hass: HomeAssistant, mock_connection: Mock) -> None:
"""Test we handle invalid auth."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
error = OSError()
error.errno = errno.EADDRINUSE
mock_connection.return_value.start.side_effect = error
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], MOCK_FLOW_USERDATA
)
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {"listening_port": "address_in_use"}
async def test_read_timeout(hass: HomeAssistant, mock_connection: Mock) -> None:
"""Test we handle cannot connect error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
mock_connection.return_value.read_coil.side_effect = CoilReadException()
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], MOCK_FLOW_USERDATA
)
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {"base": "read"}
async def test_write_timeout(hass: HomeAssistant, mock_connection: Mock) -> None:
"""Test we handle cannot connect error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
mock_connection.return_value.write_coil.side_effect = CoilWriteException()
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], MOCK_FLOW_USERDATA
)
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {"base": "write"}
async def test_unexpected_exception(hass: HomeAssistant, mock_connection: Mock) -> None:
"""Test we handle cannot connect error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
mock_connection.return_value.read_coil.side_effect = Exception()
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], MOCK_FLOW_USERDATA
)
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {"base": "unknown"}
async def test_invalid_ip(hass: HomeAssistant, mock_connection: Mock) -> None:
"""Test we handle cannot connect error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
mock_connection.return_value.read_coil.side_effect = Exception()
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], {**MOCK_FLOW_USERDATA, "ip_address": "abcd"}
)
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {"ip_address": "address"}
async def test_model_missing_coil(hass: HomeAssistant, mock_connection: Mock) -> None:
"""Test we handle cannot connect error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
mock_connection.return_value.read_coil.side_effect = CoilNotFoundException()
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], {**MOCK_FLOW_USERDATA}
)
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {"base": "model"}