2023-02-22 16:24:21 +00:00
"""Diagnostics support for Thread networks.
When triaging Matter and HomeKit issues you often need to check for problems with the Thread network.
This report helps spot and rule out:
* Is the users border router visible at all?
* Is the border router actually announcing any routes? The user could have a network boundary like
VLANs or WiFi isolation that is blocking the RA packets.
* Alternatively, if user isn't on HAOS they could have accept_ra_rt_info_max_plen set incorrectly.
* Are there any bogus routes that could be interfering. If routes don't expire they can build up.
When you have 10 routes and only 2 border routers something has gone wrong.
This does not do any connectivity checks. So user could have all their border routers visible, but
some of their thread accessories can't be pinged, but it's still a thread problem.
from __future__ import annotations
2023-03-07 15:21:26 -10:00
from typing import TYPE_CHECKING, Any, TypedDict
2023-02-22 16:24:21 +00:00
from python_otbr_api.tlv_parser import MeshcopTLVType
from homeassistant.components import zeroconf
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from .dataset_store import async_get_store
from .discovery import async_read_zeroconf_cache
2023-03-07 15:21:26 -10:00
from pyroute2 import NDB # pylint: disable=no-name-in-module
2023-02-22 16:24:21 +00:00
class Neighbour(TypedDict):
"""A neighbour cache entry (ip neigh)."""
lladdr: str
state: int
probes: int
class Route(TypedDict):
"""A route table entry (ip -6 route)."""
metrics: int
priority: int
is_nexthop: bool
class Router(TypedDict):
"""A border router."""
server: str | None
addresses: list[str]
neighbours: dict[str, Neighbour]
thread_version: str | None
model: str | None
vendor: str | None
routes: dict[str, Route]
class Network(TypedDict):
"""A thread network."""
name: str | None
routers: dict[str, Router]
prefixes: set[str]
unexpected_routers: set[str]
2023-03-07 15:21:26 -10:00
def _get_possible_thread_routes(
ndb: NDB,
) -> tuple[dict[str, dict[str, Route]], dict[str, set[str]]]:
2023-02-22 16:24:21 +00:00
# Build a list of possible thread routes
# Right now, this is ipv6 /64's that have a gateway
# We cross reference with zerconf data to confirm which via's are known border routers
routes: dict[str, dict[str, Route]] = {}
reverse_routes: dict[str, set[str]] = {}
2023-03-07 15:21:26 -10:00
for record in ndb.routes:
# Limit to IPV6 routes
if record.family != 10:
# Limit to /64 prefixes
if record.dst_len != 64:
# Limit to routes with a via
if not record.gateway and not record.nh_gateway:
gateway = record.gateway or record.nh_gateway
route = routes.setdefault(gateway, {})
route[record.dst] = {
"metrics": record.metrics,
"priority": record.priority,
# NM creates "nexthop" routes - a single route with many via's
# Kernel creates many routes with a single via
"is_nexthop": record.nh_gateway is not None,
reverse_routes.setdefault(record.dst, set()).add(gateway)
2023-02-22 16:24:21 +00:00
return routes, reverse_routes
2023-03-07 15:21:26 -10:00
def _get_neighbours(ndb: NDB) -> dict[str, Neighbour]:
# Build a list of neighbours
neighbours: dict[str, Neighbour] = {
record.dst: {
"lladdr": record.lladdr,
"state": record.state,
"probes": record.probes,
for record in ndb.neighbours
return neighbours
2023-02-22 16:24:21 +00:00
2023-03-07 15:21:26 -10:00
def _get_routes_and_neighbors():
"""Get the routes and neighbours from pyroute2."""
# Import in the executor since import NDB can take a while
from pyroute2 import ( # pylint: disable=no-name-in-module, import-outside-toplevel
with NDB() as ndb: # pylint: disable=not-callable
routes, reverse_routes = _get_possible_thread_routes(ndb)
neighbours = _get_neighbours(ndb)
return routes, reverse_routes, neighbours
2023-02-22 16:24:21 +00:00
async def async_get_config_entry_diagnostics(
hass: HomeAssistant, entry: ConfigEntry
) -> dict[str, Any]:
"""Return diagnostics for all known thread networks."""
networks: dict[str, Network] = {}
# Start with all networks that HA knows about
store = await async_get_store(hass)
for record in store.datasets.values():
if not record.extended_pan_id:
network = networks.setdefault(
"name": record.network_name,
"routers": {},
"prefixes": set(),
"unexpected_routers": set(),
if mlp := record.dataset.get(MeshcopTLVType.MESHLOCALPREFIX):
# Find all routes currently act that might be thread related, so we can match them to
# border routers as we process the zeroconf data.
2023-03-07 15:21:26 -10:00
# Also find all neighbours
routes, reverse_routes, neighbours = await hass.async_add_executor_job(
2023-02-22 16:24:21 +00:00
aiozc = await zeroconf.async_get_async_instance(hass)
for data in async_read_zeroconf_cache(aiozc):
if not data.extended_pan_id:
network = networks.setdefault(
"name": data.network_name,
"routers": {},
"prefixes": set(),
"unexpected_routers": set(),
if not data.server:
router = network["routers"][data.server] = {
"server": data.server,
"addresses": data.addresses or [],
"neighbours": {},
"thread_version": data.thread_version,
"model": data.model_name,
"vendor": data.vendor_name,
"routes": {},
# For every address this border router hass, see if we have seen
# it in the route table as a via - these are the routes its
# announcing via RA
if data.addresses:
for address in data.addresses:
if address in routes:
if address in neighbours:
router["neighbours"][address] = neighbours[address]
# Find unexpected via's.
# Collect all router addresses and then for each prefix, find via's that aren't
# a known router for that prefix.
for network in networks.values():
routers = set()
for router in network["routers"].values():
for prefix in network["prefixes"]:
if prefix not in reverse_routes:
if ghosts := reverse_routes[prefix] - routers:
network["unexpected_routers"] = ghosts
return {
"networks": networks,