Move all System Monitor updates into single Data Update Coordinator (#112055)

This commit is contained in:
G Johansson 2024-03-03 18:24:04 +01:00 committed by GitHub
parent faee9d996d
commit cdd7b94a95
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 395 additions and 399 deletions

View file

@ -10,7 +10,9 @@ from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from .const import DOMAIN
from .const import DOMAIN, DOMAIN_COORDINATOR
from .coordinator import SystemMonitorCoordinator
from .util import get_all_disk_mounts
_LOGGER = logging.getLogger(__name__)
@ -21,6 +23,25 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up System Monitor from a config entry."""
psutil_wrapper = await hass.async_add_executor_job(ha_psutil.PsutilWrapper)
hass.data[DOMAIN] = psutil_wrapper
disk_arguments = list(await hass.async_add_executor_job(get_all_disk_mounts, hass))
legacy_resources: set[str] = set(entry.options.get("resources", []))
for resource in legacy_resources:
if resource.startswith("disk_"):
split_index = resource.rfind("_")
_type = resource[:split_index]
argument = resource[split_index + 1 :]
_LOGGER.debug("Loading legacy %s with argument %s", _type, argument)
disk_arguments.append(argument)
_LOGGER.debug("disk arguments to be added: %s", disk_arguments)
coordinator: SystemMonitorCoordinator = SystemMonitorCoordinator(
hass, psutil_wrapper, disk_arguments
)
await coordinator.async_config_entry_first_refresh()
hass.data[DOMAIN_COORDINATOR] = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(entry.add_update_listener(update_listener))
return True

View file

@ -7,10 +7,9 @@ from dataclasses import dataclass
from functools import lru_cache
import logging
import sys
from typing import Generic, Literal
from typing import Literal
from psutil import NoSuchProcess, Process
import psutil_home_assistant as ha_psutil
from psutil import NoSuchProcess
from homeassistant.components.binary_sensor import (
DOMAIN as BINARY_SENSOR_DOMAIN,
@ -26,8 +25,8 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.util import slugify
from .const import CONF_PROCESS, DOMAIN
from .coordinator import MonitorCoordinator, SystemMonitorProcessCoordinator, dataT
from .const import CONF_PROCESS, DOMAIN, DOMAIN_COORDINATOR
from .coordinator import SystemMonitorCoordinator
_LOGGER = logging.getLogger(__name__)
@ -51,10 +50,10 @@ def get_cpu_icon() -> Literal["mdi:cpu-64-bit", "mdi:cpu-32-bit"]:
return "mdi:cpu-32-bit"
def get_process(entity: SystemMonitorSensor[list[Process]]) -> bool:
def get_process(entity: SystemMonitorSensor) -> bool:
"""Return process."""
state = False
for proc in entity.coordinator.data:
for proc in entity.coordinator.data.processes:
try:
_LOGGER.debug("process %s for argument %s", proc.name(), entity.argument)
if entity.argument == proc.name():
@ -70,21 +69,21 @@ def get_process(entity: SystemMonitorSensor[list[Process]]) -> bool:
@dataclass(frozen=True, kw_only=True)
class SysMonitorBinarySensorEntityDescription(
BinarySensorEntityDescription, Generic[dataT]
):
class SysMonitorBinarySensorEntityDescription(BinarySensorEntityDescription):
"""Describes System Monitor binary sensor entities."""
value_fn: Callable[[SystemMonitorSensor[dataT]], bool]
value_fn: Callable[[SystemMonitorSensor], bool]
add_to_update: Callable[[SystemMonitorSensor], tuple[str, str]]
SENSOR_TYPES: tuple[SysMonitorBinarySensorEntityDescription[list[Process]], ...] = (
SysMonitorBinarySensorEntityDescription[list[Process]](
SENSOR_TYPES: tuple[SysMonitorBinarySensorEntityDescription, ...] = (
SysMonitorBinarySensorEntityDescription(
key="binary_process",
translation_key="process",
icon=get_cpu_icon(),
value_fn=get_process,
device_class=BinarySensorDeviceClass.RUNNING,
add_to_update=lambda entity: ("processes", ""),
),
)
@ -93,20 +92,15 @@ async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Set up System Montor binary sensors based on a config entry."""
psutil_wrapper: ha_psutil.PsutilWrapper = hass.data[DOMAIN]
entities: list[SystemMonitorSensor] = []
process_coordinator = SystemMonitorProcessCoordinator(
hass, psutil_wrapper, "Process coordinator"
)
await process_coordinator.async_request_refresh()
coordinator: SystemMonitorCoordinator = hass.data[DOMAIN_COORDINATOR]
for sensor_description in SENSOR_TYPES:
_entry = entry.options.get(BINARY_SENSOR_DOMAIN, {})
for argument in _entry.get(CONF_PROCESS, []):
entities.append(
SystemMonitorSensor(
process_coordinator,
coordinator,
sensor_description,
entry.entry_id,
argument,
@ -116,18 +110,18 @@ async def async_setup_entry(
class SystemMonitorSensor(
CoordinatorEntity[MonitorCoordinator[dataT]], BinarySensorEntity
CoordinatorEntity[SystemMonitorCoordinator], BinarySensorEntity
):
"""Implementation of a system monitor binary sensor."""
_attr_has_entity_name = True
_attr_entity_category = EntityCategory.DIAGNOSTIC
entity_description: SysMonitorBinarySensorEntityDescription[dataT]
entity_description: SysMonitorBinarySensorEntityDescription
def __init__(
self,
coordinator: MonitorCoordinator[dataT],
sensor_description: SysMonitorBinarySensorEntityDescription[dataT],
coordinator: SystemMonitorCoordinator,
sensor_description: SysMonitorBinarySensorEntityDescription,
entry_id: str,
argument: str,
) -> None:
@ -144,6 +138,20 @@ class SystemMonitorSensor(
)
self.argument = argument
async def async_added_to_hass(self) -> None:
"""When added to hass."""
self.coordinator.update_subscribers[
self.entity_description.add_to_update(self)
].add(self.entity_id)
return await super().async_added_to_hass()
async def async_will_remove_from_hass(self) -> None:
"""When removed from hass."""
self.coordinator.update_subscribers[
self.entity_description.add_to_update(self)
].remove(self.entity_id)
return await super().async_will_remove_from_hass()
@property
def is_on(self) -> bool | None:
"""Return true if the binary sensor is on."""

View file

@ -1,7 +1,7 @@
"""Constants for System Monitor."""
DOMAIN = "systemmonitor"
DOMAIN_COORDINATORS = "systemmonitor_coordinators"
DOMAIN_COORDINATOR = "systemmonitor_coordinator"
CONF_INDEX = "index"
CONF_PROCESS = "process"

View file

@ -2,11 +2,11 @@
from __future__ import annotations
from abc import abstractmethod
from dataclasses import dataclass
from datetime import datetime
import logging
import os
from typing import NamedTuple, TypeVar
from typing import Any, NamedTuple
from psutil import Process
from psutil._common import sdiskusage, shwtemp, snetio, snicaddr, sswap
@ -14,15 +14,43 @@ import psutil_home_assistant as ha_psutil
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_component import DEFAULT_SCAN_INTERVAL
from homeassistant.helpers.update_coordinator import (
TimestampDataUpdateCoordinator,
UpdateFailed,
)
from homeassistant.helpers.update_coordinator import TimestampDataUpdateCoordinator
from homeassistant.util import dt as dt_util
_LOGGER = logging.getLogger(__name__)
@dataclass(frozen=True, kw_only=True, slots=True)
class SensorData:
"""Sensor data."""
disk_usage: dict[str, sdiskusage]
swap: sswap
memory: VirtualMemory
io_counters: dict[str, snetio]
addresses: dict[str, list[snicaddr]]
load: tuple[float, float, float]
cpu_percent: float | None
boot_time: datetime
processes: list[Process]
temperatures: dict[str, list[shwtemp]]
def as_dict(self) -> dict[str, Any]:
"""Return as dict."""
return {
"disk_usage": {k: str(v) for k, v in self.disk_usage.items()},
"swap": str(self.swap),
"memory": str(self.memory),
"io_counters": {k: str(v) for k, v in self.io_counters.items()},
"addresses": {k: str(v) for k, v in self.addresses.items()},
"load": str(self.load),
"cpu_percent": str(self.cpu_percent),
"boot_time": str(self.boot_time),
"processes": str(self.processes),
"temperatures": {k: str(v) for k, v in self.temperatures.items()},
}
class VirtualMemory(NamedTuple):
"""Represents virtual memory.
@ -37,177 +65,148 @@ class VirtualMemory(NamedTuple):
free: float
dataT = TypeVar(
"dataT",
bound=datetime
| dict[str, list[shwtemp]]
| dict[str, list[snicaddr]]
| dict[str, snetio]
| float
| list[Process]
| sswap
| VirtualMemory
| tuple[float, float, float]
| sdiskusage
| None,
)
class MonitorCoordinator(TimestampDataUpdateCoordinator[dataT]):
"""A System monitor Base Data Update Coordinator."""
def __init__(
self, hass: HomeAssistant, psutil_wrapper: ha_psutil.PsutilWrapper, name: str
) -> None:
"""Initialize the coordinator."""
super().__init__(
hass,
_LOGGER,
name=f"System Monitor {name}",
update_interval=DEFAULT_SCAN_INTERVAL,
always_update=False,
)
self._psutil = psutil_wrapper.psutil
async def _async_update_data(self) -> dataT:
"""Fetch data."""
return await self.hass.async_add_executor_job(self.update_data)
@abstractmethod
def update_data(self) -> dataT:
"""To be extended by data update coordinators."""
class SystemMonitorDiskCoordinator(MonitorCoordinator[sdiskusage]):
"""A System monitor Disk Data Update Coordinator."""
class SystemMonitorCoordinator(TimestampDataUpdateCoordinator[SensorData]):
"""A System monitor Data Update Coordinator."""
def __init__(
self,
hass: HomeAssistant,
psutil_wrapper: ha_psutil.PsutilWrapper,
name: str,
argument: str,
arguments: list[str],
) -> None:
"""Initialize the disk coordinator."""
super().__init__(hass, psutil_wrapper, name)
self._argument = argument
"""Initialize the coordinator."""
super().__init__(
hass,
_LOGGER,
name="System Monitor update coordinator",
update_interval=DEFAULT_SCAN_INTERVAL,
always_update=False,
)
self._psutil = psutil_wrapper.psutil
self._arguments = arguments
self.boot_time: datetime | None = None
def update_data(self) -> sdiskusage:
self._initial_update: bool = True
self.update_subscribers: dict[
tuple[str, str], set[str]
] = self.set_subscribers_tuples(arguments)
def set_subscribers_tuples(
self, arguments: list[str]
) -> dict[tuple[str, str], set[str]]:
"""Set tuples in subscribers dictionary."""
_disk_defaults: dict[tuple[str, str], set[str]] = {}
for argument in arguments:
_disk_defaults[("disks", argument)] = set()
return {
**_disk_defaults,
("swap", ""): set(),
("memory", ""): set(),
("io_counters", ""): set(),
("addresses", ""): set(),
("load", ""): set(),
("cpu_percent", ""): set(),
("boot", ""): set(),
("processes", ""): set(),
("temperatures", ""): set(),
}
async def _async_update_data(self) -> SensorData:
"""Fetch data."""
_LOGGER.debug("Update list is: %s", self.update_subscribers)
_data = await self.hass.async_add_executor_job(self.update_data)
load: tuple = (None, None, None)
if self.update_subscribers[("load", "")] or self._initial_update:
load = os.getloadavg()
_LOGGER.debug("Load: %s", load)
cpu_percent: float | None = None
if self.update_subscribers[("cpu_percent", "")] or self._initial_update:
cpu_percent = self._psutil.cpu_percent(interval=None)
_LOGGER.debug("cpu_percent: %s", cpu_percent)
self._initial_update = False
return SensorData(
disk_usage=_data["disks"],
swap=_data["swap"],
memory=_data["memory"],
io_counters=_data["io_counters"],
addresses=_data["addresses"],
load=load,
cpu_percent=cpu_percent,
boot_time=_data["boot_time"],
processes=_data["processes"],
temperatures=_data["temperatures"],
)
def update_data(self) -> dict[str, Any]:
"""To be extended by data update coordinators."""
disks: dict[str, sdiskusage] = {}
for argument in self._arguments:
if self.update_subscribers[("disks", argument)] or self._initial_update:
try:
usage: sdiskusage = self._psutil.disk_usage(self._argument)
_LOGGER.debug("sdiskusage: %s", usage)
return usage
usage: sdiskusage = self._psutil.disk_usage(argument)
_LOGGER.debug("sdiskusagefor %s: %s", argument, usage)
except PermissionError as err:
raise UpdateFailed(f"No permission to access {self._argument}") from err
_LOGGER.warning(
"No permission to access %s, error %s", argument, err
)
except OSError as err:
raise UpdateFailed(f"OS error for {self._argument}") from err
_LOGGER.warning("OS error for %s, error %s", argument, err)
else:
disks[argument] = usage
class SystemMonitorSwapCoordinator(MonitorCoordinator[sswap]):
"""A System monitor Swap Data Update Coordinator."""
def update_data(self) -> sswap:
"""Fetch data."""
swap: sswap = self._psutil.swap_memory()
swap: sswap | None = None
if self.update_subscribers[("swap", "")] or self._initial_update:
swap = self._psutil.swap_memory()
_LOGGER.debug("sswap: %s", swap)
return swap
class SystemMonitorMemoryCoordinator(MonitorCoordinator[VirtualMemory]):
"""A System monitor Memory Data Update Coordinator."""
def update_data(self) -> VirtualMemory:
"""Fetch data."""
memory = None
if self.update_subscribers[("memory", "")] or self._initial_update:
memory = self._psutil.virtual_memory()
_LOGGER.debug("memory: %s", memory)
return VirtualMemory(
memory = VirtualMemory(
memory.total, memory.available, memory.percent, memory.used, memory.free
)
class SystemMonitorNetIOCoordinator(MonitorCoordinator[dict[str, snetio]]):
"""A System monitor Network IO Data Update Coordinator."""
def update_data(self) -> dict[str, snetio]:
"""Fetch data."""
io_counters: dict[str, snetio] = self._psutil.net_io_counters(pernic=True)
io_counters: dict[str, snetio] | None = None
if self.update_subscribers[("io_counters", "")] or self._initial_update:
io_counters = self._psutil.net_io_counters(pernic=True)
_LOGGER.debug("io_counters: %s", io_counters)
return io_counters
class SystemMonitorNetAddrCoordinator(MonitorCoordinator[dict[str, list[snicaddr]]]):
"""A System monitor Network Address Data Update Coordinator."""
def update_data(self) -> dict[str, list[snicaddr]]:
"""Fetch data."""
addresses: dict[str, list[snicaddr]] = self._psutil.net_if_addrs()
addresses: dict[str, list[snicaddr]] | None = None
if self.update_subscribers[("addresses", "")] or self._initial_update:
addresses = self._psutil.net_if_addrs()
_LOGGER.debug("ip_addresses: %s", addresses)
return addresses
if self._initial_update:
# Boot time only needs to refresh on first pass
self.boot_time = dt_util.utc_from_timestamp(self._psutil.boot_time())
_LOGGER.debug("boot time: %s", self.boot_time)
class SystemMonitorLoadCoordinator(
MonitorCoordinator[tuple[float, float, float] | None]
):
"""A System monitor Load Data Update Coordinator."""
def update_data(self) -> tuple[float, float, float] | None:
"""Coordinator is not async."""
async def _async_update_data(self) -> tuple[float, float, float] | None:
"""Fetch data."""
return os.getloadavg()
class SystemMonitorProcessorCoordinator(MonitorCoordinator[float | None]):
"""A System monitor Processor Data Update Coordinator."""
def update_data(self) -> float | None:
"""Coordinator is not async."""
async def _async_update_data(self) -> float | None:
"""Get cpu usage.
Unlikely the rest of the coordinators, this one is async
since it does not block and we need to make sure it runs
in the same thread every time as psutil checks the thread
tid and compares it against the previous one.
"""
cpu_percent: float = self._psutil.cpu_percent(interval=None)
_LOGGER.debug("cpu_percent: %s", cpu_percent)
if cpu_percent > 0.0:
return cpu_percent
return None
class SystemMonitorBootTimeCoordinator(MonitorCoordinator[datetime]):
"""A System monitor Processor Data Update Coordinator."""
def update_data(self) -> datetime:
"""Fetch data."""
boot_time = dt_util.utc_from_timestamp(self._psutil.boot_time())
_LOGGER.debug("boot time: %s", boot_time)
return boot_time
class SystemMonitorProcessCoordinator(MonitorCoordinator[list[Process]]):
"""A System monitor Process Data Update Coordinator."""
def update_data(self) -> list[Process]:
"""Fetch data."""
processes = None
if self.update_subscribers[("processes", "")] or self._initial_update:
processes = self._psutil.process_iter()
_LOGGER.debug("processes: %s", processes)
return list(processes)
processes = list(processes)
class SystemMonitorCPUtempCoordinator(MonitorCoordinator[dict[str, list[shwtemp]]]):
"""A System monitor CPU Temperature Data Update Coordinator."""
def update_data(self) -> dict[str, list[shwtemp]]:
"""Fetch data."""
temps: dict[str, list[shwtemp]] = {}
if self.update_subscribers[("temperatures", "")] or self._initial_update:
try:
temps: dict[str, list[shwtemp]] = self._psutil.sensors_temperatures()
temps = self._psutil.sensors_temperatures()
_LOGGER.debug("temps: %s", temps)
return temps
except AttributeError as err:
raise UpdateFailed("OS does not provide temperature sensors") from err
except AttributeError:
_LOGGER.debug("OS does not provide temperature sensors")
return {
"disks": disks,
"swap": swap,
"memory": memory,
"io_counters": io_counters,
"addresses": addresses,
"boot_time": self.boot_time,
"processes": processes,
"temperatures": temps,
}

View file

@ -6,22 +6,20 @@ from typing import Any
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from .const import DOMAIN_COORDINATORS
from .coordinator import MonitorCoordinator
from .const import DOMAIN_COORDINATOR
from .coordinator import SystemMonitorCoordinator
async def async_get_config_entry_diagnostics(
hass: HomeAssistant, entry: ConfigEntry
) -> dict[str, Any]:
"""Return diagnostics for Sensibo config entry."""
coordinators: dict[str, MonitorCoordinator] = hass.data[DOMAIN_COORDINATORS]
coordinator: SystemMonitorCoordinator = hass.data[DOMAIN_COORDINATOR]
diag_data = {}
for _type, coordinator in coordinators.items():
diag_data[_type] = {
diag_data = {
"last_update_success": coordinator.last_update_success,
"last_update": str(coordinator.last_update_success_time),
"data": str(coordinator.data),
"data": coordinator.data.as_dict(),
}
return {

View file

@ -10,11 +10,9 @@ import logging
import socket
import sys
import time
from typing import Any, Generic, Literal
from typing import Any, Literal
from psutil import NoSuchProcess, Process
from psutil._common import sdiskusage, shwtemp, snetio, snicaddr, sswap
import psutil_home_assistant as ha_psutil
from psutil import NoSuchProcess
import voluptuous as vol
from homeassistant.components.sensor import (
@ -47,22 +45,8 @@ from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType, StateTyp
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.util import slugify
from .const import CONF_PROCESS, DOMAIN, DOMAIN_COORDINATORS, NET_IO_TYPES
from .coordinator import (
MonitorCoordinator,
SystemMonitorBootTimeCoordinator,
SystemMonitorCPUtempCoordinator,
SystemMonitorDiskCoordinator,
SystemMonitorLoadCoordinator,
SystemMonitorMemoryCoordinator,
SystemMonitorNetAddrCoordinator,
SystemMonitorNetIOCoordinator,
SystemMonitorProcessCoordinator,
SystemMonitorProcessorCoordinator,
SystemMonitorSwapCoordinator,
VirtualMemory,
dataT,
)
from .const import CONF_PROCESS, DOMAIN, DOMAIN_COORDINATOR, NET_IO_TYPES
from .coordinator import SystemMonitorCoordinator
from .util import get_all_disk_mounts, get_all_network_interfaces, read_cpu_temperature
_LOGGER = logging.getLogger(__name__)
@ -88,16 +72,16 @@ def get_cpu_icon() -> Literal["mdi:cpu-64-bit", "mdi:cpu-32-bit"]:
def get_processor_temperature(
entity: SystemMonitorSensor[dict[str, list[shwtemp]]],
entity: SystemMonitorSensor,
) -> float | None:
"""Return processor temperature."""
return read_cpu_temperature(entity.hass, entity.coordinator.data)
return read_cpu_temperature(entity.hass, entity.coordinator.data.temperatures)
def get_process(entity: SystemMonitorSensor[list[Process]]) -> str:
def get_process(entity: SystemMonitorSensor) -> str:
"""Return process."""
state = STATE_OFF
for proc in entity.coordinator.data:
for proc in entity.coordinator.data.processes:
try:
_LOGGER.debug("process %s for argument %s", proc.name(), entity.argument)
if entity.argument == proc.name():
@ -112,26 +96,26 @@ def get_process(entity: SystemMonitorSensor[list[Process]]) -> str:
return state
def get_network(entity: SystemMonitorSensor[dict[str, snetio]]) -> float | None:
def get_network(entity: SystemMonitorSensor) -> float | None:
"""Return network in and out."""
counters = entity.coordinator.data
counters = entity.coordinator.data.io_counters
if entity.argument in counters:
counter = counters[entity.argument][IO_COUNTER[entity.entity_description.key]]
return round(counter / 1024**2, 1)
return None
def get_packets(entity: SystemMonitorSensor[dict[str, snetio]]) -> float | None:
def get_packets(entity: SystemMonitorSensor) -> float | None:
"""Return packets in and out."""
counters = entity.coordinator.data
counters = entity.coordinator.data.io_counters
if entity.argument in counters:
return counters[entity.argument][IO_COUNTER[entity.entity_description.key]]
return None
def get_throughput(entity: SystemMonitorSensor[dict[str, snetio]]) -> float | None:
def get_throughput(entity: SystemMonitorSensor) -> float | None:
"""Return network throughput in and out."""
counters = entity.coordinator.data
counters = entity.coordinator.data.io_counters
state = None
if entity.argument in counters:
counter = counters[entity.argument][IO_COUNTER[entity.entity_description.key]]
@ -151,10 +135,10 @@ def get_throughput(entity: SystemMonitorSensor[dict[str, snetio]]) -> float | No
def get_ip_address(
entity: SystemMonitorSensor[dict[str, list[snicaddr]]],
entity: SystemMonitorSensor,
) -> str | None:
"""Return network ip address."""
addresses = entity.coordinator.data
addresses = entity.coordinator.data.addresses
if entity.argument in addresses:
for addr in addresses[entity.argument]:
if addr.family == IF_ADDRS_FAMILY[entity.entity_description.key]:
@ -163,16 +147,18 @@ def get_ip_address(
@dataclass(frozen=True, kw_only=True)
class SysMonitorSensorEntityDescription(SensorEntityDescription, Generic[dataT]):
class SysMonitorSensorEntityDescription(SensorEntityDescription):
"""Describes System Monitor sensor entities."""
value_fn: Callable[[SystemMonitorSensor[dataT]], StateType | datetime]
value_fn: Callable[[SystemMonitorSensor], StateType | datetime]
add_to_update: Callable[[SystemMonitorSensor], tuple[str, str]]
none_is_unavailable: bool = False
mandatory_arg: bool = False
placeholder: str | None = None
SENSOR_TYPES: dict[str, SysMonitorSensorEntityDescription[Any]] = {
"disk_free": SysMonitorSensorEntityDescription[sdiskusage](
SENSOR_TYPES: dict[str, SysMonitorSensorEntityDescription] = {
"disk_free": SysMonitorSensorEntityDescription(
key="disk_free",
translation_key="disk_free",
placeholder="mount_point",
@ -180,9 +166,15 @@ SENSOR_TYPES: dict[str, SysMonitorSensorEntityDescription[Any]] = {
device_class=SensorDeviceClass.DATA_SIZE,
icon="mdi:harddisk",
state_class=SensorStateClass.MEASUREMENT,
value_fn=lambda entity: round(entity.coordinator.data.free / 1024**3, 1),
value_fn=lambda entity: round(
entity.coordinator.data.disk_usage[entity.argument].free / 1024**3, 1
)
if entity.argument in entity.coordinator.data.disk_usage
else None,
none_is_unavailable=True,
add_to_update=lambda entity: ("disks", entity.argument),
),
"disk_use": SysMonitorSensorEntityDescription[sdiskusage](
"disk_use": SysMonitorSensorEntityDescription(
key="disk_use",
translation_key="disk_use",
placeholder="mount_point",
@ -190,70 +182,91 @@ SENSOR_TYPES: dict[str, SysMonitorSensorEntityDescription[Any]] = {
device_class=SensorDeviceClass.DATA_SIZE,
icon="mdi:harddisk",
state_class=SensorStateClass.MEASUREMENT,
value_fn=lambda entity: round(entity.coordinator.data.used / 1024**3, 1),
value_fn=lambda entity: round(
entity.coordinator.data.disk_usage[entity.argument].used / 1024**3, 1
)
if entity.argument in entity.coordinator.data.disk_usage
else None,
none_is_unavailable=True,
add_to_update=lambda entity: ("disks", entity.argument),
),
"disk_use_percent": SysMonitorSensorEntityDescription[sdiskusage](
"disk_use_percent": SysMonitorSensorEntityDescription(
key="disk_use_percent",
translation_key="disk_use_percent",
placeholder="mount_point",
native_unit_of_measurement=PERCENTAGE,
icon="mdi:harddisk",
state_class=SensorStateClass.MEASUREMENT,
value_fn=lambda entity: entity.coordinator.data.percent,
value_fn=lambda entity: entity.coordinator.data.disk_usage[
entity.argument
].percent
if entity.argument in entity.coordinator.data.disk_usage
else None,
none_is_unavailable=True,
add_to_update=lambda entity: ("disks", entity.argument),
),
"ipv4_address": SysMonitorSensorEntityDescription[dict[str, list[snicaddr]]](
"ipv4_address": SysMonitorSensorEntityDescription(
key="ipv4_address",
translation_key="ipv4_address",
placeholder="ip_address",
icon="mdi:ip-network",
mandatory_arg=True,
value_fn=get_ip_address,
add_to_update=lambda entity: ("addresses", ""),
),
"ipv6_address": SysMonitorSensorEntityDescription[dict[str, list[snicaddr]]](
"ipv6_address": SysMonitorSensorEntityDescription(
key="ipv6_address",
translation_key="ipv6_address",
placeholder="ip_address",
icon="mdi:ip-network",
mandatory_arg=True,
value_fn=get_ip_address,
add_to_update=lambda entity: ("addresses", ""),
),
"last_boot": SysMonitorSensorEntityDescription[datetime](
"last_boot": SysMonitorSensorEntityDescription(
key="last_boot",
translation_key="last_boot",
device_class=SensorDeviceClass.TIMESTAMP,
value_fn=lambda entity: entity.coordinator.data,
value_fn=lambda entity: entity.coordinator.data.boot_time,
add_to_update=lambda entity: ("boot", ""),
),
"load_15m": SysMonitorSensorEntityDescription[tuple[float, float, float]](
"load_15m": SysMonitorSensorEntityDescription(
key="load_15m",
translation_key="load_15m",
icon=get_cpu_icon(),
state_class=SensorStateClass.MEASUREMENT,
value_fn=lambda entity: round(entity.coordinator.data[2], 2),
value_fn=lambda entity: round(entity.coordinator.data.load[2], 2),
add_to_update=lambda entity: ("load", ""),
),
"load_1m": SysMonitorSensorEntityDescription[tuple[float, float, float]](
"load_1m": SysMonitorSensorEntityDescription(
key="load_1m",
translation_key="load_1m",
icon=get_cpu_icon(),
state_class=SensorStateClass.MEASUREMENT,
value_fn=lambda entity: round(entity.coordinator.data[0], 2),
value_fn=lambda entity: round(entity.coordinator.data.load[0], 2),
add_to_update=lambda entity: ("load", ""),
),
"load_5m": SysMonitorSensorEntityDescription[tuple[float, float, float]](
"load_5m": SysMonitorSensorEntityDescription(
key="load_5m",
translation_key="load_5m",
icon=get_cpu_icon(),
state_class=SensorStateClass.MEASUREMENT,
value_fn=lambda entity: round(entity.coordinator.data[1], 2),
value_fn=lambda entity: round(entity.coordinator.data.load[1], 2),
add_to_update=lambda entity: ("load", ""),
),
"memory_free": SysMonitorSensorEntityDescription[VirtualMemory](
"memory_free": SysMonitorSensorEntityDescription(
key="memory_free",
translation_key="memory_free",
native_unit_of_measurement=UnitOfInformation.MEBIBYTES,
device_class=SensorDeviceClass.DATA_SIZE,
icon="mdi:memory",
state_class=SensorStateClass.MEASUREMENT,
value_fn=lambda entity: round(entity.coordinator.data.available / 1024**2, 1),
value_fn=lambda entity: round(
entity.coordinator.data.memory.available / 1024**2, 1
),
"memory_use": SysMonitorSensorEntityDescription[VirtualMemory](
add_to_update=lambda entity: ("memory", ""),
),
"memory_use": SysMonitorSensorEntityDescription(
key="memory_use",
translation_key="memory_use",
native_unit_of_measurement=UnitOfInformation.MEBIBYTES,
@ -261,20 +274,25 @@ SENSOR_TYPES: dict[str, SysMonitorSensorEntityDescription[Any]] = {
icon="mdi:memory",
state_class=SensorStateClass.MEASUREMENT,
value_fn=lambda entity: round(
(entity.coordinator.data.total - entity.coordinator.data.available)
(
entity.coordinator.data.memory.total
- entity.coordinator.data.memory.available
)
/ 1024**2,
1,
),
add_to_update=lambda entity: ("memory", ""),
),
"memory_use_percent": SysMonitorSensorEntityDescription[VirtualMemory](
"memory_use_percent": SysMonitorSensorEntityDescription(
key="memory_use_percent",
translation_key="memory_use_percent",
native_unit_of_measurement=PERCENTAGE,
icon="mdi:memory",
state_class=SensorStateClass.MEASUREMENT,
value_fn=lambda entity: entity.coordinator.data.percent,
value_fn=lambda entity: entity.coordinator.data.memory.percent,
add_to_update=lambda entity: ("memory", ""),
),
"network_in": SysMonitorSensorEntityDescription[dict[str, snetio]](
"network_in": SysMonitorSensorEntityDescription(
key="network_in",
translation_key="network_in",
placeholder="interface",
@ -284,8 +302,9 @@ SENSOR_TYPES: dict[str, SysMonitorSensorEntityDescription[Any]] = {
state_class=SensorStateClass.TOTAL_INCREASING,
mandatory_arg=True,
value_fn=get_network,
add_to_update=lambda entity: ("io_counters", ""),
),
"network_out": SysMonitorSensorEntityDescription[dict[str, snetio]](
"network_out": SysMonitorSensorEntityDescription(
key="network_out",
translation_key="network_out",
placeholder="interface",
@ -295,8 +314,9 @@ SENSOR_TYPES: dict[str, SysMonitorSensorEntityDescription[Any]] = {
state_class=SensorStateClass.TOTAL_INCREASING,
mandatory_arg=True,
value_fn=get_network,
add_to_update=lambda entity: ("io_counters", ""),
),
"packets_in": SysMonitorSensorEntityDescription[dict[str, snetio]](
"packets_in": SysMonitorSensorEntityDescription(
key="packets_in",
translation_key="packets_in",
placeholder="interface",
@ -304,8 +324,9 @@ SENSOR_TYPES: dict[str, SysMonitorSensorEntityDescription[Any]] = {
state_class=SensorStateClass.TOTAL_INCREASING,
mandatory_arg=True,
value_fn=get_packets,
add_to_update=lambda entity: ("io_counters", ""),
),
"packets_out": SysMonitorSensorEntityDescription[dict[str, snetio]](
"packets_out": SysMonitorSensorEntityDescription(
key="packets_out",
translation_key="packets_out",
placeholder="interface",
@ -313,8 +334,9 @@ SENSOR_TYPES: dict[str, SysMonitorSensorEntityDescription[Any]] = {
state_class=SensorStateClass.TOTAL_INCREASING,
mandatory_arg=True,
value_fn=get_packets,
add_to_update=lambda entity: ("io_counters", ""),
),
"throughput_network_in": SysMonitorSensorEntityDescription[dict[str, snetio]](
"throughput_network_in": SysMonitorSensorEntityDescription(
key="throughput_network_in",
translation_key="throughput_network_in",
placeholder="interface",
@ -323,8 +345,9 @@ SENSOR_TYPES: dict[str, SysMonitorSensorEntityDescription[Any]] = {
state_class=SensorStateClass.MEASUREMENT,
mandatory_arg=True,
value_fn=get_throughput,
add_to_update=lambda entity: ("io_counters", ""),
),
"throughput_network_out": SysMonitorSensorEntityDescription[dict[str, snetio]](
"throughput_network_out": SysMonitorSensorEntityDescription(
key="throughput_network_out",
translation_key="throughput_network_out",
placeholder="interface",
@ -333,60 +356,68 @@ SENSOR_TYPES: dict[str, SysMonitorSensorEntityDescription[Any]] = {
state_class=SensorStateClass.MEASUREMENT,
mandatory_arg=True,
value_fn=get_throughput,
add_to_update=lambda entity: ("io_counters", ""),
),
"process": SysMonitorSensorEntityDescription[list[Process]](
"process": SysMonitorSensorEntityDescription(
key="process",
translation_key="process",
placeholder="process",
icon=get_cpu_icon(),
mandatory_arg=True,
value_fn=get_process,
add_to_update=lambda entity: ("processes", ""),
),
"processor_use": SysMonitorSensorEntityDescription[float](
"processor_use": SysMonitorSensorEntityDescription(
key="processor_use",
translation_key="processor_use",
native_unit_of_measurement=PERCENTAGE,
icon=get_cpu_icon(),
state_class=SensorStateClass.MEASUREMENT,
value_fn=lambda entity: (
round(entity.coordinator.data) if entity.coordinator.data else None
round(entity.coordinator.data.cpu_percent)
if entity.coordinator.data.cpu_percent
else None
),
add_to_update=lambda entity: ("cpu_percent", ""),
),
"processor_temperature": SysMonitorSensorEntityDescription[
dict[str, list[shwtemp]]
](
"processor_temperature": SysMonitorSensorEntityDescription(
key="processor_temperature",
translation_key="processor_temperature",
native_unit_of_measurement=UnitOfTemperature.CELSIUS,
device_class=SensorDeviceClass.TEMPERATURE,
state_class=SensorStateClass.MEASUREMENT,
value_fn=get_processor_temperature,
none_is_unavailable=True,
add_to_update=lambda entity: ("temperatures", ""),
),
"swap_free": SysMonitorSensorEntityDescription[sswap](
"swap_free": SysMonitorSensorEntityDescription(
key="swap_free",
translation_key="swap_free",
native_unit_of_measurement=UnitOfInformation.MEBIBYTES,
device_class=SensorDeviceClass.DATA_SIZE,
icon="mdi:harddisk",
state_class=SensorStateClass.MEASUREMENT,
value_fn=lambda entity: round(entity.coordinator.data.free / 1024**2, 1),
value_fn=lambda entity: round(entity.coordinator.data.swap.free / 1024**2, 1),
add_to_update=lambda entity: ("swap", ""),
),
"swap_use": SysMonitorSensorEntityDescription[sswap](
"swap_use": SysMonitorSensorEntityDescription(
key="swap_use",
translation_key="swap_use",
native_unit_of_measurement=UnitOfInformation.MEBIBYTES,
device_class=SensorDeviceClass.DATA_SIZE,
icon="mdi:harddisk",
state_class=SensorStateClass.MEASUREMENT,
value_fn=lambda entity: round(entity.coordinator.data.used / 1024**2, 1),
value_fn=lambda entity: round(entity.coordinator.data.swap.used / 1024**2, 1),
add_to_update=lambda entity: ("swap", ""),
),
"swap_use_percent": SysMonitorSensorEntityDescription[sswap](
"swap_use_percent": SysMonitorSensorEntityDescription(
key="swap_use_percent",
translation_key="swap_use_percent",
native_unit_of_measurement=PERCENTAGE,
icon="mdi:harddisk",
state_class=SensorStateClass.MEASUREMENT,
value_fn=lambda entity: entity.coordinator.data.percent,
value_fn=lambda entity: entity.coordinator.data.swap.percent,
add_to_update=lambda entity: ("swap", ""),
),
}
@ -489,7 +520,7 @@ async def async_setup_entry( # noqa: C901
entities: list[SystemMonitorSensor] = []
legacy_resources: set[str] = set(entry.options.get("resources", []))
loaded_resources: set[str] = set()
psutil_wrapper: ha_psutil.PsutilWrapper = hass.data[DOMAIN]
coordinator: SystemMonitorCoordinator = hass.data[DOMAIN_COORDINATOR]
def get_arguments() -> dict[str, Any]:
"""Return startup information."""
@ -507,44 +538,6 @@ async def async_setup_entry( # noqa: C901
startup_arguments = await hass.async_add_executor_job(get_arguments)
disk_coordinators: dict[str, SystemMonitorDiskCoordinator] = {}
for argument in startup_arguments["disk_arguments"]:
disk_coordinators[argument] = SystemMonitorDiskCoordinator(
hass, psutil_wrapper, f"Disk {argument} coordinator", argument
)
swap_coordinator = SystemMonitorSwapCoordinator(
hass, psutil_wrapper, "Swap coordinator"
)
memory_coordinator = SystemMonitorMemoryCoordinator(
hass, psutil_wrapper, "Memory coordinator"
)
net_io_coordinator = SystemMonitorNetIOCoordinator(
hass, psutil_wrapper, "Net IO coordnator"
)
net_addr_coordinator = SystemMonitorNetAddrCoordinator(
hass, psutil_wrapper, "Net address coordinator"
)
system_load_coordinator = SystemMonitorLoadCoordinator(
hass, psutil_wrapper, "System load coordinator"
)
processor_coordinator = SystemMonitorProcessorCoordinator(
hass, psutil_wrapper, "Processor coordinator"
)
boot_time_coordinator = SystemMonitorBootTimeCoordinator(
hass, psutil_wrapper, "Boot time coordinator"
)
process_coordinator = SystemMonitorProcessCoordinator(
hass, psutil_wrapper, "Process coordinator"
)
cpu_temp_coordinator = SystemMonitorCPUtempCoordinator(
hass, psutil_wrapper, "CPU temperature coordinator"
)
for argument in startup_arguments["disk_arguments"]:
disk_coordinators[argument] = SystemMonitorDiskCoordinator(
hass, psutil_wrapper, f"Disk {argument} coordinator", argument
)
_LOGGER.debug("Setup from options %s", entry.options)
for _type, sensor_description in SENSOR_TYPES.items():
@ -556,7 +549,7 @@ async def async_setup_entry( # noqa: C901
loaded_resources.add(slugify(f"{_type}_{argument}"))
entities.append(
SystemMonitorSensor(
disk_coordinators[argument],
coordinator,
sensor_description,
entry.entry_id,
argument,
@ -573,7 +566,7 @@ async def async_setup_entry( # noqa: C901
loaded_resources.add(slugify(f"{_type}_{argument}"))
entities.append(
SystemMonitorSensor(
net_addr_coordinator,
coordinator,
sensor_description,
entry.entry_id,
argument,
@ -588,7 +581,7 @@ async def async_setup_entry( # noqa: C901
loaded_resources.add(slugify(f"{_type}_{argument}"))
entities.append(
SystemMonitorSensor(
boot_time_coordinator,
coordinator,
sensor_description,
entry.entry_id,
argument,
@ -603,7 +596,7 @@ async def async_setup_entry( # noqa: C901
loaded_resources.add(slugify(f"{_type}_{argument}"))
entities.append(
SystemMonitorSensor(
system_load_coordinator,
coordinator,
sensor_description,
entry.entry_id,
argument,
@ -618,7 +611,7 @@ async def async_setup_entry( # noqa: C901
loaded_resources.add(slugify(f"{_type}_{argument}"))
entities.append(
SystemMonitorSensor(
memory_coordinator,
coordinator,
sensor_description,
entry.entry_id,
argument,
@ -634,7 +627,7 @@ async def async_setup_entry( # noqa: C901
loaded_resources.add(slugify(f"{_type}_{argument}"))
entities.append(
SystemMonitorSensor(
net_io_coordinator,
coordinator,
sensor_description,
entry.entry_id,
argument,
@ -649,7 +642,7 @@ async def async_setup_entry( # noqa: C901
loaded_resources.add(slugify(f"{_type}_{argument}"))
entities.append(
SystemMonitorSensor(
process_coordinator,
coordinator,
sensor_description,
entry.entry_id,
argument,
@ -678,7 +671,7 @@ async def async_setup_entry( # noqa: C901
loaded_resources.add(slugify(f"{_type}_{argument}"))
entities.append(
SystemMonitorSensor(
processor_coordinator,
coordinator,
sensor_description,
entry.entry_id,
argument,
@ -696,7 +689,7 @@ async def async_setup_entry( # noqa: C901
loaded_resources.add(slugify(f"{_type}_{argument}"))
entities.append(
SystemMonitorSensor(
cpu_temp_coordinator,
coordinator,
sensor_description,
entry.entry_id,
argument,
@ -711,7 +704,7 @@ async def async_setup_entry( # noqa: C901
loaded_resources.add(slugify(f"{_type}_{argument}"))
entities.append(
SystemMonitorSensor(
swap_coordinator,
coordinator,
sensor_description,
entry.entry_id,
argument,
@ -735,13 +728,9 @@ async def async_setup_entry( # noqa: C901
_type = resource[:split_index]
argument = resource[split_index + 1 :]
_LOGGER.debug("Loading legacy %s with argument %s", _type, argument)
if not disk_coordinators.get(argument):
disk_coordinators[argument] = SystemMonitorDiskCoordinator(
hass, psutil_wrapper, f"Disk {argument} coordinator", argument
)
entities.append(
SystemMonitorSensor(
disk_coordinators[argument],
coordinator,
SENSOR_TYPES[_type],
entry.entry_id,
argument,
@ -749,23 +738,6 @@ async def async_setup_entry( # noqa: C901
)
)
hass.data[DOMAIN_COORDINATORS] = {}
# No gathering to avoid swamping the executor
for argument, coordinator in disk_coordinators.items():
hass.data[DOMAIN_COORDINATORS][f"disk_{argument}"] = coordinator
hass.data[DOMAIN_COORDINATORS]["boot_time"] = boot_time_coordinator
hass.data[DOMAIN_COORDINATORS]["cpu_temp"] = cpu_temp_coordinator
hass.data[DOMAIN_COORDINATORS]["memory"] = memory_coordinator
hass.data[DOMAIN_COORDINATORS]["net_addr"] = net_addr_coordinator
hass.data[DOMAIN_COORDINATORS]["net_io"] = net_io_coordinator
hass.data[DOMAIN_COORDINATORS]["process"] = process_coordinator
hass.data[DOMAIN_COORDINATORS]["processor"] = processor_coordinator
hass.data[DOMAIN_COORDINATORS]["swap"] = swap_coordinator
hass.data[DOMAIN_COORDINATORS]["system_load"] = system_load_coordinator
for coordinator in hass.data[DOMAIN_COORDINATORS].values():
await coordinator.async_request_refresh()
@callback
def clean_obsolete_entities() -> None:
"""Remove entities which are disabled and not supported from setup."""
@ -790,17 +762,18 @@ async def async_setup_entry( # noqa: C901
async_add_entities(entities)
class SystemMonitorSensor(CoordinatorEntity[MonitorCoordinator[dataT]], SensorEntity):
class SystemMonitorSensor(CoordinatorEntity[SystemMonitorCoordinator], SensorEntity):
"""Implementation of a system monitor sensor."""
_attr_has_entity_name = True
_attr_entity_category = EntityCategory.DIAGNOSTIC
entity_description: SysMonitorSensorEntityDescription[dataT]
entity_description: SysMonitorSensorEntityDescription
argument: str
def __init__(
self,
coordinator: MonitorCoordinator[dataT],
sensor_description: SysMonitorSensorEntityDescription[dataT],
coordinator: SystemMonitorCoordinator,
sensor_description: SysMonitorSensorEntityDescription,
entry_id: str,
argument: str,
legacy_enabled: bool = False,
@ -824,7 +797,31 @@ class SystemMonitorSensor(CoordinatorEntity[MonitorCoordinator[dataT]], SensorEn
self.value: int | None = None
self.update_time: float | None = None
async def async_added_to_hass(self) -> None:
"""When added to hass."""
self.coordinator.update_subscribers[
self.entity_description.add_to_update(self)
].add(self.entity_id)
return await super().async_added_to_hass()
async def async_will_remove_from_hass(self) -> None:
"""When removed from hass."""
self.coordinator.update_subscribers[
self.entity_description.add_to_update(self)
].remove(self.entity_id)
return await super().async_will_remove_from_hass()
@property
def native_value(self) -> StateType | datetime:
"""Return the state."""
return self.entity_description.value_fn(self)
@property
def available(self) -> bool:
"""Return if entity is available."""
if self.entity_description.none_is_unavailable:
return bool(
self.coordinator.last_update_success is True
and self.native_value is not None
)
return super().available

View file

@ -2,55 +2,34 @@
# name: test_diagnostics
dict({
'coordinators': dict({
'boot_time': dict({
'data': '2024-02-24 15:00:00+00:00',
'last_update_success': True,
'data': dict({
'addresses': dict({
'eth0': "[snicaddr(family=<AddressFamily.AF_INET: 2>, address='192.168.1.1', netmask='255.255.255.0', broadcast='255.255.255.255', ptp=None)]",
'eth1': "[snicaddr(family=<AddressFamily.AF_INET: 2>, address='192.168.10.1', netmask='255.255.255.0', broadcast='255.255.255.255', ptp=None)]",
'vethxyzxyz': "[snicaddr(family=<AddressFamily.AF_INET: 2>, address='172.16.10.1', netmask='255.255.255.0', broadcast='255.255.255.255', ptp=None)]",
}),
'cpu_temp': dict({
'data': "{'cpu0-thermal': [shwtemp(label='cpu0-thermal', current=50.0, high=60.0, critical=70.0)]}",
'last_update_success': True,
'boot_time': '2024-02-24 15:00:00+00:00',
'cpu_percent': '10.0',
'disk_usage': dict({
'/': 'sdiskusage(total=536870912000, used=322122547200, free=214748364800, percent=60.0)',
'/home/notexist/': 'sdiskusage(total=536870912000, used=322122547200, free=214748364800, percent=60.0)',
'/media/share': 'sdiskusage(total=536870912000, used=322122547200, free=214748364800, percent=60.0)',
}),
'disk_/': dict({
'data': 'sdiskusage(total=536870912000, used=322122547200, free=214748364800, percent=60.0)',
'last_update_success': True,
'io_counters': dict({
'eth0': 'snetio(bytes_sent=104857600, bytes_recv=104857600, packets_sent=50, packets_recv=50, errin=0, errout=0, dropin=0, dropout=0)',
'eth1': 'snetio(bytes_sent=209715200, bytes_recv=209715200, packets_sent=150, packets_recv=150, errin=0, errout=0, dropin=0, dropout=0)',
'vethxyzxyz': 'snetio(bytes_sent=314572800, bytes_recv=314572800, packets_sent=150, packets_recv=150, errin=0, errout=0, dropin=0, dropout=0)',
}),
'disk_/home/notexist/': dict({
'data': 'sdiskusage(total=536870912000, used=322122547200, free=214748364800, percent=60.0)',
'last_update_success': True,
'load': '(1, 2, 3)',
'memory': 'VirtualMemory(total=104857600, available=41943040, percent=40.0, used=62914560, free=31457280)',
'processes': "[tests.components.systemmonitor.conftest.MockProcess(pid=1, name='python3', status='sleeping', started='2024-02-23 15:00:00'), tests.components.systemmonitor.conftest.MockProcess(pid=1, name='pip', status='sleeping', started='2024-02-23 15:00:00')]",
'swap': 'sswap(total=104857600, used=62914560, free=41943040, percent=60.0, sin=1, sout=1)',
'temperatures': dict({
'cpu0-thermal': "[shwtemp(label='cpu0-thermal', current=50.0, high=60.0, critical=70.0)]",
}),
'disk_/media/share': dict({
'data': 'sdiskusage(total=536870912000, used=322122547200, free=214748364800, percent=60.0)',
'last_update_success': True,
}),
'memory': dict({
'data': 'VirtualMemory(total=104857600, available=41943040, percent=40.0, used=62914560, free=31457280)',
'last_update_success': True,
}),
'net_addr': dict({
'data': "{'eth0': [snicaddr(family=<AddressFamily.AF_INET: 2>, address='192.168.1.1', netmask='255.255.255.0', broadcast='255.255.255.255', ptp=None)], 'eth1': [snicaddr(family=<AddressFamily.AF_INET: 2>, address='192.168.10.1', netmask='255.255.255.0', broadcast='255.255.255.255', ptp=None)], 'vethxyzxyz': [snicaddr(family=<AddressFamily.AF_INET: 2>, address='172.16.10.1', netmask='255.255.255.0', broadcast='255.255.255.255', ptp=None)]}",
'last_update_success': True,
}),
'net_io': dict({
'data': "{'eth0': snetio(bytes_sent=104857600, bytes_recv=104857600, packets_sent=50, packets_recv=50, errin=0, errout=0, dropin=0, dropout=0), 'eth1': snetio(bytes_sent=209715200, bytes_recv=209715200, packets_sent=150, packets_recv=150, errin=0, errout=0, dropin=0, dropout=0), 'vethxyzxyz': snetio(bytes_sent=314572800, bytes_recv=314572800, packets_sent=150, packets_recv=150, errin=0, errout=0, dropin=0, dropout=0)}",
'last_update_success': True,
}),
'process': dict({
'data': "[tests.components.systemmonitor.conftest.MockProcess(pid=1, name='python3', status='sleeping', started='2024-02-23 15:00:00'), tests.components.systemmonitor.conftest.MockProcess(pid=1, name='pip', status='sleeping', started='2024-02-23 15:00:00')]",
'last_update_success': True,
}),
'processor': dict({
'data': '10.0',
'last_update_success': True,
}),
'swap': dict({
'data': 'sswap(total=104857600, used=62914560, free=41943040, percent=60.0, sin=1, sout=1)',
'last_update_success': True,
}),
'system_load': dict({
'data': '(1, 2, 3)',
'last_update_success': True,
}),
}),
'entry': dict({
'data': dict({
}),

View file

@ -473,10 +473,7 @@ async def test_exception_handling_disk_sensor(
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert (
"Error fetching System Monitor Disk / coordinator data: OS error for /"
in caplog.text
)
assert "OS error for /" in caplog.text
disk_sensor = hass.states.get("sensor.system_monitor_disk_free")
assert disk_sensor is not None
@ -489,10 +486,7 @@ async def test_exception_handling_disk_sensor(
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert (
"Error fetching System Monitor Disk / coordinator data: OS error for /"
in caplog.text
)
assert "OS error for /" in caplog.text
disk_sensor = hass.states.get("sensor.system_monitor_disk_free")
assert disk_sensor is not None