Add fibaro event platform (#101636)

This commit is contained in:
rappenze 2023-10-08 22:01:26 +02:00 committed by GitHub
parent db0c5bbbea
commit fb215479d4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 96 additions and 1 deletions

View file

@ -376,6 +376,7 @@ omit =
homeassistant/components/fibaro/binary_sensor.py
homeassistant/components/fibaro/climate.py
homeassistant/components/fibaro/cover.py
homeassistant/components/fibaro/event.py
homeassistant/components/fibaro/light.py
homeassistant/components/fibaro/lock.py
homeassistant/components/fibaro/sensor.py

View file

@ -2,7 +2,7 @@
from __future__ import annotations
from collections import defaultdict
from collections.abc import Mapping
from collections.abc import Callable, Mapping
import logging
from typing import Any
@ -10,6 +10,7 @@ from pyfibaro.fibaro_client import FibaroClient
from pyfibaro.fibaro_device import DeviceModel
from pyfibaro.fibaro_room import RoomModel
from pyfibaro.fibaro_scene import SceneModel
from pyfibaro.fibaro_state_resolver import FibaroEvent, FibaroStateResolver
from requests.exceptions import HTTPError
from homeassistant.config_entries import ConfigEntry
@ -46,6 +47,7 @@ PLATFORMS = [
Platform.SENSOR,
Platform.LOCK,
Platform.SWITCH,
Platform.EVENT,
]
FIBARO_TYPEMAP = {
@ -95,6 +97,8 @@ class FibaroController:
# All scenes
self._scenes: list[SceneModel] = []
self._callbacks: dict[int, list[Any]] = {} # Update value callbacks by deviceId
# Event callbacks by device id
self._event_callbacks: dict[int, list[Callable[[FibaroEvent], None]]] = {}
self.hub_serial: str # Unique serial number of the hub
self.hub_name: str # The friendly name of the hub
self.hub_software_version: str
@ -178,11 +182,31 @@ class FibaroController:
for callback in self._callbacks[item]:
callback()
resolver = FibaroStateResolver(state)
for event in resolver.get_events():
fibaro_id = event.fibaro_id
if (
event.event_type.lower() == "centralsceneevent"
and fibaro_id in self._event_callbacks
):
for callback in self._event_callbacks[fibaro_id]:
callback(event)
def register(self, device_id: int, callback: Any) -> None:
"""Register device with a callback for updates."""
self._callbacks.setdefault(device_id, [])
self._callbacks[device_id].append(callback)
def register_event(
self, device_id: int, callback: Callable[[FibaroEvent], None]
) -> None:
"""Register device with a callback for central scene events.
The callback receives one parameter with the event.
"""
self._event_callbacks.setdefault(device_id, [])
self._event_callbacks[device_id].append(callback)
def get_children(self, device_id: int) -> list[DeviceModel]:
"""Get a list of child devices."""
return [
@ -229,6 +253,8 @@ class FibaroController:
platform = Platform.COVER
elif "secure" in device.actions:
platform = Platform.LOCK
elif device.has_central_scene_event:
platform = Platform.EVENT
elif device.value.has_value:
if device.value.is_bool_value:
platform = Platform.BINARY_SENSOR

View file

@ -0,0 +1,68 @@
"""Support for Fibaro event entities."""
from __future__ import annotations
from pyfibaro.fibaro_device import DeviceModel, SceneEvent
from pyfibaro.fibaro_state_resolver import FibaroEvent
from homeassistant.components.event import (
ENTITY_ID_FORMAT,
EventDeviceClass,
EventEntity,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import FibaroController, FibaroDevice
from .const import DOMAIN
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the Fibaro event entities."""
controller: FibaroController = hass.data[DOMAIN][entry.entry_id]
entities = []
for device in controller.fibaro_devices[Platform.EVENT]:
for scene_event in device.central_scene_event:
# Each scene event represents a button on a device
entities.append(FibaroEventEntity(device, scene_event))
async_add_entities(entities, True)
class FibaroEventEntity(FibaroDevice, EventEntity):
"""Representation of a Fibaro Event Entity."""
def __init__(self, fibaro_device: DeviceModel, scene_event: SceneEvent) -> None:
"""Initialize the Fibaro device."""
super().__init__(fibaro_device)
self.entity_id = ENTITY_ID_FORMAT.format(
f"{self.ha_id}_button_{scene_event.key_id}"
)
self._button = scene_event.key_id
self._attr_name = f"{fibaro_device.friendly_name} Button {scene_event.key_id}"
self._attr_device_class = EventDeviceClass.BUTTON
self._attr_event_types = scene_event.key_event_types
self._attr_unique_id = f"{fibaro_device.unique_id_str}.{scene_event.key_id}"
async def async_added_to_hass(self) -> None:
"""Call when entity is added to hass."""
await super().async_added_to_hass()
# Register event callback
self.controller.register_event(
self.fibaro_device.fibaro_id, self._event_callback
)
@callback
def _event_callback(self, event: FibaroEvent) -> None:
if event.key_id == self._button:
self._trigger_event(event.key_event_type)
self.schedule_update_ha_state()