Add an energy solar platform for solar forecasts (#54576)

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
Paulus Schoutsen 2021-08-25 11:37:03 -07:00 committed by GitHub
parent 038121e87b
commit 7c5a0174ba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 243 additions and 64 deletions

View file

@ -3,12 +3,17 @@ from __future__ import annotations
import asyncio
import functools
from typing import Any, Awaitable, Callable, Dict, cast
from types import ModuleType
from typing import Any, Awaitable, Callable, cast
import voluptuous as vol
from homeassistant.components import websocket_api
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.integration_platform import (
async_process_integration_platforms,
)
from homeassistant.helpers.singleton import singleton
from .const import DOMAIN
from .data import (
@ -18,14 +23,15 @@ from .data import (
EnergyPreferencesUpdate,
async_get_manager,
)
from .types import EnergyPlatform, GetSolarForecastType
from .validate import async_validate
EnergyWebSocketCommandHandler = Callable[
[HomeAssistant, websocket_api.ActiveConnection, Dict[str, Any], "EnergyManager"],
[HomeAssistant, websocket_api.ActiveConnection, "dict[str, Any]", "EnergyManager"],
None,
]
AsyncEnergyWebSocketCommandHandler = Callable[
[HomeAssistant, websocket_api.ActiveConnection, Dict[str, Any], "EnergyManager"],
[HomeAssistant, websocket_api.ActiveConnection, "dict[str, Any]", "EnergyManager"],
Awaitable[None],
]
@ -37,6 +43,28 @@ def async_setup(hass: HomeAssistant) -> None:
websocket_api.async_register_command(hass, ws_save_prefs)
websocket_api.async_register_command(hass, ws_info)
websocket_api.async_register_command(hass, ws_validate)
websocket_api.async_register_command(hass, ws_solar_forecast)
@singleton("energy_platforms")
async def async_get_energy_platforms(
hass: HomeAssistant,
) -> dict[str, GetSolarForecastType]:
"""Get energy platforms."""
platforms: dict[str, GetSolarForecastType] = {}
async def _process_energy_platform(
hass: HomeAssistant, domain: str, platform: ModuleType
) -> None:
"""Process energy platforms."""
if not hasattr(platform, "async_get_solar_forecast"):
return
platforms[domain] = cast(EnergyPlatform, platform).async_get_solar_forecast
await async_process_integration_platforms(hass, DOMAIN, _process_energy_platform)
return platforms
def _ws_with_manager(
@ -107,14 +135,21 @@ async def ws_save_prefs(
vol.Required("type"): "energy/info",
}
)
@callback
def ws_info(
@websocket_api.async_response
async def ws_info(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
) -> None:
"""Handle get info command."""
connection.send_result(msg["id"], hass.data[DOMAIN])
forecast_platforms = await async_get_energy_platforms(hass)
connection.send_result(
msg["id"],
{
"cost_sensors": hass.data[DOMAIN]["cost_sensors"],
"solar_forecast_domains": list(forecast_platforms),
},
)
@websocket_api.websocket_command(
@ -130,3 +165,56 @@ async def ws_validate(
) -> None:
"""Handle validate command."""
connection.send_result(msg["id"], (await async_validate(hass)).as_dict())
@websocket_api.websocket_command(
{
vol.Required("type"): "energy/solar_forecast",
}
)
@_ws_with_manager
async def ws_solar_forecast(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
manager: EnergyManager,
) -> None:
"""Handle solar forecast command."""
if manager.data is None:
connection.send_result(msg["id"], {})
return
config_entries: dict[str, str | None] = {}
for source in manager.data["energy_sources"]:
if (
source["type"] != "solar"
or source.get("config_entry_solar_forecast") is None
):
continue
# typing is not catching the above guard for config_entry_solar_forecast being none
for config_entry in source["config_entry_solar_forecast"]: # type: ignore[union-attr]
config_entries[config_entry] = None
if not config_entries:
connection.send_result(msg["id"], {})
return
forecasts = {}
forecast_platforms = await async_get_energy_platforms(hass)
for config_entry_id in config_entries:
config_entry = hass.config_entries.async_get_entry(config_entry_id)
# Filter out non-existing config entries or unsupported domains
if config_entry is None or config_entry.domain not in forecast_platforms:
continue
forecast = await forecast_platforms[config_entry.domain](hass, config_entry_id)
if forecast is not None:
forecasts[config_entry_id] = forecast
connection.send_result(msg["id"], forecasts)