Raise repair issue if OTBR and ZHA are on different channels (#90494)

* Raise repair issue if OTBR and ZHA are on different channels

* Update issues after creating or setting dataset

* Explain impact

* Add link to documentation, adjust language

* Update homeassistant/components/otbr/strings.json

---------

Co-authored-by: Paulus Schoutsen <balloob@gmail.com>
This commit is contained in:
Erik Montnemery 2023-04-03 19:02:10 +02:00 committed by GitHub
parent 73d0124c98
commit c3091fad4c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 302 additions and 125 deletions

View file

@ -2,91 +2,20 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable, Coroutine
import dataclasses
from functools import wraps
from typing import Any, Concatenate, ParamSpec, TypeVar
import aiohttp
import python_otbr_api
from python_otbr_api import tlv_parser
from python_otbr_api.pskc import compute_pskc
from homeassistant.components.thread import async_add_dataset
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady, HomeAssistantError
from homeassistant.helpers import issue_registry as ir
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.typing import ConfigType
from . import websocket_api
from .const import DOMAIN
_R = TypeVar("_R")
_P = ParamSpec("_P")
INSECURE_NETWORK_KEYS = (
# Thread web UI default
bytes.fromhex("00112233445566778899AABBCCDDEEFF"),
)
INSECURE_PASSPHRASES = (
# Thread web UI default
"j01Nme",
# Thread documentation default
"J01NME",
)
def _handle_otbr_error(
func: Callable[Concatenate[OTBRData, _P], Coroutine[Any, Any, _R]]
) -> Callable[Concatenate[OTBRData, _P], Coroutine[Any, Any, _R]]:
"""Handle OTBR errors."""
@wraps(func)
async def _func(self: OTBRData, *args: _P.args, **kwargs: _P.kwargs) -> _R:
try:
return await func(self, *args, **kwargs)
except python_otbr_api.OTBRError as exc:
raise HomeAssistantError("Failed to call OTBR API") from exc
return _func
@dataclasses.dataclass
class OTBRData:
"""Container for OTBR data."""
url: str
api: python_otbr_api.OTBR
@_handle_otbr_error
async def set_enabled(self, enabled: bool) -> None:
"""Enable or disable the router."""
return await self.api.set_enabled(enabled)
@_handle_otbr_error
async def get_active_dataset_tlvs(self) -> bytes | None:
"""Get current active operational dataset in TLVS format, or None."""
return await self.api.get_active_dataset_tlvs()
@_handle_otbr_error
async def create_active_dataset(
self, dataset: python_otbr_api.OperationalDataSet
) -> None:
"""Create an active operational dataset."""
return await self.api.create_active_dataset(dataset)
@_handle_otbr_error
async def set_active_dataset_tlvs(self, dataset: bytes) -> None:
"""Set current active operational dataset in TLVS format."""
await self.api.set_active_dataset_tlvs(dataset)
@_handle_otbr_error
async def get_extended_address(self) -> bytes:
"""Get extended address (EUI-64)."""
return await self.api.get_extended_address()
from .util import OTBRData, update_issues
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
@ -95,54 +24,11 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
return True
def _warn_on_default_network_settings(
hass: HomeAssistant, entry: ConfigEntry, dataset_tlvs: bytes
) -> None:
"""Warn user if insecure default network settings are used."""
dataset = tlv_parser.parse_tlv(dataset_tlvs.hex())
insecure = False
if (
network_key := dataset.get(tlv_parser.MeshcopTLVType.NETWORKKEY)
) is not None and bytes.fromhex(network_key) in INSECURE_NETWORK_KEYS:
insecure = True
if (
not insecure
and tlv_parser.MeshcopTLVType.EXTPANID in dataset
and tlv_parser.MeshcopTLVType.NETWORKNAME in dataset
and tlv_parser.MeshcopTLVType.PSKC in dataset
):
ext_pan_id = dataset[tlv_parser.MeshcopTLVType.EXTPANID]
network_name = dataset[tlv_parser.MeshcopTLVType.NETWORKNAME]
pskc = bytes.fromhex(dataset[tlv_parser.MeshcopTLVType.PSKC])
for passphrase in INSECURE_PASSPHRASES:
if pskc == compute_pskc(ext_pan_id, network_name, passphrase):
insecure = True
break
if insecure:
ir.async_create_issue(
hass,
DOMAIN,
f"insecure_thread_network_{entry.entry_id}",
is_fixable=False,
is_persistent=False,
severity=ir.IssueSeverity.WARNING,
translation_key="insecure_thread_network",
)
else:
ir.async_delete_issue(
hass,
DOMAIN,
f"insecure_thread_network_{entry.entry_id}",
)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up an Open Thread Border Router config entry."""
api = python_otbr_api.OTBR(entry.data["url"], async_get_clientsession(hass), 10)
otbrdata = OTBRData(entry.data["url"], api)
otbrdata = OTBRData(entry.data["url"], api, entry.entry_id)
try:
dataset_tlvs = await otbrdata.get_active_dataset_tlvs()
except (
@ -152,7 +38,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
) as err:
raise ConfigEntryNotReady("Unable to connect") from err
if dataset_tlvs:
_warn_on_default_network_settings(hass, entry, dataset_tlvs)
await update_issues(hass, otbrdata, dataset_tlvs)
await async_add_dataset(hass, DOMAIN, dataset_tlvs.hex())
entry.async_on_unload(entry.add_update_listener(async_reload_entry))

View file

@ -1,7 +1,7 @@
{
"domain": "otbr",
"name": "Open Thread Border Router",
"after_dependencies": ["hassio", "zha"],
"after_dependencies": ["hassio", "homeassistant_yellow", "zha"],
"codeowners": ["@home-assistant/core"],
"config_flow": true,
"dependencies": ["homeassistant_hardware", "thread"],

View file

@ -19,6 +19,10 @@
"insecure_thread_network": {
"title": "Insecure Thread network settings detected",
"description": "Your Thread network is using a default network key or pass phrase.\n\nThis is a security risk, please create a new Thread network."
},
"otbr_zha_channel_collision": {
"title": "OTBR and ZHA share the same radio but use different channels",
"description": "When OTBR and ZHA share the radio, they must use the same network channel.\n\nIf OTBR and ZHA attempt to connect to networks on different channels, neither Thread/Matter nor Zigbee will work.\n\nOTBR is configured with a Thread network on channel {otbr_channel}, ZHA is configured with a Zigbee network on channel {zha_channel}."
}
}
}

View file

@ -1,13 +1,100 @@
"""Utility functions for the Open Thread Border Router integration."""
from __future__ import annotations
from collections.abc import Callable, Coroutine
import contextlib
import dataclasses
from functools import wraps
from typing import Any, Concatenate, ParamSpec, TypeVar
import python_otbr_api
from python_otbr_api import tlv_parser
from python_otbr_api.pskc import compute_pskc
from homeassistant.components.homeassistant_hardware.silabs_multiprotocol_addon import (
is_multiprotocol_url,
multi_pan_addon_using_device,
)
from homeassistant.components.homeassistant_yellow import RADIO_DEVICE as YELLOW_RADIO
from homeassistant.components.zha import api as zha_api
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import issue_registry as ir
from .const import DOMAIN
_R = TypeVar("_R")
_P = ParamSpec("_P")
INFO_URL_SKY_CONNECT = (
"https://skyconnect.home-assistant.io/procedures/enable-multiprotocol/#limitations"
)
INFO_URL_YELLOW = (
"https://yellow.home-assistant.io/guides/enable-multiprotocol/#limitations"
)
INSECURE_NETWORK_KEYS = (
# Thread web UI default
bytes.fromhex("00112233445566778899AABBCCDDEEFF"),
)
INSECURE_PASSPHRASES = (
# Thread web UI default
"j01Nme",
# Thread documentation default
"J01NME",
)
def _handle_otbr_error(
func: Callable[Concatenate[OTBRData, _P], Coroutine[Any, Any, _R]]
) -> Callable[Concatenate[OTBRData, _P], Coroutine[Any, Any, _R]]:
"""Handle OTBR errors."""
@wraps(func)
async def _func(self: OTBRData, *args: _P.args, **kwargs: _P.kwargs) -> _R:
try:
return await func(self, *args, **kwargs)
except python_otbr_api.OTBRError as exc:
raise HomeAssistantError("Failed to call OTBR API") from exc
return _func
@dataclasses.dataclass
class OTBRData:
"""Container for OTBR data."""
url: str
api: python_otbr_api.OTBR
entry_id: str
@_handle_otbr_error
async def set_enabled(self, enabled: bool) -> None:
"""Enable or disable the router."""
return await self.api.set_enabled(enabled)
@_handle_otbr_error
async def get_active_dataset_tlvs(self) -> bytes | None:
"""Get current active operational dataset in TLVS format, or None."""
return await self.api.get_active_dataset_tlvs()
@_handle_otbr_error
async def create_active_dataset(
self, dataset: python_otbr_api.OperationalDataSet
) -> None:
"""Create an active operational dataset."""
return await self.api.create_active_dataset(dataset)
@_handle_otbr_error
async def set_active_dataset_tlvs(self, dataset: bytes) -> None:
"""Set current active operational dataset in TLVS format."""
await self.api.set_active_dataset_tlvs(dataset)
@_handle_otbr_error
async def get_extended_address(self) -> bytes:
"""Get extended address (EUI-64)."""
return await self.api.get_extended_address()
def _get_zha_url(hass: HomeAssistant) -> str | None:
@ -41,3 +128,104 @@ async def get_allowed_channel(hass: HomeAssistant, otbr_url: str) -> int | None:
return None
return await _get_zha_channel(hass)
async def _warn_on_channel_collision(
hass: HomeAssistant, otbrdata: OTBRData, dataset_tlvs: bytes
) -> None:
"""Warn user if OTBR and ZHA attempt to use different channels."""
def delete_issue() -> None:
ir.async_delete_issue(
hass,
DOMAIN,
f"otbr_zha_channel_collision_{otbrdata.entry_id}",
)
if (allowed_channel := await get_allowed_channel(hass, otbrdata.url)) is None:
delete_issue()
return
dataset = tlv_parser.parse_tlv(dataset_tlvs.hex())
if (channel_s := dataset.get(tlv_parser.MeshcopTLVType.CHANNEL)) is None:
delete_issue()
return
try:
channel = int(channel_s, 16)
except ValueError:
delete_issue()
return
if channel == allowed_channel:
delete_issue()
return
yellow = await multi_pan_addon_using_device(hass, YELLOW_RADIO)
learn_more_url = INFO_URL_YELLOW if yellow else INFO_URL_SKY_CONNECT
ir.async_create_issue(
hass,
DOMAIN,
f"otbr_zha_channel_collision_{otbrdata.entry_id}",
is_fixable=False,
is_persistent=False,
learn_more_url=learn_more_url,
severity=ir.IssueSeverity.WARNING,
translation_key="otbr_zha_channel_collision",
translation_placeholders={
"otbr_channel": str(channel),
"zha_channel": str(allowed_channel),
},
)
def _warn_on_default_network_settings(
hass: HomeAssistant, otbrdata: OTBRData, dataset_tlvs: bytes
) -> None:
"""Warn user if insecure default network settings are used."""
dataset = tlv_parser.parse_tlv(dataset_tlvs.hex())
insecure = False
if (
network_key := dataset.get(tlv_parser.MeshcopTLVType.NETWORKKEY)
) is not None and bytes.fromhex(network_key) in INSECURE_NETWORK_KEYS:
insecure = True
if (
not insecure
and tlv_parser.MeshcopTLVType.EXTPANID in dataset
and tlv_parser.MeshcopTLVType.NETWORKNAME in dataset
and tlv_parser.MeshcopTLVType.PSKC in dataset
):
ext_pan_id = dataset[tlv_parser.MeshcopTLVType.EXTPANID]
network_name = dataset[tlv_parser.MeshcopTLVType.NETWORKNAME]
pskc = bytes.fromhex(dataset[tlv_parser.MeshcopTLVType.PSKC])
for passphrase in INSECURE_PASSPHRASES:
if pskc == compute_pskc(ext_pan_id, network_name, passphrase):
insecure = True
break
if insecure:
ir.async_create_issue(
hass,
DOMAIN,
f"insecure_thread_network_{otbrdata.entry_id}",
is_fixable=False,
is_persistent=False,
severity=ir.IssueSeverity.WARNING,
translation_key="insecure_thread_network",
)
else:
ir.async_delete_issue(
hass,
DOMAIN,
f"insecure_thread_network_{otbrdata.entry_id}",
)
async def update_issues(
hass: HomeAssistant, otbrdata: OTBRData, dataset_tlvs: bytes
) -> None:
"""Raise or clear repair issues related to network settings."""
await _warn_on_channel_collision(hass, otbrdata, dataset_tlvs)
_warn_on_default_network_settings(hass, otbrdata, dataset_tlvs)

View file

@ -1,5 +1,4 @@
"""Websocket API for OTBR."""
from typing import TYPE_CHECKING
import python_otbr_api
from python_otbr_api import tlv_parser
@ -11,10 +10,7 @@ from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from .const import DEFAULT_CHANNEL, DOMAIN
from .util import get_allowed_channel
if TYPE_CHECKING:
from . import OTBRData
from .util import OTBRData, get_allowed_channel, update_issues
@callback
@ -109,6 +105,9 @@ async def websocket_create_network(
await async_add_dataset(hass, DOMAIN, dataset_tlvs.hex())
# Update repair issues
await update_issues(hass, data, dataset_tlvs)
connection.send_result(msg["id"])
@ -167,6 +166,9 @@ async def websocket_set_network(
connection.send_error(msg["id"], "set_enabled_failed", str(exc))
return
# Update repair issues
await update_issues(hass, data, bytes.fromhex(dataset_tlv))
connection.send_result(msg["id"])

View file

@ -23,7 +23,7 @@ async def otbr_config_entry_fixture(hass):
with patch(
"python_otbr_api.OTBR.get_active_dataset_tlvs", return_value=DATASET_CH16
), patch(
"homeassistant.components.otbr.compute_pskc"
"homeassistant.components.otbr.util.compute_pskc"
): # Patch to speed up tests
assert await hass.config_entries.async_setup(config_entry.entry_id)

View file

@ -1,7 +1,7 @@
"""Test the Open Thread Border Router integration."""
import asyncio
from http import HTTPStatus
from unittest.mock import ANY, AsyncMock, MagicMock, patch
from unittest.mock import ANY, AsyncMock, MagicMock, Mock, patch
import aiohttp
import pytest
@ -15,6 +15,7 @@ from homeassistant.helpers import issue_registry as ir
from . import (
BASE_URL,
CONFIG_ENTRY_DATA,
DATASET_CH15,
DATASET_CH16,
DATASET_INSECURE_NW_KEY,
DATASET_INSECURE_PASSPHRASE,
@ -23,6 +24,18 @@ from . import (
from tests.common import MockConfigEntry
from tests.test_util.aiohttp import AiohttpClientMocker
DATASET_BAD_CHANNEL = bytes.fromhex(
"0E080000000000010000000035060004001FFFE00208F642646DA209B1C00708FDF57B5A"
"0FE2AAF60510DE98B5BA1A528FEE049D4B4B01835375030D4F70656E5468726561642048410102"
"25A40410F5DD18371BFD29E1A601EF6FFAD94C030C0402A0F7F8"
)
DATASET_NO_CHANNEL = bytes.fromhex(
"0E08000000000001000035060004001FFFE00208F642646DA209B1C00708FDF57B5A"
"0FE2AAF60510DE98B5BA1A528FEE049D4B4B01835375030D4F70656E5468726561642048410102"
"25A40410F5DD18371BFD29E1A601EF6FFAD94C030C0402A0F7F8"
)
async def test_import_dataset(hass: HomeAssistant) -> None:
"""Test the active dataset is imported at setup."""
@ -46,6 +59,90 @@ async def test_import_dataset(hass: HomeAssistant) -> None:
assert not issue_registry.async_get_issue(
domain=otbr.DOMAIN, issue_id=f"insecure_thread_network_{config_entry.entry_id}"
)
assert not issue_registry.async_get_issue(
domain=otbr.DOMAIN,
issue_id=f"otbr_zha_channel_collision_{config_entry.entry_id}",
)
async def test_import_share_radio_channel_collision(hass: HomeAssistant) -> None:
"""Test the active dataset is imported at setup.
This imports a dataset with different channel than ZHA when ZHA and OTBR share
the radio.
"""
issue_registry = ir.async_get(hass)
networksettings = Mock()
networksettings.network_info.channel = 15
config_entry = MockConfigEntry(
data=CONFIG_ENTRY_DATA,
domain=otbr.DOMAIN,
options={},
title="My OTBR",
)
config_entry.add_to_hass(hass)
with patch(
"python_otbr_api.OTBR.get_active_dataset_tlvs", return_value=DATASET_CH16
), patch(
"homeassistant.components.thread.dataset_store.DatasetStore.async_add"
) as mock_add, patch(
"homeassistant.components.otbr.util.zha_api.async_get_radio_path",
return_value="socket://core-silabs-multiprotocol:9999",
), patch(
"homeassistant.components.otbr.util.zha_api.async_get_network_settings",
return_value=networksettings,
):
assert await hass.config_entries.async_setup(config_entry.entry_id)
mock_add.assert_called_once_with(otbr.DOMAIN, DATASET_CH16.hex())
assert issue_registry.async_get_issue(
domain=otbr.DOMAIN,
issue_id=f"otbr_zha_channel_collision_{config_entry.entry_id}",
)
@pytest.mark.parametrize(
"dataset", [DATASET_BAD_CHANNEL, DATASET_CH15, DATASET_NO_CHANNEL]
)
async def test_import_share_radio_no_channel_collision(
hass: HomeAssistant, dataset: bytes
) -> None:
"""Test the active dataset is imported at setup.
This imports a dataset when ZHA and OTBR share the radio.
"""
issue_registry = ir.async_get(hass)
networksettings = Mock()
networksettings.network_info.channel = 15
config_entry = MockConfigEntry(
data=CONFIG_ENTRY_DATA,
domain=otbr.DOMAIN,
options={},
title="My OTBR",
)
config_entry.add_to_hass(hass)
with patch(
"python_otbr_api.OTBR.get_active_dataset_tlvs", return_value=dataset
), patch(
"homeassistant.components.thread.dataset_store.DatasetStore.async_add"
) as mock_add, patch(
"homeassistant.components.otbr.util.zha_api.async_get_radio_path",
return_value="socket://core-silabs-multiprotocol:9999",
), patch(
"homeassistant.components.otbr.util.zha_api.async_get_network_settings",
return_value=networksettings,
):
assert await hass.config_entries.async_setup(config_entry.entry_id)
mock_add.assert_called_once_with(otbr.DOMAIN, dataset.hex())
assert not issue_registry.async_get_issue(
domain=otbr.DOMAIN,
issue_id=f"otbr_zha_channel_collision_{config_entry.entry_id}",
)
@pytest.mark.parametrize(