Add time remaining sensors for RainMachine programs (#73878)

This commit is contained in:
Aaron Bach 2022-06-29 08:27:34 -06:00 committed by GitHub
parent f5d8487768
commit e6d115e765
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -3,10 +3,12 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Any, cast
from regenmaschine.controller import Controller from regenmaschine.controller import Controller
from homeassistant.components.sensor import ( from homeassistant.components.sensor import (
RestoreSensor,
SensorDeviceClass, SensorDeviceClass,
SensorEntity, SensorEntity,
SensorEntityDescription, SensorEntityDescription,
@ -24,6 +26,7 @@ from . import RainMachineEntity
from .const import ( from .const import (
DATA_CONTROLLER, DATA_CONTROLLER,
DATA_COORDINATOR, DATA_COORDINATOR,
DATA_PROGRAMS,
DATA_PROVISION_SETTINGS, DATA_PROVISION_SETTINGS,
DATA_RESTRICTIONS_UNIVERSAL, DATA_RESTRICTIONS_UNIVERSAL,
DATA_ZONES, DATA_ZONES,
@ -44,6 +47,7 @@ TYPE_FLOW_SENSOR_CONSUMED_LITERS = "flow_sensor_consumed_liters"
TYPE_FLOW_SENSOR_START_INDEX = "flow_sensor_start_index" TYPE_FLOW_SENSOR_START_INDEX = "flow_sensor_start_index"
TYPE_FLOW_SENSOR_WATERING_CLICKS = "flow_sensor_watering_clicks" TYPE_FLOW_SENSOR_WATERING_CLICKS = "flow_sensor_watering_clicks"
TYPE_FREEZE_TEMP = "freeze_protect_temp" TYPE_FREEZE_TEMP = "freeze_protect_temp"
TYPE_PROGRAM_RUN_COMPLETION_TIME = "program_run_completion_time"
TYPE_ZONE_RUN_COMPLETION_TIME = "zone_run_completion_time" TYPE_ZONE_RUN_COMPLETION_TIME = "zone_run_completion_time"
@ -143,7 +147,26 @@ async def async_setup_entry(
) )
] ]
program_coordinator = coordinators[DATA_PROGRAMS]
zone_coordinator = coordinators[DATA_ZONES] zone_coordinator = coordinators[DATA_ZONES]
for uid, program in program_coordinator.data.items():
sensors.append(
ProgramTimeRemainingSensor(
entry,
program_coordinator,
zone_coordinator,
controller,
RainMachineSensorDescriptionUid(
key=f"{TYPE_PROGRAM_RUN_COMPLETION_TIME}_{uid}",
name=f"{program['name']} Run Completion Time",
device_class=SensorDeviceClass.TIMESTAMP,
entity_category=EntityCategory.DIAGNOSTIC,
uid=uid,
),
)
)
for uid, zone in zone_coordinator.data.items(): for uid, zone in zone_coordinator.data.items():
sensors.append( sensors.append(
ZoneTimeRemainingSensor( ZoneTimeRemainingSensor(
@ -163,6 +186,106 @@ async def async_setup_entry(
async_add_entities(sensors) async_add_entities(sensors)
class TimeRemainingSensor(RainMachineEntity, RestoreSensor):
"""Define a sensor that shows the amount of time remaining for an activity."""
entity_description: RainMachineSensorDescriptionUid
def __init__(
self,
entry: ConfigEntry,
coordinator: DataUpdateCoordinator,
controller: Controller,
description: EntityDescription,
) -> None:
"""Initialize."""
super().__init__(entry, coordinator, controller, description)
self._current_run_state: RunStates | None = None
self._previous_run_state: RunStates | None = None
@property
def activity_data(self) -> dict[str, Any]:
"""Return the core data for this entity."""
return cast(dict[str, Any], self.coordinator.data[self.entity_description.uid])
@property
def status_key(self) -> str:
"""Return the data key that contains the activity status."""
return "state"
async def async_added_to_hass(self) -> None:
"""Handle entity which will be added."""
if restored_data := await self.async_get_last_sensor_data():
self._attr_native_value = restored_data.native_value
await super().async_added_to_hass()
def calculate_seconds_remaining(self) -> int:
"""Calculate the number of seconds remaining."""
raise NotImplementedError
@callback
def update_from_latest_data(self) -> None:
"""Update the state."""
self._previous_run_state = self._current_run_state
self._current_run_state = RUN_STATE_MAP.get(self.activity_data[self.status_key])
now = utcnow()
if (
self._current_run_state == RunStates.NOT_RUNNING
and self._previous_run_state in (RunStates.QUEUED, RunStates.RUNNING)
):
# If the activity goes from queued/running to not running, update the
# state to be right now (i.e., the time the zone stopped running):
self._attr_native_value = now
elif self._current_run_state == RunStates.RUNNING:
seconds_remaining = self.calculate_seconds_remaining()
new_timestamp = now + timedelta(seconds=seconds_remaining)
assert isinstance(self._attr_native_value, datetime)
if (
self._attr_native_value
and new_timestamp - self._attr_native_value
< DEFAULT_ZONE_COMPLETION_TIME_WOBBLE_TOLERANCE
):
# If the deviation between the previous and new timestamps is less
# than a "wobble tolerance," don't spam the state machine:
return
self._attr_native_value = new_timestamp
class ProgramTimeRemainingSensor(TimeRemainingSensor):
"""Define a sensor that shows the amount of time remaining for a program."""
def __init__(
self,
entry: ConfigEntry,
program_coordinator: DataUpdateCoordinator,
zone_coordinator: DataUpdateCoordinator,
controller: Controller,
description: EntityDescription,
) -> None:
"""Initialize."""
super().__init__(entry, program_coordinator, controller, description)
self._zone_coordinator = zone_coordinator
@property
def status_key(self) -> str:
"""Return the data key that contains the activity status."""
return "status"
def calculate_seconds_remaining(self) -> int:
"""Calculate the number of seconds remaining."""
return sum(
self._zone_coordinator.data[zone["id"]]["remaining"]
for zone in [z for z in self.activity_data["wateringTimes"] if z["active"]]
)
class ProvisionSettingsSensor(RainMachineEntity, SensorEntity): class ProvisionSettingsSensor(RainMachineEntity, SensorEntity):
"""Define a sensor that handles provisioning data.""" """Define a sensor that handles provisioning data."""
@ -203,47 +326,11 @@ class UniversalRestrictionsSensor(RainMachineEntity, SensorEntity):
self._attr_native_value = self.coordinator.data.get("freezeProtectTemp") self._attr_native_value = self.coordinator.data.get("freezeProtectTemp")
class ZoneTimeRemainingSensor(RainMachineEntity, SensorEntity): class ZoneTimeRemainingSensor(TimeRemainingSensor):
"""Define a sensor that shows the amount of time remaining for a zone.""" """Define a sensor that shows the amount of time remaining for a zone."""
entity_description: RainMachineSensorDescriptionUid def calculate_seconds_remaining(self) -> int:
"""Calculate the number of seconds remaining."""
def __init__( return cast(
self, int, self.coordinator.data[self.entity_description.uid]["remaining"]
entry: ConfigEntry, )
coordinator: DataUpdateCoordinator,
controller: Controller,
description: EntityDescription,
) -> None:
"""Initialize."""
super().__init__(entry, coordinator, controller, description)
self._running_or_queued: bool = False
@callback
def update_from_latest_data(self) -> None:
"""Update the state."""
data = self.coordinator.data[self.entity_description.uid]
now = utcnow()
if RUN_STATE_MAP.get(data["state"]) == RunStates.NOT_RUNNING:
if self._running_or_queued:
# If we go from running to not running, update the state to be right
# now (i.e., the time the zone stopped running):
self._attr_native_value = now
self._running_or_queued = False
return
self._running_or_queued = True
new_timestamp = now + timedelta(seconds=data["remaining"])
if self._attr_native_value:
assert isinstance(self._attr_native_value, datetime)
if (
new_timestamp - self._attr_native_value
) < DEFAULT_ZONE_COMPLETION_TIME_WOBBLE_TOLERANCE:
# If the deviation between the previous and new timestamps is less than
# a "wobble tolerance," don't spam the state machine:
return
self._attr_native_value = new_timestamp