Compare commits

...
Sign in to create a new pull request.

3 commits

Author SHA1 Message Date
Erik
5dfb9f3f8d Make usb.async_is_plugged_in reflect current state 2023-01-08 13:31:59 +01:00
Erik
9c39c51a96 Adjust tests 2023-01-05 17:45:25 +01:00
Erik
de84abf58c Disable sky connect functionality if USB stick is not plugged in 2023-01-05 16:31:40 +01:00
12 changed files with 106 additions and 55 deletions

View file

@ -3,7 +3,7 @@ from __future__ import annotations
from homeassistant.components.hardware.models import BoardInfo, HardwareInfo
from homeassistant.components.hassio import get_os_info
from homeassistant.core import HomeAssistant, callback
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from .const import DOMAIN
@ -16,8 +16,7 @@ BOARD_NAMES = {
}
@callback
def async_info(hass: HomeAssistant) -> list[HardwareInfo]:
async def async_info(hass: HomeAssistant) -> list[HardwareInfo]:
"""Return board info."""
if (os_info := get_os_info(hass)) is None:
raise HomeAssistantError

View file

@ -4,7 +4,7 @@ from __future__ import annotations
from dataclasses import dataclass
from typing import Protocol
from homeassistant.core import HomeAssistant, callback
from homeassistant.core import HomeAssistant
@dataclass
@ -42,6 +42,5 @@ class HardwareInfo:
class HardwareProtocol(Protocol):
"""Define the format of hardware platforms."""
@callback
def async_info(self, hass: HomeAssistant) -> list[HardwareInfo]:
async def async_info(self, hass: HomeAssistant) -> list[HardwareInfo]:
"""Return info."""

View file

@ -61,7 +61,9 @@ async def ws_info(
for platform in hardware_platform.values():
if hasattr(platform, "async_info"):
with contextlib.suppress(HomeAssistantError):
hardware_info.extend([asdict(hw) for hw in platform.async_info(hass)])
hardware_info.extend(
[asdict(hw) for hw in await platform.async_info(hass)]
)
connection.send_result(msg["id"], {"hardware": hardware_info})

View file

@ -19,8 +19,7 @@ from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from .const import DOMAIN
from .util import get_usb_service_info
from .util import async_is_plugged_in, get_usb_service_info
_LOGGER = logging.getLogger(__name__)
@ -64,18 +63,9 @@ async def _multi_pan_addon_info(
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up a Home Assistant Sky Connect config entry."""
matcher = usb.USBCallbackMatcher(
domain=DOMAIN,
vid=entry.data["vid"].upper(),
pid=entry.data["pid"].upper(),
serial_number=entry.data["serial_number"].lower(),
manufacturer=entry.data["manufacturer"].lower(),
description=entry.data["description"].lower(),
)
if not usb.async_is_plugged_in(hass, matcher):
# The USB dongle is not plugged in
raise ConfigEntryNotReady
if not await async_is_plugged_in(hass, entry):
# The USB dongle is not plugged in, don't setup ZHA
return True
addon_info = await _multi_pan_addon_info(hass, entry)

View file

@ -2,15 +2,15 @@
from __future__ import annotations
from homeassistant.components.hardware.models import HardwareInfo, USBInfo
from homeassistant.core import HomeAssistant, callback
from homeassistant.core import HomeAssistant
from .const import DOMAIN
from .util import async_is_plugged_in
DONGLE_NAME = "Home Assistant Sky Connect"
@callback
def async_info(hass: HomeAssistant) -> list[HardwareInfo]:
async def async_info(hass: HomeAssistant) -> list[HardwareInfo]:
"""Return board info."""
entries = hass.config_entries.async_entries(DOMAIN)
@ -29,4 +29,5 @@ def async_info(hass: HomeAssistant) -> list[HardwareInfo]:
url=None,
)
for entry in entries
if await async_is_plugged_in(hass, entry)
]

View file

@ -3,6 +3,9 @@ from __future__ import annotations
from homeassistant.components import usb
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from .const import DOMAIN
def get_usb_service_info(config_entry: ConfigEntry) -> usb.UsbServiceInfo:
@ -15,3 +18,17 @@ def get_usb_service_info(config_entry: ConfigEntry) -> usb.UsbServiceInfo:
manufacturer=config_entry.data["manufacturer"],
description=config_entry.data["description"],
)
async def async_is_plugged_in(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Return if the device is plugged in."""
matcher = usb.USBCallbackMatcher(
domain=DOMAIN,
vid=config_entry.data["vid"].upper(),
pid=config_entry.data["pid"].upper(),
serial_number=config_entry.data["serial_number"].lower(),
manufacturer=config_entry.data["manufacturer"].lower(),
description=config_entry.data["description"].lower(),
)
return await usb.async_is_plugged_in(hass, matcher)

View file

@ -3,7 +3,7 @@ from __future__ import annotations
from homeassistant.components.hardware.models import BoardInfo, HardwareInfo
from homeassistant.components.hassio import get_os_info
from homeassistant.core import HomeAssistant, callback
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from .const import DOMAIN
@ -13,8 +13,7 @@ MANUFACTURER = "homeassistant"
MODEL = "yellow"
@callback
def async_info(hass: HomeAssistant) -> list[HardwareInfo]:
async def async_info(hass: HomeAssistant) -> list[HardwareInfo]:
"""Return board info."""
if (os_info := get_os_info(hass)) is None:
raise HomeAssistantError

View file

@ -3,7 +3,7 @@ from __future__ import annotations
from homeassistant.components.hardware.models import BoardInfo, HardwareInfo
from homeassistant.components.hassio import get_os_info
from homeassistant.core import HomeAssistant, callback
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from .const import DOMAIN
@ -31,8 +31,7 @@ MODELS = {
}
@callback
def async_info(hass: HomeAssistant) -> list[HardwareInfo]:
async def async_info(hass: HomeAssistant) -> list[HardwareInfo]:
"""Return board info."""
if (os_info := get_os_info(hass)) is None:
raise HomeAssistantError

View file

@ -61,8 +61,7 @@ def async_register_scan_request_callback(
return discovery.async_register_scan_request_callback(callback)
@hass_callback
def async_is_plugged_in(hass: HomeAssistant, matcher: USBCallbackMatcher) -> bool:
async def async_is_plugged_in(hass: HomeAssistant, matcher: USBCallbackMatcher) -> bool:
"""Return True is a USB device is present."""
vid = matcher.get("vid", "")
@ -83,9 +82,10 @@ def async_is_plugged_in(hass: HomeAssistant, matcher: USBCallbackMatcher) -> boo
)
usb_discovery: USBDiscovery = hass.data[DOMAIN]
await usb_discovery.async_request_scan()
return any(
_is_matching(USBDevice(*device_tuple), matcher)
for device_tuple in usb_discovery.seen
for device_tuple in usb_discovery.plugged_in
)
@ -182,6 +182,7 @@ class USBDiscovery:
"""Init USB Discovery."""
self.hass = hass
self.usb = usb
self.plugged_in: set[tuple[str, ...]] = set()
self.seen: set[tuple[str, ...]] = set()
self.observer_active = False
self._request_debouncer: Debouncer[Coroutine[Any, Any, None]] | None = None
@ -263,6 +264,7 @@ class USBDiscovery:
"""Process a USB discovery."""
_LOGGER.debug("Discovered USB Device: %s", device)
device_tuple = dataclasses.astuple(device)
self.plugged_in.add(device_tuple)
if device_tuple in self.seen:
return
self.seen.add(device_tuple)
@ -299,6 +301,7 @@ class USBDiscovery:
@hass_callback
def _async_process_ports(self, ports: list[ListPortInfo]) -> None:
"""Process each discovered port."""
self.plugged_in = set()
for port in ports:
if port.vid is None and port.pid is None:
continue

View file

@ -49,16 +49,16 @@ async def test_hardware_info(
)
config_entry_2.add_to_hass(hass)
with patch(
"homeassistant.components.homeassistant_sky_connect.usb.async_is_plugged_in",
"homeassistant.components.homeassistant_sky_connect.util.usb.async_is_plugged_in",
return_value=True,
):
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
client = await hass_ws_client(hass)
client = await hass_ws_client(hass)
await client.send_json({"id": 1, "type": "hardware/info"})
msg = await client.receive_json()
await client.send_json({"id": 1, "type": "hardware/info"})
msg = await client.receive_json()
assert msg["id"] == 1
assert msg["success"]
@ -92,3 +92,36 @@ async def test_hardware_info(
},
]
}
def dongle_1_unplugged(hass, matcher):
"""Fake that the dongle for entry 1 is unplugged."""
if matcher["vid"] == CONFIG_ENTRY_DATA["vid"].upper():
return False
return True
with patch(
"homeassistant.components.homeassistant_sky_connect.util.usb.async_is_plugged_in",
wraps=dongle_1_unplugged,
):
await client.send_json({"id": 2, "type": "hardware/info"})
msg = await client.receive_json()
assert msg["id"] == 2
assert msg["success"]
assert msg["result"] == {
"hardware": [
{
"board": None,
"config_entries": [config_entry_2.entry_id],
"dongle": {
"vid": "bla_vid_2",
"pid": "bla_pid_2",
"serial_number": "bla_serial_number_2",
"manufacturer": "bla_manufacturer_2",
"description": "bla_description_2",
},
"name": "Home Assistant Sky Connect",
"url": None,
}
]
}

View file

@ -64,7 +64,7 @@ async def test_setup_entry(
)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.homeassistant_sky_connect.usb.async_is_plugged_in",
"homeassistant.components.homeassistant_sky_connect.util.usb.async_is_plugged_in",
return_value=True,
) as mock_is_plugged_in, patch(
"homeassistant.components.onboarding.async_is_onboarded", return_value=onboarded
@ -109,7 +109,7 @@ async def test_setup_zha(
)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.homeassistant_sky_connect.usb.async_is_plugged_in",
"homeassistant.components.homeassistant_sky_connect.util.usb.async_is_plugged_in",
return_value=True,
) as mock_is_plugged_in, patch(
"homeassistant.components.onboarding.async_is_onboarded", return_value=False
@ -157,7 +157,7 @@ async def test_setup_zha_multipan(
)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.homeassistant_sky_connect.usb.async_is_plugged_in",
"homeassistant.components.homeassistant_sky_connect.util.usb.async_is_plugged_in",
return_value=True,
) as mock_is_plugged_in, patch(
"homeassistant.components.onboarding.async_is_onboarded", return_value=False
@ -208,7 +208,7 @@ async def test_setup_zha_multipan_other_device(
)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.homeassistant_sky_connect.usb.async_is_plugged_in",
"homeassistant.components.homeassistant_sky_connect.util.usb.async_is_plugged_in",
return_value=True,
) as mock_is_plugged_in, patch(
"homeassistant.components.onboarding.async_is_onboarded", return_value=False
@ -255,13 +255,16 @@ async def test_setup_entry_wait_usb(hass: HomeAssistant) -> None:
)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.homeassistant_sky_connect.usb.async_is_plugged_in",
"homeassistant.components.homeassistant_sky_connect.util.usb.async_is_plugged_in",
return_value=False,
) as mock_is_plugged_in:
assert not await hass.config_entries.async_setup(config_entry.entry_id)
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert len(mock_is_plugged_in.mock_calls) == 1
assert config_entry.state == ConfigEntryState.SETUP_RETRY
assert config_entry.state == ConfigEntryState.LOADED
zha_flows = hass.config_entries.flow.async_progress_by_handler("zha")
assert len(zha_flows) == 0
async def test_setup_entry_addon_info_fails(
@ -279,7 +282,7 @@ async def test_setup_entry_addon_info_fails(
)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.homeassistant_sky_connect.usb.async_is_plugged_in",
"homeassistant.components.homeassistant_sky_connect.util.usb.async_is_plugged_in",
return_value=True,
), patch(
"homeassistant.components.onboarding.async_is_onboarded", return_value=False
@ -305,7 +308,7 @@ async def test_setup_entry_addon_not_running(
)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.homeassistant_sky_connect.usb.async_is_plugged_in",
"homeassistant.components.homeassistant_sky_connect.util.usb.async_is_plugged_in",
return_value=True,
), patch(
"homeassistant.components.onboarding.async_is_onboarded", return_value=False

View file

@ -835,7 +835,7 @@ def test_human_readable_device_name():
assert "8A2A" in name
async def test_async_is_plugged_in(hass, hass_ws_client):
async def test_async_is_plugged_in(hass):
"""Test async_is_plugged_in."""
new_usb = [{"domain": "test1", "vid": "3039", "pid": "3039"}]
@ -859,22 +859,28 @@ async def test_async_is_plugged_in(hass, hass_ws_client):
"homeassistant.components.usb.async_get_usb", return_value=new_usb
), patch("homeassistant.components.usb.comports", return_value=[]), patch.object(
hass.config_entries.flow, "async_init"
), patch(
"homeassistant.components.usb.REQUEST_SCAN_COOLDOWN", 0
):
assert await async_setup_component(hass, "usb", {"usb": {}})
await hass.async_block_till_done()
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
assert not usb.async_is_plugged_in(hass, matcher)
assert not await usb.async_is_plugged_in(hass, matcher)
with patch(
"homeassistant.components.usb.comports", return_value=mock_comports
), patch.object(hass.config_entries.flow, "async_init"):
ws_client = await hass_ws_client(hass)
await ws_client.send_json({"id": 1, "type": "usb/scan"})
response = await ws_client.receive_json()
assert response["success"]
), patch.object(hass.config_entries.flow, "async_init"), patch(
"homeassistant.components.usb.REQUEST_SCAN_COOLDOWN", 0
):
await hass.async_block_till_done()
assert usb.async_is_plugged_in(hass, matcher)
assert await usb.async_is_plugged_in(hass, matcher)
with patch("homeassistant.components.usb.comports", return_value=[]), patch.object(
hass.config_entries.flow, "async_init"
), patch("homeassistant.components.usb.REQUEST_SCAN_COOLDOWN", 0):
await hass.async_block_till_done()
assert not await usb.async_is_plugged_in(hass, matcher)
@pytest.mark.parametrize(
@ -903,7 +909,7 @@ async def test_async_is_plugged_in_case_enforcement(hass, matcher):
await hass.async_block_till_done()
with pytest.raises(ValueError):
usb.async_is_plugged_in(hass, matcher)
await usb.async_is_plugged_in(hass, matcher)
async def test_web_socket_triggers_discovery_request_callbacks(hass, hass_ws_client):