hass-core/homeassistant/components/lektrico/coordinator.py
Lektri.co 5bd736029f
Add lektrico integration (#102371)
* Add Lektrico Integration

* Make the changes proposed by Lash-L: new coordinator.py, new entity.py; use: translation_key, last_update_success, PlatformNotReady; remove: global variables

* Replace FlowResult with ConfigFlowResult and add tests.

* Remove unused lines.

* Remove Options from config_flow

* Fix ruff and mypy.

* Fix CODEOWNERS.

* Run python3 -m script.hassfest.

* Correct rebase mistake.

* Make modifications suggested by emontnemery.

* Add pytest fixtures.

* Remove meaningless patches.

* Update .coveragerc

* Replace CONF_FRIENDLY_NAME with CONF_NAME.

* Remove underscores.

* Update tests.

* Update test file with is and no config_entries.

* Set serial_number in DeviceInfo and add return type of the async_update_data to DataUpdateCoordinator.

* Use suggested_unit_of_measurement for KILO_WATT and replace Any in value_fn (sensor file).

* Add device class duration to charging_time sensor.

* Change raising PlatformNotReady to raising IntegrationError.

* Test the unique id of the entry.

* Rename PF Lx with Power factor Lx and remove PF from strings.json.

* Remove comment.

* Make state and limit reason sensors to be enum sensors.

* Use result variable to check unique_id in test.

* Remove CONF_NAME from entry and __init__ from LektricoFlowHandler.

* Remove session parameter from LektricoDeviceDataUpdateCoordinator.

* Use config_entry: ConfigEntry in coordinator.

* Replace Connected,NeedAuth with Waiting for Authentication.

* Use lektricowifi 0.0.29.

* Use lektricowifi 0.0.39

* Use lektricowifi 0.0.40

* Use lektricowifi 0.0.41

* Replace hass.data with entry.runtime_data

* Delete .coveragerc

* Restructure the user step

* Fix tests

* Add returned value of _async_update_data to class DataUpdateCoordinator

* Use hw_version at DeviceInfo

* Remove a variable

* Use StateType

* Replace friendly_name with device_name

* Use sentence case in translation strings

* Uncomment and fix test_discovered_zeroconf

* Add type LektricoConfigEntry

* Remove commented code

* Remove the type of coordinator in sensor async_setup_entry

* Make zeroconf test end in ABORT, not FORM

* Remove all async_block_till_done from tests

* End test_user_setup_device_offline with CREATE_ENTRY

* Patch the full Device

* Add snapshot tests

* Overwrite the type LektricoSensorEntityDescription outside of the constructor

* Test separate already_configured for zeroconf

---------

Co-authored-by: mihaela.tarjoianu <mihaela.tarjoianu@scada.ro>
Co-authored-by: Erik Montnemery <erik@montnemery.com>
2024-08-30 13:20:15 +02:00

52 lines
1.7 KiB
Python

"""Coordinator for the Lektrico Charging Station integration."""

from __future__ import annotations

from datetime import timedelta
from typing import Any

from lektricowifi import Device, DeviceConnectionError

from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
    ATTR_HW_VERSION,
    ATTR_SERIAL_NUMBER,
    CONF_HOST,
    CONF_TYPE,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.httpx_client import get_async_client
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed

from .const import LOGGER

SCAN_INTERVAL = timedelta(seconds=10)


class LektricoDeviceDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
    """Data update coordinator for Lektrico device."""

    config_entry: ConfigEntry

    def __init__(self, hass: HomeAssistant, device_name: str) -> None:
        """Initialize a Lektrico Device."""
        super().__init__(
            hass,
            LOGGER,
            name=device_name,
            update_interval=SCAN_INTERVAL,
        )
        self.device = Device(
            self.config_entry.data[CONF_HOST],
            asyncClient=get_async_client(hass),
        )
        self.serial_number: str = self.config_entry.data[ATTR_SERIAL_NUMBER]
        self.board_revision: str = self.config_entry.data[ATTR_HW_VERSION]
        self.device_type: str = self.config_entry.data[CONF_TYPE]

    async def _async_update_data(self) -> dict[str, Any]:
        """Async Update device state."""
        try:
            return await self.device.device_info(self.device_type)
        except DeviceConnectionError as lek_ex:
            raise UpdateFailed(lek_ex) from lek_ex
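
The error handling in `_async_update_data` can be tried outside Home Assistant with a small stand-alone sketch. Everything below is an assumption made for illustration: `FakeDevice`, `poll_once`, the `"1p7k"` type string, and the `charger_state` key are hypothetical, and the two exception classes are local stand-ins rather than the real `lektricowifi` and `homeassistant` classes. The sketch only shows the pattern of translating a library-level connection error into the coordinator's `UpdateFailed`.

```python
import asyncio
from typing import Any


class DeviceConnectionError(Exception):
    """Local stand-in for the lektricowifi connection error (assumption)."""


class UpdateFailed(Exception):
    """Local stand-in for Home Assistant's UpdateFailed (assumption)."""


class FakeDevice:
    """Hypothetical device that fails on its first poll, then succeeds."""

    def __init__(self) -> None:
        self.calls = 0

    async def device_info(self, device_type: str) -> dict[str, Any]:
        self.calls += 1
        if self.calls == 1:
            raise DeviceConnectionError("charger unreachable")
        # The payload keys are made up for this sketch.
        return {"type": device_type, "charger_state": "available"}


async def poll_once(device: FakeDevice, device_type: str) -> dict[str, Any]:
    """Mirror the try/except shape of _async_update_data."""
    try:
        return await device.device_info(device_type)
    except DeviceConnectionError as err:
        # Chain the original error so the root cause stays visible.
        raise UpdateFailed(err) from err


async def demo() -> list[str]:
    """Poll twice and record the outcome of each attempt."""
    device = FakeDevice()
    results: list[str] = []
    for _ in range(2):
        try:
            data = await poll_once(device, "1p7k")  # placeholder type string
            results.append(f"ok: {data['charger_state']}")
        except UpdateFailed as err:
            results.append(f"failed: {err}")
    return results


RESULTS = asyncio.run(demo())
print(RESULTS)
```

In the real integration the caller is the `DataUpdateCoordinator` base class, which invokes `_async_update_data` on each `SCAN_INTERVAL` tick; the loop in `demo` stands in for that scheduling here.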