hass-core/homeassistant/components/fyta/coordinator.py
dontinelli 6eaf3402c6
Add re-auth-flow to fyta integration (#114972)
* add re-auth-flow to fyta integration

* add strings for reauth-flow

* resolve typing error

* update based on review comments

* Update homeassistant/components/fyta/config_flow.py

Co-authored-by: G Johansson <goran.johansson@shiftit.se>

* add async_auth

* adjustment based on review commet

* Update test_config_flow.py

* remove credentials

* Update homeassistant/components/fyta/config_flow.py

Co-authored-by: G Johansson <goran.johansson@shiftit.se>

* Update tests/components/fyta/test_config_flow.py

Co-authored-by: G Johansson <goran.johansson@shiftit.se>

* Update tests/components/fyta/test_config_flow.py

Co-authored-by: G Johansson <goran.johansson@shiftit.se>

* Update conftest.py

* Update test_config_flow.py

* Aktualisieren von conftest.py

* Update test_config_flow.py

---------

Co-authored-by: G Johansson <goran.johansson@shiftit.se>
2024-04-12 20:33:24 +02:00

55 lines
1.7 KiB
Python

"""Coordinator for FYTA integration."""
from datetime import datetime, timedelta
import logging
from typing import Any
from fyta_cli.fyta_connector import FytaConnector
from fyta_cli.fyta_exceptions import (
FytaAuthentificationError,
FytaConnectionError,
FytaPasswordError,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
_LOGGER = logging.getLogger(__name__)
class FytaCoordinator(DataUpdateCoordinator[dict[int, dict[str, Any]]]):
"""Fyta custom coordinator."""
config_entry: ConfigEntry
def __init__(self, hass: HomeAssistant, fyta: FytaConnector) -> None:
"""Initialize my coordinator."""
super().__init__(
hass,
_LOGGER,
name="FYTA Coordinator",
update_interval=timedelta(seconds=60),
)
self.fyta = fyta
async def _async_update_data(
self,
) -> dict[int, dict[str, Any]]:
"""Fetch data from API endpoint."""
if self.fyta.expiration is None or self.fyta.expiration < datetime.now():
await self.renew_authentication()
return await self.fyta.update_all_plants()
async def renew_authentication(self) -> None:
"""Renew access token for FYTA API."""
try:
await self.fyta.login()
except FytaConnectionError as ex:
raise ConfigEntryNotReady from ex
except (FytaAuthentificationError, FytaPasswordError) as ex:
raise ConfigEntryAuthFailed from ex