ESPHome trigger reconnect immediately when mDNS record received (#48129)
This commit is contained in:
parent
46a3b80a2d
commit
0193f16ae9
2 changed files with 207 additions and 64 deletions
|
@ -18,6 +18,7 @@ from aioesphomeapi import (
|
||||||
UserServiceArgType,
|
UserServiceArgType,
|
||||||
)
|
)
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
|
from zeroconf import DNSPointer, DNSRecord, RecordUpdateListener, Zeroconf
|
||||||
|
|
||||||
from homeassistant import const
|
from homeassistant import const
|
||||||
from homeassistant.components import zeroconf
|
from homeassistant.components import zeroconf
|
||||||
|
@ -199,7 +200,9 @@ async def async_setup_entry(hass: HomeAssistantType, entry: ConfigEntry) -> bool
|
||||||
# Re-connection logic will trigger after this
|
# Re-connection logic will trigger after this
|
||||||
await cli.disconnect()
|
await cli.disconnect()
|
||||||
|
|
||||||
try_connect = await _setup_auto_reconnect_logic(hass, cli, entry, host, on_login)
|
reconnect_logic = ReconnectLogic(
|
||||||
|
hass, cli, entry, host, on_login, zeroconf_instance
|
||||||
|
)
|
||||||
|
|
||||||
async def complete_setup() -> None:
|
async def complete_setup() -> None:
|
||||||
"""Complete the config entry setup."""
|
"""Complete the config entry setup."""
|
||||||
|
@ -207,85 +210,228 @@ async def async_setup_entry(hass: HomeAssistantType, entry: ConfigEntry) -> bool
|
||||||
await entry_data.async_update_static_infos(hass, entry, infos)
|
await entry_data.async_update_static_infos(hass, entry, infos)
|
||||||
await _setup_services(hass, entry_data, services)
|
await _setup_services(hass, entry_data, services)
|
||||||
|
|
||||||
# Create connection attempt outside of HA's tracked task in order
|
await reconnect_logic.start()
|
||||||
# not to delay startup.
|
entry_data.cleanup_callbacks.append(reconnect_logic.stop_callback)
|
||||||
hass.loop.create_task(try_connect(is_disconnect=False))
|
|
||||||
|
|
||||||
hass.async_create_task(complete_setup())
|
hass.async_create_task(complete_setup())
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
async def _setup_auto_reconnect_logic(
|
class ReconnectLogic(RecordUpdateListener):
|
||||||
hass: HomeAssistantType, cli: APIClient, entry: ConfigEntry, host: str, on_login
|
"""Reconnection logic handler for ESPHome config entries.
|
||||||
):
|
|
||||||
"""Set up the re-connect logic for the API client."""
|
|
||||||
|
|
||||||
async def try_connect(tries: int = 0, is_disconnect: bool = True) -> None:
|
Contains two reconnect strategies:
|
||||||
"""Try connecting to the API client. Will retry if not successful."""
|
- Connect with increasing time between connection attempts.
|
||||||
if entry.entry_id not in hass.data[DOMAIN]:
|
- Listen to zeroconf mDNS records; if any records are found for this device, try reconnecting immediately.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
hass: HomeAssistantType,
|
||||||
|
cli: APIClient,
|
||||||
|
entry: ConfigEntry,
|
||||||
|
host: str,
|
||||||
|
on_login,
|
||||||
|
zc: Zeroconf,
|
||||||
|
):
|
||||||
|
"""Initialize ReconnectLogic."""
|
||||||
|
self._hass = hass
|
||||||
|
self._cli = cli
|
||||||
|
self._entry = entry
|
||||||
|
self._host = host
|
||||||
|
self._on_login = on_login
|
||||||
|
self._zc = zc
|
||||||
|
# Flag to check if the device is connected
|
||||||
|
self._connected = True
|
||||||
|
self._connected_lock = asyncio.Lock()
|
||||||
|
# Event the different strategies use for issuing a reconnect attempt.
|
||||||
|
self._reconnect_event = asyncio.Event()
|
||||||
|
# The task containing the infinite reconnect loop while running
|
||||||
|
self._loop_task: asyncio.Task | None = None
|
||||||
|
# How many reconnect attempts have there been already, used for exponential wait time
|
||||||
|
self._tries = 0
|
||||||
|
self._tries_lock = asyncio.Lock()
|
||||||
|
# Track the wait task to cancel it on HA shutdown
|
||||||
|
self._wait_task: asyncio.Task | None = None
|
||||||
|
self._wait_task_lock = asyncio.Lock()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def _entry_data(self) -> RuntimeEntryData | None:
|
||||||
|
return self._hass.data[DOMAIN].get(self._entry.entry_id)
|
||||||
|
|
||||||
|
async def _on_disconnect(self):
|
||||||
|
"""Log and issue callbacks when disconnecting."""
|
||||||
|
if self._entry_data is None:
|
||||||
|
return
|
||||||
|
# This can happen often depending on WiFi signal strength.
|
||||||
|
# So therefore all these connection warnings are logged
|
||||||
|
# as infos. The "unavailable" logic will still trigger so the
|
||||||
|
# user knows if the device is not connected.
|
||||||
|
_LOGGER.info("Disconnected from ESPHome API for %s", self._host)
|
||||||
|
|
||||||
|
# Run disconnect hooks
|
||||||
|
for disconnect_cb in self._entry_data.disconnect_callbacks:
|
||||||
|
disconnect_cb()
|
||||||
|
self._entry_data.disconnect_callbacks = []
|
||||||
|
self._entry_data.available = False
|
||||||
|
self._entry_data.async_update_device_state(self._hass)
|
||||||
|
|
||||||
|
# Reset tries
|
||||||
|
async with self._tries_lock:
|
||||||
|
self._tries = 0
|
||||||
|
# Connected needs to be reset before the reconnect event (opposite order of check)
|
||||||
|
async with self._connected_lock:
|
||||||
|
self._connected = False
|
||||||
|
self._reconnect_event.set()
|
||||||
|
|
||||||
|
async def _wait_and_start_reconnect(self):
|
||||||
|
"""Wait for exponentially increasing time to issue next reconnect event."""
|
||||||
|
async with self._tries_lock:
|
||||||
|
tries = self._tries
|
||||||
|
# If not first re-try, wait and print message
|
||||||
|
# Cap wait time at 1 minute. This is because while working on the
|
||||||
|
# device (e.g. soldering stuff), users don't want to have to wait
|
||||||
|
# a long time for their device to show up in HA again (this was
|
||||||
|
# mentioned a lot in early feedback)
|
||||||
|
tries = min(tries, 10) # prevent OverflowError
|
||||||
|
wait_time = int(round(min(1.8 ** tries, 60.0)))
|
||||||
|
if tries == 1:
|
||||||
|
_LOGGER.info("Trying to reconnect to %s in the background", self._host)
|
||||||
|
_LOGGER.debug("Retrying %s in %d seconds", self._host, wait_time)
|
||||||
|
await asyncio.sleep(wait_time)
|
||||||
|
async with self._wait_task_lock:
|
||||||
|
self._wait_task = None
|
||||||
|
self._reconnect_event.set()
|
||||||
|
|
||||||
|
async def _try_connect(self):
|
||||||
|
"""Try connecting to the API client."""
|
||||||
|
async with self._tries_lock:
|
||||||
|
tries = self._tries
|
||||||
|
self._tries += 1
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self._cli.connect(on_stop=self._on_disconnect, login=True)
|
||||||
|
except APIConnectionError as error:
|
||||||
|
level = logging.WARNING if tries == 0 else logging.DEBUG
|
||||||
|
_LOGGER.log(
|
||||||
|
level,
|
||||||
|
"Can't connect to ESPHome API for %s (%s): %s",
|
||||||
|
self._entry.unique_id,
|
||||||
|
self._host,
|
||||||
|
error,
|
||||||
|
)
|
||||||
|
# Schedule re-connect in event loop in order not to delay HA
|
||||||
|
# startup. First connect is scheduled in tracked tasks.
|
||||||
|
async with self._wait_task_lock:
|
||||||
|
# Allow only one wait task at a time
|
||||||
|
# can happen if mDNS record received while waiting, then use existing wait task
|
||||||
|
if self._wait_task is not None:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._wait_task = self._hass.loop.create_task(
|
||||||
|
self._wait_and_start_reconnect()
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
_LOGGER.info("Successfully connected to %s", self._host)
|
||||||
|
async with self._tries_lock:
|
||||||
|
self._tries = 0
|
||||||
|
async with self._connected_lock:
|
||||||
|
self._connected = True
|
||||||
|
self._hass.async_create_task(self._on_login())
|
||||||
|
|
||||||
|
async def _reconnect_once(self):
|
||||||
|
# Wait and clear reconnection event
|
||||||
|
await self._reconnect_event.wait()
|
||||||
|
self._reconnect_event.clear()
|
||||||
|
|
||||||
|
# If in connected state, do not try to connect again.
|
||||||
|
async with self._connected_lock:
|
||||||
|
if self._connected:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Check if the entry got removed or disabled, in which case we shouldn't reconnect
|
||||||
|
if self._entry.entry_id not in self._hass.data[DOMAIN]:
|
||||||
# When removing/disconnecting manually
|
# When removing/disconnecting manually
|
||||||
return
|
return
|
||||||
|
|
||||||
device_registry = await hass.helpers.device_registry.async_get_registry()
|
device_registry = self._hass.helpers.device_registry.async_get(self._hass)
|
||||||
devices = dr.async_entries_for_config_entry(device_registry, entry.entry_id)
|
devices = dr.async_entries_for_config_entry(
|
||||||
|
device_registry, self._entry.entry_id
|
||||||
|
)
|
||||||
for device in devices:
|
for device in devices:
|
||||||
# There is only one device in ESPHome
|
# There is only one device in ESPHome
|
||||||
if device.disabled:
|
if device.disabled:
|
||||||
# Don't attempt to connect if it's disabled
|
# Don't attempt to connect if it's disabled
|
||||||
return
|
return
|
||||||
|
|
||||||
data: RuntimeEntryData = hass.data[DOMAIN][entry.entry_id]
|
await self._try_connect()
|
||||||
for disconnect_cb in data.disconnect_callbacks:
|
|
||||||
disconnect_cb()
|
|
||||||
data.disconnect_callbacks = []
|
|
||||||
data.available = False
|
|
||||||
data.async_update_device_state(hass)
|
|
||||||
|
|
||||||
if is_disconnect:
|
|
||||||
# This can happen often depending on WiFi signal strength.
|
|
||||||
# So therefore all these connection warnings are logged
|
|
||||||
# as infos. The "unavailable" logic will still trigger so the
|
|
||||||
# user knows if the device is not connected.
|
|
||||||
_LOGGER.info("Disconnected from ESPHome API for %s", host)
|
|
||||||
|
|
||||||
if tries != 0:
|
|
||||||
# If not first re-try, wait and print message
|
|
||||||
# Cap wait time at 1 minute. This is because while working on the
|
|
||||||
# device (e.g. soldering stuff), users don't want to have to wait
|
|
||||||
# a long time for their device to show up in HA again (this was
|
|
||||||
# mentioned a lot in early feedback)
|
|
||||||
#
|
|
||||||
# In the future another API will be set up so that the ESP can
|
|
||||||
# notify HA of connectivity directly, but for now we'll use a
|
|
||||||
# really short reconnect interval.
|
|
||||||
tries = min(tries, 10) # prevent OverflowError
|
|
||||||
wait_time = int(round(min(1.8 ** tries, 60.0)))
|
|
||||||
if tries == 1:
|
|
||||||
_LOGGER.info("Trying to reconnect to %s in the background", host)
|
|
||||||
_LOGGER.debug("Retrying %s in %d seconds", host, wait_time)
|
|
||||||
await asyncio.sleep(wait_time)
|
|
||||||
|
|
||||||
|
async def _reconnect_loop(self):
|
||||||
|
while True:
|
||||||
try:
|
try:
|
||||||
await cli.connect(on_stop=try_connect, login=True)
|
await self._reconnect_once()
|
||||||
except APIConnectionError as error:
|
except asyncio.CancelledError: # pylint: disable=try-except-raise
|
||||||
level = logging.WARNING if tries == 0 else logging.DEBUG
|
raise
|
||||||
_LOGGER.log(
|
except Exception: # pylint: disable=broad-except
|
||||||
level,
|
_LOGGER.error("Caught exception while reconnecting", exc_info=True)
|
||||||
"Can't connect to ESPHome API for %s (%s): %s",
|
|
||||||
entry.unique_id,
|
|
||||||
host,
|
|
||||||
error,
|
|
||||||
)
|
|
||||||
# Schedule re-connect in event loop in order not to delay HA
|
|
||||||
# startup. First connect is scheduled in tracked tasks.
|
|
||||||
data.reconnect_task = hass.loop.create_task(
|
|
||||||
try_connect(tries + 1, is_disconnect=False)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
_LOGGER.info("Successfully connected to %s", host)
|
|
||||||
hass.async_create_task(on_login())
|
|
||||||
|
|
||||||
return try_connect
|
async def start(self):
|
||||||
|
"""Start the reconnecting logic background task."""
|
||||||
|
# Create reconnection loop outside of HA's tracked tasks in order
|
||||||
|
# not to delay startup.
|
||||||
|
self._loop_task = self._hass.loop.create_task(self._reconnect_loop())
|
||||||
|
# Listen for mDNS records so we can reconnect directly if a received mDNS record
|
||||||
|
# indicates the node is up again
|
||||||
|
await self._hass.async_add_executor_job(self._zc.add_listener, self, None)
|
||||||
|
|
||||||
|
async with self._connected_lock:
|
||||||
|
self._connected = False
|
||||||
|
self._reconnect_event.set()
|
||||||
|
|
||||||
|
async def stop(self):
|
||||||
|
"""Stop the reconnecting logic background task. Does not disconnect the client."""
|
||||||
|
if self._loop_task is not None:
|
||||||
|
self._loop_task.cancel()
|
||||||
|
self._loop_task = None
|
||||||
|
await self._hass.async_add_executor_job(self._zc.remove_listener, self)
|
||||||
|
async with self._wait_task_lock:
|
||||||
|
if self._wait_task is not None:
|
||||||
|
self._wait_task.cancel()
|
||||||
|
self._wait_task = None
|
||||||
|
|
||||||
|
@callback
|
||||||
|
def stop_callback(self):
|
||||||
|
"""Stop as an async callback function."""
|
||||||
|
self._hass.async_create_task(self.stop())
|
||||||
|
|
||||||
|
@callback
|
||||||
|
def _set_reconnect(self):
|
||||||
|
self._reconnect_event.set()
|
||||||
|
|
||||||
|
def update_record(self, zc: Zeroconf, now: float, record: DNSRecord) -> None:
|
||||||
|
"""Listen to zeroconf updated mDNS records."""
|
||||||
|
if not isinstance(record, DNSPointer):
|
||||||
|
# We only consider PTR records and match using the alias name
|
||||||
|
return
|
||||||
|
if self._entry_data is None or self._entry_data.device_info is None:
|
||||||
|
# Either the entry was already torn down or we haven't received device info yet
|
||||||
|
return
|
||||||
|
filter_alias = f"{self._entry_data.device_info.name}._esphomelib._tcp.local."
|
||||||
|
if record.alias != filter_alias:
|
||||||
|
return
|
||||||
|
|
||||||
|
# This is a mDNS record from the device and could mean it just woke up
|
||||||
|
# Check if already connected, no lock needed for this access
|
||||||
|
if self._connected:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Tell reconnection logic to retry connection attempt now (even before reconnect timer finishes)
|
||||||
|
_LOGGER.debug(
|
||||||
|
"%s: Triggering reconnect because of received mDNS record %s",
|
||||||
|
self._host,
|
||||||
|
record,
|
||||||
|
)
|
||||||
|
self._hass.add_job(self._set_reconnect)
|
||||||
|
|
||||||
|
|
||||||
async def _async_setup_device_registry(
|
async def _async_setup_device_registry(
|
||||||
|
@ -421,8 +567,6 @@ async def _cleanup_instance(
|
||||||
) -> RuntimeEntryData:
|
) -> RuntimeEntryData:
|
||||||
"""Cleanup the esphome client if it exists."""
|
"""Cleanup the esphome client if it exists."""
|
||||||
data: RuntimeEntryData = hass.data[DOMAIN].pop(entry.entry_id)
|
data: RuntimeEntryData = hass.data[DOMAIN].pop(entry.entry_id)
|
||||||
if data.reconnect_task is not None:
|
|
||||||
data.reconnect_task.cancel()
|
|
||||||
for disconnect_cb in data.disconnect_callbacks:
|
for disconnect_cb in data.disconnect_callbacks:
|
||||||
disconnect_cb()
|
disconnect_cb()
|
||||||
for cleanup_callback in data.cleanup_callbacks:
|
for cleanup_callback in data.cleanup_callbacks:
|
||||||
|
|
|
@ -54,7 +54,6 @@ class RuntimeEntryData:
|
||||||
entry_id: str = attr.ib()
|
entry_id: str = attr.ib()
|
||||||
client: APIClient = attr.ib()
|
client: APIClient = attr.ib()
|
||||||
store: Store = attr.ib()
|
store: Store = attr.ib()
|
||||||
reconnect_task: asyncio.Task | None = attr.ib(default=None)
|
|
||||||
state: dict[str, dict[str, Any]] = attr.ib(factory=dict)
|
state: dict[str, dict[str, Any]] = attr.ib(factory=dict)
|
||||||
info: dict[str, dict[str, Any]] = attr.ib(factory=dict)
|
info: dict[str, dict[str, Any]] = attr.ib(factory=dict)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue