Use set instead of list in Systemmonitor (#106650)

This commit is contained in:
G Johansson 2023-12-29 13:21:08 +01:00 committed by GitHub
parent 19e0f55fc8
commit 4cd1965786
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 17 additions and 17 deletions

View file

@ -86,7 +86,7 @@ async def validate_import_sensor_setup(
async def get_sensor_setup_schema(handler: SchemaCommonFlowHandler) -> vol.Schema:
"""Return process sensor setup schema."""
hass = handler.parent_handler.hass
processes = await hass.async_add_executor_job(get_all_running_processes)
processes = list(await hass.async_add_executor_job(get_all_running_processes))
return vol.Schema(
{
vol.Required(CONF_PROCESS): SelectSelector(

View file

@ -267,7 +267,7 @@ def check_required_arg(value: Any) -> Any:
return value
def check_legacy_resource(resource: str, resources: list[str]) -> bool:
def check_legacy_resource(resource: str, resources: set[str]) -> bool:
"""Return True if legacy resource was configured."""
# This function to check legacy resources can be removed
# once we are removing the import from YAML
@ -388,8 +388,8 @@ async def async_setup_entry(
"""Set up System Montor sensors based on a config entry."""
entities = []
sensor_registry: dict[tuple[str, str], SensorData] = {}
legacy_resources: list[str] = entry.options.get("resources", [])
loaded_resources: list[str] = []
legacy_resources: set[str] = set(entry.options.get("resources", []))
loaded_resources: set[str] = set()
disk_arguments = await hass.async_add_executor_job(get_all_disk_mounts)
network_arguments = await hass.async_add_executor_job(get_all_network_interfaces)
cpu_temperature = await hass.async_add_executor_job(_read_cpu_temperature)
@ -405,7 +405,7 @@ async def async_setup_entry(
is_enabled = check_legacy_resource(
f"{_type}_{argument}", legacy_resources
)
loaded_resources.append(f"{_type}_{argument}")
loaded_resources.add(f"{_type}_{argument}")
entities.append(
SystemMonitorSensor(
sensor_registry,
@ -425,7 +425,7 @@ async def async_setup_entry(
is_enabled = check_legacy_resource(
f"{_type}_{argument}", legacy_resources
)
loaded_resources.append(f"{_type}_{argument}")
loaded_resources.add(f"{_type}_{argument}")
entities.append(
SystemMonitorSensor(
sensor_registry,
@ -449,7 +449,7 @@ async def async_setup_entry(
sensor_registry[(_type, argument)] = SensorData(
argument, None, None, None, None
)
loaded_resources.append(f"{_type}_{argument}")
loaded_resources.add(f"{_type}_{argument}")
entities.append(
SystemMonitorSensor(
sensor_registry,
@ -463,7 +463,7 @@ async def async_setup_entry(
sensor_registry[(_type, "")] = SensorData("", None, None, None, None)
is_enabled = check_legacy_resource(f"{_type}_", legacy_resources)
loaded_resources.append(f"{_type}_")
loaded_resources.add(f"{_type}_")
entities.append(
SystemMonitorSensor(
sensor_registry,

View file

@ -8,9 +8,9 @@ import psutil
_LOGGER = logging.getLogger(__name__)
def get_all_disk_mounts() -> list[str]:
def get_all_disk_mounts() -> set[str]:
"""Return all disk mount points on system."""
disks: list[str] = []
disks: set[str] = set()
for part in psutil.disk_partitions(all=True):
if os.name == "nt":
if "cdrom" in part.opts or part.fstype == "":
@ -20,25 +20,25 @@ def get_all_disk_mounts() -> list[str]:
continue
usage = psutil.disk_usage(part.mountpoint)
if usage.total > 0 and part.device != "":
disks.append(part.mountpoint)
disks.add(part.mountpoint)
_LOGGER.debug("Adding disks: %s", ", ".join(disks))
return disks
def get_all_network_interfaces() -> list[str]:
def get_all_network_interfaces() -> set[str]:
"""Return all network interfaces on system."""
interfaces: list[str] = []
interfaces: set[str] = set()
for interface, _ in psutil.net_if_addrs().items():
interfaces.append(interface)
interfaces.add(interface)
_LOGGER.debug("Adding interfaces: %s", ", ".join(interfaces))
return interfaces
def get_all_running_processes() -> list[str]:
def get_all_running_processes() -> set[str]:
"""Return all running processes on system."""
processes: list[str] = []
processes: set[str] = set()
for proc in psutil.process_iter(["name"]):
if proc.name() not in processes:
processes.append(proc.name())
processes.add(proc.name())
_LOGGER.debug("Running processes: %s", ", ".join(processes))
return processes