hass-core/homeassistant/components/knx/knx_entity.py
Matthias Alphart 9351f300b0
Update xknx to 3.0.0 - more DPT definitions (#122891)
* Support DPTComplex objects and validate sensor types

* Gracefully start and stop xknx device objects

* Use non-awaitable XknxDevice callbacks

* Use non-awaitable xknx.TelegramQueue callbacks

* Use non-awaitable xknx.ConnectionManager callbacks

* Remove unnecessary `hass.async_block_till_done()` calls

* Wait for StateUpdater logic to proceed when receiving responses

* Update import module paths for specific DPTs

* Support Enum data types

* New HVAC mode names

* HVAC Enums instead of Enum member value strings

* New date and time devices

* Update xknx to 3.0.0

* Fix expose tests and DPTEnumData check

* ruff and mypy fixes
2024-07-31 09:10:36 +02:00

51 lines
1.5 KiB
Python

"""Base class for KNX devices."""
from __future__ import annotations
from typing import cast
from xknx.devices import Device as XknxDevice
from homeassistant.helpers.entity import Entity
from . import KNXModule
from .const import DOMAIN
class KnxEntity(Entity):
    """Base entity that wraps a single xknx device object."""

    # State is written from after_update_callback, so polling is switched off.
    _attr_should_poll = False

    def __init__(self, device: XknxDevice) -> None:
        """Remember the xknx device this entity wraps."""
        self._device = device

    @property
    def name(self) -> str:
        """Return the name reported by the wrapped xknx device."""
        device = self._device
        return device.name

    @property
    def available(self) -> bool:
        """Return the `connected` flag of the KNX module in `hass.data`."""
        # hass.data is untyped; the cast only informs the type checker.
        module = cast(KNXModule, self.hass.data[DOMAIN])
        return module.connected

    async def async_update(self) -> None:
        """Await a sync of the wrapped xknx device."""
        device = self._device
        await device.sync()

    def after_update_callback(self, _device: XknxDevice) -> None:
        """Write the entity state to Home Assistant.

        Registered as the device-updated callback of the wrapped device; the
        device argument it is called with is not used.
        """
        self.async_write_ha_state()

    async def async_added_to_hass(self) -> None:
        """Register the update callback, then add the device to xknx."""
        device = self._device
        device.register_device_updated_cb(self.after_update_callback)
        device.xknx.devices.async_add(device)

    async def async_will_remove_from_hass(self) -> None:
        """Unregister the update callback, then remove the device from xknx."""
        device = self._device
        device.unregister_device_updated_cb(self.after_update_callback)
        device.xknx.devices.async_remove(device)