Add Flume binary sensors (#77327)

Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Jeef 2022-10-06 19:09:38 -06:00 committed by GitHub
parent e1047320a9
commit 22d6ce967d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 311 additions and 27 deletions

View file

@ -407,9 +407,11 @@ omit =
homeassistant/components/flick_electric/sensor.py
homeassistant/components/flock/notify.py
homeassistant/components/flume/__init__.py
homeassistant/components/flume/binary_sensor.py
homeassistant/components/flume/coordinator.py
homeassistant/components/flume/entity.py
homeassistant/components/flume/sensor.py
homeassistant/components/flume/util.py
homeassistant/components/folder/sensor.py
homeassistant/components/folder_watcher/*
homeassistant/components/foobot/sensor.py

View file

@ -0,0 +1,159 @@
"""Flume binary sensors."""
from __future__ import annotations
from dataclasses import dataclass
from homeassistant.components.binary_sensor import (
BinarySensorDeviceClass,
BinarySensorEntity,
BinarySensorEntityDescription,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .const import (
DOMAIN,
FLUME_AUTH,
FLUME_DEVICES,
FLUME_TYPE_BRIDGE,
FLUME_TYPE_SENSOR,
KEY_DEVICE_ID,
KEY_DEVICE_LOCATION,
KEY_DEVICE_LOCATION_NAME,
KEY_DEVICE_TYPE,
NOTIFICATION_HIGH_FLOW,
NOTIFICATION_LEAK_DETECTED,
)
from .coordinator import (
FlumeDeviceConnectionUpdateCoordinator,
FlumeNotificationDataUpdateCoordinator,
)
from .entity import FlumeEntity
from .util import get_valid_flume_devices
@dataclass
class FlumeBinarySensorRequiredKeysMixin:
"""Mixin for required keys."""
event_rule: str
@dataclass
class FlumeBinarySensorEntityDescription(
BinarySensorEntityDescription, FlumeBinarySensorRequiredKeysMixin
):
"""Describes a binary sensor entity."""
FLUME_BINARY_NOTIFICATION_SENSORS: tuple[FlumeBinarySensorEntityDescription, ...] = (
FlumeBinarySensorEntityDescription(
key="leak",
name="Leak detected",
entity_category=EntityCategory.DIAGNOSTIC,
event_rule=NOTIFICATION_LEAK_DETECTED,
icon="mdi:pipe-leak",
),
FlumeBinarySensorEntityDescription(
key="flow",
name="High flow",
entity_category=EntityCategory.DIAGNOSTIC,
event_rule=NOTIFICATION_HIGH_FLOW,
icon="mdi:waves",
),
)
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up a Flume binary sensor.."""
flume_domain_data = hass.data[DOMAIN][config_entry.entry_id]
flume_auth = flume_domain_data[FLUME_AUTH]
flume_devices = flume_domain_data[FLUME_DEVICES]
flume_entity_list: list[
FlumeNotificationBinarySensor | FlumeConnectionBinarySensor
] = []
connection_coordinator = FlumeDeviceConnectionUpdateCoordinator(
hass=hass, flume_devices=flume_devices
)
notification_coordinator = FlumeNotificationDataUpdateCoordinator(
hass=hass, auth=flume_auth
)
flume_devices = get_valid_flume_devices(flume_devices)
for device in flume_devices:
device_id = device[KEY_DEVICE_ID]
device_location_name = device[KEY_DEVICE_LOCATION][KEY_DEVICE_LOCATION_NAME]
connection_sensor = FlumeConnectionBinarySensor(
coordinator=connection_coordinator,
description=BinarySensorEntityDescription(
name="Connected",
key="connected",
),
device_id=device_id,
location_name=device_location_name,
is_bridge=(device[KEY_DEVICE_TYPE] is FLUME_TYPE_BRIDGE),
)
flume_entity_list.append(connection_sensor)
if device[KEY_DEVICE_TYPE] != FLUME_TYPE_SENSOR:
continue
# Build notification sensors
flume_entity_list.extend(
[
FlumeNotificationBinarySensor(
coordinator=notification_coordinator,
description=description,
device_id=device_id,
location_name=device_location_name,
)
for description in FLUME_BINARY_NOTIFICATION_SENSORS
]
)
if flume_entity_list:
async_add_entities(flume_entity_list)
class FlumeNotificationBinarySensor(FlumeEntity, BinarySensorEntity):
"""Binary sensor class."""
entity_description: FlumeBinarySensorEntityDescription
coordinator: FlumeNotificationDataUpdateCoordinator
@property
def is_on(self) -> bool:
"""Return on state."""
return bool(
(
notifications := self.coordinator.active_notifications_by_device.get(
self.device_id
)
)
and self.entity_description.event_rule in notifications
)
class FlumeConnectionBinarySensor(FlumeEntity, BinarySensorEntity):
"""Binary Sensor class for WIFI Connection status."""
entity_description: FlumeBinarySensorEntityDescription
coordinator: FlumeDeviceConnectionUpdateCoordinator
_attr_entity_category = EntityCategory.DIAGNOSTIC
_attr_device_class = BinarySensorDeviceClass.CONNECTIVITY
@property
def is_on(self) -> bool:
"""Return connection status."""
return bool(
(connected := self.coordinator.connected) and connected[self.device_id]
)

View file

@ -8,17 +8,24 @@ from homeassistant.const import Platform
DOMAIN = "flume"
PLATFORMS = [Platform.SENSOR]
PLATFORMS = [
Platform.BINARY_SENSOR,
Platform.SENSOR,
]
DEFAULT_NAME = "Flume Sensor"
# Flume API limits individual endpoints to 120 queries per hour
NOTIFICATION_SCAN_INTERVAL = timedelta(minutes=1)
DEVICE_SCAN_INTERVAL = timedelta(minutes=1)
DEVICE_CONNECTION_SCAN_INTERVAL = timedelta(minutes=1)
_LOGGER = logging.getLogger(__package__)
FLUME_TYPE_BRIDGE = 1
FLUME_TYPE_SENSOR = 2
FLUME_AUTH = "flume_auth"
FLUME_HTTP_SESSION = "http_session"
FLUME_DEVICES = "devices"
@ -33,3 +40,10 @@ KEY_DEVICE_ID = "id"
KEY_DEVICE_LOCATION = "location"
KEY_DEVICE_LOCATION_NAME = "name"
KEY_DEVICE_LOCATION_TIMEZONE = "tz"
NOTIFICATION_HIGH_FLOW = "High Flow Alert"
NOTIFICATION_BRIDGE_DISCONNECT = "Bridge Disconnection"
BRIDGE_NOTIFICATION_KEY = "connected"
BRIDGE_NOTIFICATION_RULE = "Bridge Disconnection"
NOTIFICATION_LEAK_DETECTED = "Flume Smart Leak Alert"

View file

@ -1,10 +1,21 @@
"""The IntelliFire integration."""
from __future__ import annotations
from typing import Any
import pyflume
from pyflume import FlumeDeviceList
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import _LOGGER, DEVICE_SCAN_INTERVAL, DOMAIN
from .const import (
_LOGGER,
DEVICE_CONNECTION_SCAN_INTERVAL,
DEVICE_SCAN_INTERVAL,
DOMAIN,
NOTIFICATION_SCAN_INTERVAL,
)
class FlumeDeviceDataUpdateCoordinator(DataUpdateCoordinator[None]):
@ -23,13 +34,89 @@ class FlumeDeviceDataUpdateCoordinator(DataUpdateCoordinator[None]):
async def _async_update_data(self) -> None:
"""Get the latest data from the Flume."""
_LOGGER.debug("Updating Flume data")
try:
await self.hass.async_add_executor_job(self.flume_device.update_force)
except Exception as ex:
raise UpdateFailed(f"Error communicating with flume API: {ex}") from ex
_LOGGER.debug(
"Flume update details: values=%s query_payload=%s",
"Flume Device Data Update values=%s query_payload=%s",
self.flume_device.values,
self.flume_device.query_payload,
)
class FlumeDeviceConnectionUpdateCoordinator(DataUpdateCoordinator[None]):
"""Date update coordinator to read connected status from Devices endpoint."""
def __init__(self, hass: HomeAssistant, flume_devices: FlumeDeviceList) -> None:
"""Initialize the Coordinator."""
super().__init__(
hass,
name=DOMAIN,
logger=_LOGGER,
update_interval=DEVICE_CONNECTION_SCAN_INTERVAL,
)
self.flume_devices = flume_devices
self.connected: dict[str, bool] = {}
def _update_connectivity(self) -> None:
"""Update device connectivity.."""
self.connected = {
device["id"]: device["connected"]
for device in self.flume_devices.get_devices()
}
_LOGGER.debug("Connectivity %s", self.connected)
async def _async_update_data(self) -> None:
"""Update the device list."""
try:
await self.hass.async_add_executor_job(self._update_connectivity)
except Exception as ex:
raise UpdateFailed(f"Error communicating with flume API: {ex}") from ex
class FlumeNotificationDataUpdateCoordinator(DataUpdateCoordinator[None]):
"""Data update coordinator for flume notifications."""
def __init__(self, hass: HomeAssistant, auth) -> None:
"""Initialize the Coordinator."""
super().__init__(
hass,
name=DOMAIN,
logger=_LOGGER,
update_interval=NOTIFICATION_SCAN_INTERVAL,
)
self.auth = auth
self.active_notifications_by_device: dict = {}
self.notifications: list[dict[str, Any]]
def _update_lists(self):
"""Query flume for notification list."""
self.notifications: list[dict[str, Any]] = pyflume.FlumeNotificationList(
self.auth, read="true"
).notification_list
_LOGGER.debug("Notifications %s", self.notifications)
active_notifications_by_device: dict[str, set[str]] = {}
for notification in self.notifications:
if (
not notification.get("device_id")
or not notification.get("extra")
or "event_rule_name" not in notification["extra"]
):
continue
device_id = notification["device_id"]
rule = notification["extra"]["event_rule_name"]
active_notifications_by_device.setdefault(device_id, set()).add(rule)
self.active_notifications_by_device = active_notifications_by_device
async def _async_update_data(self) -> None:
"""Update data."""
_LOGGER.debug("Updating Flume Notification")
try:
await self.hass.async_add_executor_job(self._update_lists)
except Exception as ex:
raise UpdateFailed(f"Error communicating with flume API: {ex}") from ex

View file

@ -2,13 +2,15 @@
from __future__ import annotations
from homeassistant.helpers.entity import DeviceInfo, EntityDescription
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
)
from .const import DOMAIN
from .coordinator import FlumeDeviceDataUpdateCoordinator
class FlumeEntity(CoordinatorEntity[FlumeDeviceDataUpdateCoordinator]):
class FlumeEntity(CoordinatorEntity[DataUpdateCoordinator]):
"""Base entity class."""
_attr_attribution = "Data provided by Flume API"
@ -16,20 +18,29 @@ class FlumeEntity(CoordinatorEntity[FlumeDeviceDataUpdateCoordinator]):
def __init__(
self,
coordinator: FlumeDeviceDataUpdateCoordinator,
coordinator: DataUpdateCoordinator,
description: EntityDescription,
device_id: str,
location_name: str,
is_bridge: bool = False,
) -> None:
"""Class initializer."""
super().__init__(coordinator)
self.entity_description = description
self.device_id = device_id
if is_bridge:
name = "Flume Bridge"
else:
name = "Flume Sensor"
self._attr_unique_id = f"{description.key}_{device_id}"
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, device_id)},
manufacturer="Flume, Inc.",
model="Flume Smart Water Monitor",
name=f"Flume {device_id}",
name=f"{name} {location_name}",
configuration_url="https://portal.flumewater.com",
)

View file

@ -22,11 +22,13 @@ from .const import (
FLUME_TYPE_SENSOR,
KEY_DEVICE_ID,
KEY_DEVICE_LOCATION,
KEY_DEVICE_LOCATION_NAME,
KEY_DEVICE_LOCATION_TIMEZONE,
KEY_DEVICE_TYPE,
)
from .coordinator import FlumeDeviceDataUpdateCoordinator
from .entity import FlumeEntity
from .util import get_valid_flume_devices
FLUME_QUERIES_SENSOR: tuple[SensorEntityDescription, ...] = (
SensorEntityDescription(
@ -78,21 +80,20 @@ async def async_setup_entry(
"""Set up the Flume sensor."""
flume_domain_data = hass.data[DOMAIN][config_entry.entry_id]
flume_devices = flume_domain_data[FLUME_DEVICES]
flume_auth = flume_domain_data[FLUME_AUTH]
http_session = flume_domain_data[FLUME_HTTP_SESSION]
flume_devices = flume_domain_data[FLUME_DEVICES]
flume_devices = [
device
for device in get_valid_flume_devices(flume_devices)
if device[KEY_DEVICE_TYPE] == FLUME_TYPE_SENSOR
]
flume_entity_list = []
for device in flume_devices.device_list:
if (
device[KEY_DEVICE_TYPE] != FLUME_TYPE_SENSOR
or KEY_DEVICE_LOCATION not in device
):
continue
for device in flume_devices:
device_id = device[KEY_DEVICE_ID]
device_timezone = device[KEY_DEVICE_LOCATION][KEY_DEVICE_LOCATION_TIMEZONE]
device_location_name = device[KEY_DEVICE_LOCATION][KEY_DEVICE_LOCATION_NAME]
flume_device = FlumeData(
flume_auth,
@ -113,6 +114,7 @@ async def async_setup_entry(
coordinator=coordinator,
description=description,
device_id=device_id,
location_name=device_location_name,
)
for description in FLUME_QUERIES_SENSOR
]
@ -127,15 +129,6 @@ class FlumeSensor(FlumeEntity, SensorEntity):
coordinator: FlumeDeviceDataUpdateCoordinator
def __init__(
self,
coordinator: FlumeDeviceDataUpdateCoordinator,
device_id: str,
description: SensorEntityDescription,
) -> None:
"""Inlitializer function with type hints."""
super().__init__(coordinator, description, device_id)
@property
def native_value(self):
"""Return the state of the sensor."""

View file

@ -0,0 +1,18 @@
"""Utilities for Flume."""
from __future__ import annotations
from typing import Any
from pyflume import FlumeDeviceList
from .const import KEY_DEVICE_LOCATION, KEY_DEVICE_LOCATION_NAME
def get_valid_flume_devices(flume_devices: FlumeDeviceList) -> list[dict[str, Any]]:
"""Return a list of Flume devices that have a valid location."""
return [
device
for device in flume_devices.device_list
if KEY_DEVICE_LOCATION_NAME in device[KEY_DEVICE_LOCATION]
]