Add config flow to System Monitor (#104906)

* Initial commit for config flow to System Monitor

* sensors

* Fixes

* Works

* Add import

* entity_registry_enabled_default = False

* entity_category = diagnostic

* Create issue

* issue in config flow

* Tests

* test requirement

* codeowner

* Fix names

* processes

* Fix type

* reviews

* get info during startup once

* Select process

* Legacy import of resources

* requirements

* Allow custom

* Fix tests

* strings

* strings

* Always enable process sensors

* Fix docstrings

* skip remove sensors if no sensors

* Modify sensors

* Fix tests
This commit is contained in:
G Johansson 2023-12-26 18:29:32 +01:00 committed by GitHub
parent 2cd6c2b6bf
commit 4f0ee20ec5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 687 additions and 24 deletions

View file

@ -15,26 +15,29 @@ import psutil
import voluptuous as vol
from homeassistant.components.sensor import (
DOMAIN as SENSOR_DOMAIN,
PLATFORM_SCHEMA,
SensorDeviceClass,
SensorEntity,
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import (
CONF_RESOURCES,
CONF_SCAN_INTERVAL,
CONF_TYPE,
EVENT_HOMEASSISTANT_STOP,
PERCENTAGE,
STATE_OFF,
STATE_ON,
EntityCategory,
UnitOfDataRate,
UnitOfInformation,
UnitOfTemperature,
)
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
@ -46,6 +49,9 @@ from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util import slugify
import homeassistant.util.dt as dt_util
from .const import CONF_PROCESS, DOMAIN, NETWORK_TYPES
from .util import get_all_disk_mounts, get_all_network_interfaces
_LOGGER = logging.getLogger(__name__)
CONF_ARG = "arg"
@ -261,6 +267,17 @@ def check_required_arg(value: Any) -> Any:
return value
def check_legacy_resource(resource: str, resources: list[str]) -> bool:
"""Return True if legacy resource was configured."""
# This function to check legacy resources can be removed
# once we are removing the import from YAML
if resource in resources:
_LOGGER.debug("Checking %s in %s returns True", resource, ", ".join(resources))
return True
_LOGGER.debug("Checking %s in %s returns False", resource, ", ".join(resources))
return False
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Optional(CONF_RESOURCES, default={CONF_TYPE: "disk_use"}): vol.All(
@ -334,39 +351,126 @@ async def async_setup_platform(
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the system monitor sensors."""
processes = [
resource[CONF_ARG]
for resource in config[CONF_RESOURCES]
if resource[CONF_TYPE] == "process"
]
legacy_config: list[dict[str, str]] = config[CONF_RESOURCES]
resources = []
for resource_conf in legacy_config:
if (_type := resource_conf[CONF_TYPE]).startswith("disk_"):
if (arg := resource_conf.get(CONF_ARG)) is None:
resources.append(f"{_type}_/")
continue
resources.append(f"{_type}_{arg}")
continue
resources.append(f"{_type}_{resource_conf.get(CONF_ARG, '')}")
_LOGGER.debug(
"Importing config with processes: %s, resources: %s", processes, resources
)
# With removal of the import also cleanup legacy_resources logic in setup_entry
# Also cleanup entry.options["resources"] which is only imported for legacy reasons
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={"processes": processes, "legacy_resources": resources},
)
)
async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Set up System Montor sensors based on a config entry."""
entities = []
sensor_registry: dict[tuple[str, str], SensorData] = {}
legacy_resources: list[str] = entry.options.get("resources", [])
disk_arguments = await hass.async_add_executor_job(get_all_disk_mounts)
network_arguments = await hass.async_add_executor_job(get_all_network_interfaces)
cpu_temperature = await hass.async_add_executor_job(_read_cpu_temperature)
for resource in config[CONF_RESOURCES]:
type_ = resource[CONF_TYPE]
# Initialize the sensor argument if none was provided.
# For disk monitoring default to "/" (root) to prevent runtime errors, if argument was not specified.
if CONF_ARG not in resource:
argument = ""
if resource[CONF_TYPE].startswith("disk_"):
argument = "/"
else:
argument = resource[CONF_ARG]
_LOGGER.debug("Setup from options %s", entry.options)
for _type, sensor_description in SENSOR_TYPES.items():
if _type.startswith("disk_"):
for argument in disk_arguments:
sensor_registry[(_type, argument)] = SensorData(
argument, None, None, None, None
)
is_enabled = check_legacy_resource(
f"{_type}_{argument}", legacy_resources
)
entities.append(
SystemMonitorSensor(
sensor_registry,
sensor_description,
entry.entry_id,
argument,
is_enabled,
)
)
continue
if _type in NETWORK_TYPES:
for argument in network_arguments:
sensor_registry[(_type, argument)] = SensorData(
argument, None, None, None, None
)
is_enabled = check_legacy_resource(
f"{_type}_{argument}", legacy_resources
)
entities.append(
SystemMonitorSensor(
sensor_registry,
sensor_description,
entry.entry_id,
argument,
is_enabled,
)
)
continue
# Verify if we can retrieve CPU / processor temperatures.
# If not, do not create the entity and add a warning to the log
if (
type_ == "processor_temperature"
and await hass.async_add_executor_job(_read_cpu_temperature) is None
):
if _type == "processor_temperature" and cpu_temperature is None:
_LOGGER.warning("Cannot read CPU / processor temperature information")
continue
sensor_registry[(type_, argument)] = SensorData(
argument, None, None, None, None
)
if _type == "process":
_entry: dict[str, list] = entry.options.get(SENSOR_DOMAIN, {})
for argument in _entry.get(CONF_PROCESS, []):
sensor_registry[(_type, argument)] = SensorData(
argument, None, None, None, None
)
entities.append(
SystemMonitorSensor(
sensor_registry,
sensor_description,
entry.entry_id,
argument,
True,
)
)
continue
sensor_registry[(_type, "")] = SensorData("", None, None, None, None)
is_enabled = check_legacy_resource(f"{_type}_", legacy_resources)
entities.append(
SystemMonitorSensor(sensor_registry, SENSOR_TYPES[type_], argument)
SystemMonitorSensor(
sensor_registry,
sensor_description,
entry.entry_id,
"",
is_enabled,
)
)
scan_interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
scan_interval = DEFAULT_SCAN_INTERVAL
await async_setup_sensor_registry_updates(hass, sensor_registry, scan_interval)
async_add_entities(entities)
@ -433,12 +537,16 @@ class SystemMonitorSensor(SensorEntity):
"""Implementation of a system monitor sensor."""
should_poll = False
_attr_has_entity_name = True
_attr_entity_category = EntityCategory.DIAGNOSTIC
def __init__(
self,
sensor_registry: dict[tuple[str, str], SensorData],
sensor_description: SysMonitorSensorEntityDescription,
entry_id: str,
argument: str = "",
legacy_enabled: bool = False,
) -> None:
"""Initialize the sensor."""
self.entity_description = sensor_description
@ -446,6 +554,13 @@ class SystemMonitorSensor(SensorEntity):
self._attr_unique_id: str = slugify(f"{sensor_description.key}_{argument}")
self._sensor_registry = sensor_registry
self._argument: str = argument
self._attr_entity_registry_enabled_default = legacy_enabled
self._attr_device_info = DeviceInfo(
entry_type=DeviceEntryType.SERVICE,
identifiers={(DOMAIN, entry_id)},
manufacturer="System Monitor",
name="System Monitor",
)
@property
def native_value(self) -> str | datetime | None: