Add broken link and missing device lists to insteon configuration panel (#119715)

* Add broken link and missing device lists

* Fix incorrect import

* Add tests

* Bump pyinsteon

* Typing
This commit is contained in:
Tom Harris 2024-09-20 06:11:51 -04:00 committed by GitHub
parent 90f691fa2c
commit 7433d2eca9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 245 additions and 23 deletions

View file

@ -14,13 +14,16 @@ from .aldb import (
websocket_get_aldb,
websocket_load_aldb,
websocket_notify_on_aldb_status,
websocket_notify_on_aldb_status_all,
websocket_reset_aldb,
websocket_write_aldb,
)
from .config import (
websocket_add_device_override,
websocket_get_broken_links,
websocket_get_config,
websocket_get_modem_schema,
websocket_get_unknown_devices,
websocket_remove_device_override,
websocket_update_modem_config,
)
@ -70,6 +73,7 @@ def async_load_api(hass):
websocket_api.async_register_command(hass, websocket_notify_on_aldb_status)
websocket_api.async_register_command(hass, websocket_add_x10_device)
websocket_api.async_register_command(hass, websocket_remove_device)
websocket_api.async_register_command(hass, websocket_notify_on_aldb_status_all)
websocket_api.async_register_command(hass, websocket_get_properties)
websocket_api.async_register_command(hass, websocket_change_properties_record)
@ -82,6 +86,8 @@ def async_load_api(hass):
websocket_api.async_register_command(hass, websocket_update_modem_config)
websocket_api.async_register_command(hass, websocket_add_device_override)
websocket_api.async_register_command(hass, websocket_remove_device_override)
websocket_api.async_register_command(hass, websocket_get_broken_links)
websocket_api.async_register_command(hass, websocket_get_unknown_devices)
async def async_register_insteon_frontend(hass: HomeAssistant):

View file

@ -11,7 +11,8 @@ from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import device_registry as dr
from ..const import DEVICE_ADDRESS, ID, INSTEON_DEVICE_NOT_FOUND, TYPE
from .device import async_device_name, notify_device_not_found
from ..utils import async_device_name
from .device import notify_device_not_found
ALDB_RECORD = "record"
ALDB_RECORD_SCHEMA = vol.Schema(
@ -59,6 +60,13 @@ async def async_reload_and_save_aldb(hass, device):
await devices.async_save(workdir=hass.config.config_dir)
def any_aldb_loading() -> bool:
"""Identify if any All-Link Databases are loading."""
return any(
device.aldb.status == ALDBStatus.LOADING for _, device in devices.items()
)
@websocket_api.websocket_command(
{vol.Required(TYPE): "insteon/aldb/get", vol.Required(DEVICE_ADDRESS): str}
)
@ -293,3 +301,45 @@ async def websocket_notify_on_aldb_status(
device.aldb.subscribe_status_changed(aldb_loaded)
connection.send_result(msg[ID])
@websocket_api.websocket_command({vol.Required(TYPE): "insteon/aldb/notify_all"})
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_notify_on_aldb_status_all(
hass: HomeAssistant,
connection: websocket_api.connection.ActiveConnection,
msg: dict[str, Any],
) -> None:
"""Tell Insteon all ALDBs are loaded."""
@callback
def aldb_status_changed(status: ALDBStatus) -> None:
"""Forward ALDB loaded event to websocket."""
forward_data = {
"type": "status",
"is_loading": any_aldb_loading(),
}
connection.send_message(websocket_api.event_message(msg["id"], forward_data))
@callback
def async_cleanup() -> None:
"""Remove signal listeners."""
for device in devices.values():
device.aldb.unsubscribe_status_changed(aldb_status_changed)
forward_data = {"type": "unsubscribed"}
connection.send_message(websocket_api.event_message(msg["id"], forward_data))
connection.subscriptions[msg["id"]] = async_cleanup
for device in devices.values():
device.aldb.subscribe_status_changed(aldb_status_changed)
connection.send_result(msg[ID])
forward_data = {
"type": "status",
"is_loading": any_aldb_loading(),
}
connection.send_message(websocket_api.event_message(msg["id"], forward_data))

View file

@ -6,6 +6,9 @@ from typing import Any, TypedDict
from pyinsteon import async_close, async_connect, devices
from pyinsteon.address import Address
from pyinsteon.aldb.aldb_record import ALDBRecord
from pyinsteon.constants import LinkStatus
from pyinsteon.managers.link_manager import get_broken_links
import voluptuous as vol
import voluptuous_serialize
@ -13,6 +16,7 @@ from homeassistant.components import websocket_api
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_ADDRESS, CONF_DEVICE
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.dispatcher import async_dispatcher_send
from ..const import (
@ -34,7 +38,7 @@ from ..schemas import (
build_plm_manual_schema,
build_plm_schema,
)
from ..utils import async_get_usb_ports
from ..utils import async_device_name, async_get_usb_ports
HUB_V1_SCHEMA = build_hub_schema(hub_version=1)
HUB_V2_SCHEMA = build_hub_schema(hub_version=2)
@ -134,6 +138,30 @@ def remove_device_override(hass: HomeAssistant, address: Address):
hass.config_entries.async_update_entry(entry=config_entry, options=new_options)
async def async_link_to_dict(
address: Address, record: ALDBRecord, dev_registry: dr.DeviceRegistry, status=None
) -> dict[str, str | int]:
"""Convert a link to a dictionary."""
link_dict: dict[str, str | int] = {}
device_name = await async_device_name(dev_registry, address)
target_name = await async_device_name(dev_registry, record.target)
link_dict["address"] = str(address)
link_dict["device_name"] = device_name if device_name else str(address)
link_dict["mem_addr"] = record.mem_addr
link_dict["in_use"] = record.is_in_use
link_dict["group"] = record.group
link_dict["is_controller"] = record.is_controller
link_dict["highwater"] = record.is_high_water_mark
link_dict["target"] = str(record.target)
link_dict["target_name"] = target_name if target_name else str(record.target)
link_dict["data1"] = record.data1
link_dict["data2"] = record.data2
link_dict["data3"] = record.data3
if status:
link_dict["status"] = status.name.lower()
return link_dict
async def _async_connect(**kwargs):
"""Connect to the Insteon modem."""
if devices.modem:
@ -270,3 +298,44 @@ async def websocket_remove_device_override(
remove_device_override(hass, address)
async_dispatcher_send(hass, SIGNAL_REMOVE_DEVICE_OVERRIDE, address)
connection.send_result(msg[ID])
@websocket_api.websocket_command(
{vol.Required(TYPE): "insteon/config/get_broken_links"}
)
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_get_broken_links(
hass: HomeAssistant,
connection: websocket_api.connection.ActiveConnection,
msg: dict[str, Any],
) -> None:
"""Get any broken links between devices."""
broken_links = get_broken_links(devices=devices)
dev_registry = dr.async_get(hass)
broken_links_list = [
await async_link_to_dict(address, record, dev_registry, status)
for address, record, status in broken_links
if status != LinkStatus.MISSING_TARGET
]
connection.send_result(msg[ID], broken_links_list)
@websocket_api.websocket_command(
{vol.Required(TYPE): "insteon/config/get_unknown_devices"}
)
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_get_unknown_devices(
hass: HomeAssistant,
connection: websocket_api.connection.ActiveConnection,
msg: dict[str, Any],
) -> None:
"""Get any broken links between devices."""
broken_links = get_broken_links(devices=devices)
unknown_devices = {
str(record.target)
for _, record, status in broken_links
if status == LinkStatus.MISSING_TARGET
}
connection.send_result(msg[ID], unknown_devices)

View file

@ -26,6 +26,7 @@ from ..const import (
TYPE,
)
from ..schemas import build_x10_schema
from ..utils import compute_device_name
from .config import add_x10_device, remove_device_override, remove_x10_device
X10_DEVICE = "x10_device"
@ -33,11 +34,6 @@ X10_DEVICE_SCHEMA = build_x10_schema()
REMOVE_ALL_REFS = "remove_all_refs"
def compute_device_name(ha_device):
"""Return the HA device name."""
return ha_device.name_by_user if ha_device.name_by_user else ha_device.name
async def async_add_devices(address, multiple):
"""Add one or more Insteon devices."""
async for _ in devices.async_add_device(address=address, multiple=multiple):
@ -52,20 +48,10 @@ def get_insteon_device_from_ha_device(ha_device):
return None
async def async_device_name(dev_registry, address):
"""Get the Insteon device name from a device registry id."""
ha_device = dev_registry.async_get_device(identifiers={(DOMAIN, str(address))})
if not ha_device:
if device := devices[address]:
return f"{device.description} ({device.model})"
return ""
return compute_device_name(ha_device)
def notify_device_not_found(connection, msg, text):
"""Notify the caller that the device was not found."""
connection.send_message(
websocket_api.error_message(msg[ID], websocket_api.ERR_NOT_FOUND, text)
websocket_api.error_message(msg[ID], websocket_api.const.ERR_NOT_FOUND, text)
)

View file

@ -471,3 +471,18 @@ def get_usb_ports() -> dict[str, str]:
async def async_get_usb_ports(hass: HomeAssistant) -> dict[str, str]:
"""Return a dict of USB ports and their friendly names."""
return await hass.async_add_executor_job(get_usb_ports)
def compute_device_name(ha_device) -> str:
"""Return the HA device name."""
return ha_device.name_by_user if ha_device.name_by_user else ha_device.name
async def async_device_name(dev_registry: dr.DeviceRegistry, address: Address) -> str:
"""Get the Insteon device name from a device registry id."""
ha_device = dev_registry.async_get_device(identifiers={(DOMAIN, str(address))})
if not ha_device:
if device := devices[address]:
return f"{device.description} ({device.model})"
return ""
return compute_device_name(ha_device)

View file

@ -168,6 +168,14 @@ class MockDevices:
yield address
await asyncio.sleep(0.01)
def values(self):
"""Return the devices."""
return self._devices.values()
def items(self):
"""Return the address, device pair."""
return self._devices.items()
def subscribe(self, listener, force_strong_ref=False):
"""Mock the subscribe function."""
subscribe_topic(listener, DEVICE_LIST_CHANGED)

View file

@ -1,5 +1,6 @@
"""Test the Insteon All-Link Database APIs."""
import asyncio
import json
from typing import Any
from unittest.mock import patch
@ -332,3 +333,38 @@ async def test_bad_address(
msg = await ws_client.receive_json()
assert not msg["success"]
assert msg["error"]["message"] == INSTEON_DEVICE_NOT_FOUND
async def test_notify_on_aldb_loading(
hass: HomeAssistant, hass_ws_client: WebSocketGenerator, aldb_data
) -> None:
"""Test tracking changes to ALDB status across all devices."""
ws_client, devices = await _setup(hass, hass_ws_client, aldb_data)
with patch.object(insteon.api.aldb, "devices", devices):
await ws_client.send_json_auto_id({TYPE: "insteon/aldb/notify_all"})
msg = await ws_client.receive_json()
assert msg["success"]
await asyncio.sleep(0.1)
msg = await ws_client.receive_json()
assert msg["event"]["type"] == "status"
assert not msg["event"]["is_loading"]
device = devices["333333"]
device.aldb._update_status(ALDBStatus.LOADING)
await asyncio.sleep(0.1)
msg = await ws_client.receive_json()
assert msg["event"]["type"] == "status"
assert msg["event"]["is_loading"]
device.aldb._update_status(ALDBStatus.LOADED)
await asyncio.sleep(0.1)
msg = await ws_client.receive_json()
assert msg["event"]["type"] == "status"
assert not msg["event"]["is_loading"]
await ws_client.client.session.close()
# Allow lingering tasks to complete
await asyncio.sleep(0.1)

View file

@ -1,7 +1,10 @@
"""Test the Insteon APIs for configuring the integration."""
import asyncio
import json
from unittest.mock import patch
from homeassistant.components import insteon
from homeassistant.components.insteon.api.device import ID, TYPE
from homeassistant.components.insteon.const import (
CONF_HUB_VERSION,
@ -18,8 +21,10 @@ from .const import (
MOCK_USER_INPUT_PLM,
)
from .mock_connection import mock_failed_connection, mock_successful_connection
from .mock_devices import MockDevices
from .mock_setup import async_mock_setup
from tests.common import load_fixture
from tests.typing import WebSocketGenerator
@ -389,3 +394,54 @@ async def test_remove_device_override_no_overrides(
config_entry = hass.config_entries.async_get_entry("abcde12345")
assert not config_entry.options.get(CONF_OVERRIDE)
async def test_get_broken_links(
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test getting broken ALDB links."""
ws_client, _, _, _ = await async_mock_setup(hass, hass_ws_client)
devices = MockDevices()
await devices.async_load()
aldb_data = json.loads(load_fixture("insteon/aldb_data.json"))
devices.fill_aldb("33.33.33", aldb_data)
with patch.object(insteon.api.config, "devices", devices):
await ws_client.send_json({ID: 2, TYPE: "insteon/config/get_broken_links"})
msg = await ws_client.receive_json()
assert msg["success"]
assert len(msg["result"]) == 5
async def test_get_unknown_devices(
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test getting unknown Insteon devices."""
ws_client, _, _, _ = await async_mock_setup(hass, hass_ws_client)
devices = MockDevices()
await devices.async_load()
aldb_data = {
"4095": {
"memory": 4095,
"in_use": True,
"controller": False,
"high_water_mark": False,
"bit5": True,
"bit4": False,
"group": 0,
"target": "FFFFFF",
"data1": 0,
"data2": 0,
"data3": 0,
},
}
devices.fill_aldb("33.33.33", aldb_data)
with patch.object(insteon.api.config, "devices", devices):
await ws_client.send_json({ID: 2, TYPE: "insteon/config/get_unknown_devices"})
msg = await ws_client.receive_json()
assert msg["success"]
assert len(msg["result"]) == 1
await asyncio.sleep(0.1)

View file

@ -16,7 +16,6 @@ from homeassistant.components.insteon.api.device import (
ID,
INSTEON_DEVICE_NOT_FOUND,
TYPE,
async_device_name,
)
from homeassistant.components.insteon.const import (
CONF_OVERRIDE,
@ -24,6 +23,7 @@ from homeassistant.components.insteon.const import (
DOMAIN,
MULTIPLE,
)
from homeassistant.components.insteon.utils import async_device_name
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
@ -129,10 +129,6 @@ async def test_get_ha_device_name(
name = await async_device_name(device_reg, "11.11.11")
assert name == "Device 11.11.11"
# Test no HA device but a real Insteon device
name = await async_device_name(device_reg, "22.22.22")
assert name == "Device 22.22.22 (2)"
# Test no HA or Insteon device
name = await async_device_name(device_reg, "BB.BB.BB")
assert name == ""