441 lines
16 KiB
Python
441 lines
16 KiB
Python
"""Tests for the TP-Link component."""
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
from typing import Any
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from pyHS100 import SmartBulb, SmartDevice, SmartDeviceException, SmartPlug, smartstrip
|
|
from pyHS100.smartdevice import EmeterStatus
|
|
import pytest
|
|
|
|
from homeassistant import config_entries, data_entry_flow
|
|
from homeassistant.components import tplink
|
|
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
|
|
from homeassistant.components.switch import DOMAIN as SWITCH_DOMAIN
|
|
from homeassistant.components.tplink.common import SmartDevices
|
|
from homeassistant.components.tplink.const import (
|
|
CONF_DIMMER,
|
|
CONF_DISCOVERY,
|
|
CONF_LIGHT,
|
|
CONF_SW_VERSION,
|
|
CONF_SWITCH,
|
|
UNAVAILABLE_RETRY_DELAY,
|
|
)
|
|
from homeassistant.components.tplink.sensor import ENERGY_SENSORS
|
|
from homeassistant.const import CONF_HOST
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers import device_registry as dr
|
|
from homeassistant.setup import async_setup_component
|
|
from homeassistant.util import dt, slugify
|
|
|
|
from tests.common import MockConfigEntry, async_fire_time_changed, mock_coro
|
|
from tests.components.tplink.consts import (
|
|
SMARTPLUG_HS100_DATA,
|
|
SMARTPLUG_HS110_DATA,
|
|
SMARTSTRIP_KP303_DATA,
|
|
)
|
|
|
|
|
|
async def test_creating_entry_tries_discover(hass):
|
|
"""Test setting up does discovery."""
|
|
with patch(
|
|
"homeassistant.components.tplink.async_setup_entry",
|
|
return_value=mock_coro(True),
|
|
) as mock_setup, patch(
|
|
"homeassistant.components.tplink.common.Discover.discover",
|
|
return_value={"host": 1234},
|
|
):
|
|
result = await hass.config_entries.flow.async_init(
|
|
tplink.DOMAIN, context={"source": config_entries.SOURCE_USER}
|
|
)
|
|
|
|
# Confirmation form
|
|
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
|
|
|
|
result = await hass.config_entries.flow.async_configure(result["flow_id"], {})
|
|
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
|
|
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(mock_setup.mock_calls) == 1
|
|
|
|
|
|
async def test_configuring_tplink_causes_discovery(hass):
|
|
"""Test that specifying empty config does discovery."""
|
|
with patch("homeassistant.components.tplink.common.Discover.discover") as discover:
|
|
discover.return_value = {"host": 1234}
|
|
await async_setup_component(hass, tplink.DOMAIN, {tplink.DOMAIN: {}})
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(discover.mock_calls) == 1
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"name,cls,platform",
|
|
[
|
|
("pyHS100.SmartPlug", SmartPlug, "switch"),
|
|
("pyHS100.SmartBulb", SmartBulb, "light"),
|
|
],
|
|
)
|
|
@pytest.mark.parametrize("count", [1, 2, 3])
|
|
async def test_configuring_device_types(hass, name, cls, platform, count):
|
|
"""Test that light or switch platform list is filled correctly."""
|
|
with patch(
|
|
"homeassistant.components.tplink.common.Discover.discover"
|
|
) as discover, patch(
|
|
"homeassistant.components.tplink.common.SmartDevice._query_helper"
|
|
), patch(
|
|
"homeassistant.components.tplink.light.async_setup_entry",
|
|
return_value=True,
|
|
):
|
|
discovery_data = {
|
|
f"123.123.123.{c}": cls("123.123.123.123") for c in range(count)
|
|
}
|
|
discover.return_value = discovery_data
|
|
await async_setup_component(hass, tplink.DOMAIN, {tplink.DOMAIN: {}})
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(discover.mock_calls) == 1
|
|
assert len(hass.data[tplink.DOMAIN][platform]) == count
|
|
|
|
|
|
class UnknownSmartDevice(SmartDevice):
|
|
"""Dummy class for testing."""
|
|
|
|
@property
|
|
def has_emeter(self) -> bool:
|
|
"""Do nothing."""
|
|
|
|
def turn_off(self) -> None:
|
|
"""Do nothing."""
|
|
|
|
def turn_on(self) -> None:
|
|
"""Do nothing."""
|
|
|
|
@property
|
|
def is_on(self) -> bool:
|
|
"""Do nothing."""
|
|
|
|
@property
|
|
def state_information(self) -> dict[str, Any]:
|
|
"""Do nothing."""
|
|
|
|
|
|
async def test_configuring_devices_from_multiple_sources(hass):
|
|
"""Test static and discover devices are not duplicated."""
|
|
with patch(
|
|
"homeassistant.components.tplink.common.Discover.discover"
|
|
) as discover, patch(
|
|
"homeassistant.components.tplink.common.SmartDevice._query_helper"
|
|
), patch(
|
|
"homeassistant.config_entries.ConfigEntries.async_forward_entry_setup"
|
|
):
|
|
discover_device_fail = SmartPlug("123.123.123.123")
|
|
discover_device_fail.get_sysinfo = MagicMock(side_effect=SmartDeviceException())
|
|
|
|
discover.return_value = {
|
|
"123.123.123.1": SmartBulb("123.123.123.1"),
|
|
"123.123.123.2": SmartPlug("123.123.123.2"),
|
|
"123.123.123.3": SmartBulb("123.123.123.3"),
|
|
"123.123.123.4": SmartPlug("123.123.123.4"),
|
|
"123.123.123.123": discover_device_fail,
|
|
"123.123.123.124": UnknownSmartDevice("123.123.123.124"),
|
|
}
|
|
|
|
await async_setup_component(
|
|
hass,
|
|
tplink.DOMAIN,
|
|
{
|
|
tplink.DOMAIN: {
|
|
CONF_LIGHT: [{CONF_HOST: "123.123.123.1"}],
|
|
CONF_SWITCH: [{CONF_HOST: "123.123.123.2"}],
|
|
CONF_DIMMER: [{CONF_HOST: "123.123.123.22"}],
|
|
}
|
|
},
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(discover.mock_calls) == 1
|
|
assert len(hass.data[tplink.DOMAIN][CONF_LIGHT]) == 3
|
|
assert len(hass.data[tplink.DOMAIN][CONF_SWITCH]) == 2
|
|
|
|
|
|
async def test_is_dimmable(hass):
|
|
"""Test that is_dimmable switches are correctly added as lights."""
|
|
with patch(
|
|
"homeassistant.components.tplink.common.Discover.discover"
|
|
) as discover, patch(
|
|
"homeassistant.components.tplink.light.async_setup_entry",
|
|
return_value=mock_coro(True),
|
|
) as setup, patch(
|
|
"homeassistant.components.tplink.common.SmartDevice._query_helper"
|
|
), patch(
|
|
"homeassistant.components.tplink.common.SmartPlug.is_dimmable", True
|
|
):
|
|
dimmable_switch = SmartPlug("123.123.123.123")
|
|
discover.return_value = {"host": dimmable_switch}
|
|
|
|
await async_setup_component(hass, tplink.DOMAIN, {tplink.DOMAIN: {}})
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(discover.mock_calls) == 1
|
|
assert len(setup.mock_calls) == 1
|
|
assert len(hass.data[tplink.DOMAIN][CONF_LIGHT]) == 1
|
|
assert not hass.data[tplink.DOMAIN][CONF_SWITCH]
|
|
|
|
|
|
async def test_configuring_discovery_disabled(hass):
|
|
"""Test that discover does not get called when disabled."""
|
|
with patch(
|
|
"homeassistant.components.tplink.async_setup_entry",
|
|
return_value=mock_coro(True),
|
|
) as mock_setup, patch(
|
|
"homeassistant.components.tplink.common.Discover.discover", return_value=[]
|
|
) as discover:
|
|
await async_setup_component(
|
|
hass, tplink.DOMAIN, {tplink.DOMAIN: {tplink.CONF_DISCOVERY: False}}
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
assert discover.call_count == 0
|
|
assert mock_setup.call_count == 1
|
|
|
|
|
|
async def test_platforms_are_initialized(hass: HomeAssistant):
|
|
"""Test that platforms are initialized per configuration array."""
|
|
config = {
|
|
tplink.DOMAIN: {
|
|
CONF_DISCOVERY: False,
|
|
CONF_LIGHT: [{CONF_HOST: "123.123.123.123"}],
|
|
CONF_SWITCH: [{CONF_HOST: "321.321.321.321"}],
|
|
}
|
|
}
|
|
|
|
with patch("homeassistant.components.tplink.common.Discover.discover"), patch(
|
|
"homeassistant.components.tplink.get_static_devices"
|
|
) as get_static_devices, patch(
|
|
"homeassistant.components.tplink.common.SmartDevice._query_helper"
|
|
), patch(
|
|
"homeassistant.components.tplink.light.async_setup_entry",
|
|
return_value=mock_coro(True),
|
|
), patch(
|
|
"homeassistant.components.tplink.common.SmartPlug.is_dimmable",
|
|
False,
|
|
):
|
|
|
|
light = SmartBulb("123.123.123.123")
|
|
switch = SmartPlug("321.321.321.321")
|
|
switch.get_sysinfo = MagicMock(return_value=SMARTPLUG_HS110_DATA["sysinfo"])
|
|
switch.get_emeter_realtime = MagicMock(
|
|
return_value=EmeterStatus(SMARTPLUG_HS110_DATA["realtime"])
|
|
)
|
|
switch.get_emeter_daily = MagicMock(
|
|
return_value={int(time.strftime("%e")): 1.123}
|
|
)
|
|
get_static_devices.return_value = SmartDevices([light], [switch])
|
|
|
|
# patching is_dimmable is necessray to avoid misdetection as light.
|
|
await async_setup_component(hass, tplink.DOMAIN, config)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(f"switch.{switch.alias}")
|
|
assert state
|
|
assert state.name == switch.alias
|
|
|
|
for description in ENERGY_SENSORS:
|
|
state = hass.states.get(
|
|
f"sensor.{switch.alias}_{slugify(description.name)}"
|
|
)
|
|
assert state
|
|
assert state.state is not None
|
|
assert state.name == f"{switch.alias} {description.name}"
|
|
|
|
device_registry = dr.async_get(hass)
|
|
assert len(device_registry.devices) == 1
|
|
device = next(iter(device_registry.devices.values()))
|
|
assert device.name == switch.alias
|
|
assert device.model == switch.model
|
|
assert device.connections == {(dr.CONNECTION_NETWORK_MAC, switch.mac.lower())}
|
|
assert device.sw_version == switch.sys_info[CONF_SW_VERSION]
|
|
|
|
|
|
async def test_smartplug_without_consumption_sensors(hass: HomeAssistant):
|
|
"""Test that platforms are initialized per configuration array."""
|
|
config = {
|
|
tplink.DOMAIN: {
|
|
CONF_DISCOVERY: False,
|
|
CONF_SWITCH: [{CONF_HOST: "321.321.321.321"}],
|
|
}
|
|
}
|
|
|
|
with patch("homeassistant.components.tplink.common.Discover.discover"), patch(
|
|
"homeassistant.components.tplink.get_static_devices"
|
|
) as get_static_devices, patch(
|
|
"homeassistant.components.tplink.common.SmartDevice._query_helper"
|
|
), patch(
|
|
"homeassistant.components.tplink.light.async_setup_entry",
|
|
return_value=mock_coro(True),
|
|
), patch(
|
|
"homeassistant.components.tplink.common.SmartPlug.is_dimmable", False
|
|
):
|
|
|
|
switch = SmartPlug("321.321.321.321")
|
|
switch.get_sysinfo = MagicMock(return_value=SMARTPLUG_HS100_DATA["sysinfo"])
|
|
get_static_devices.return_value = SmartDevices([], [switch])
|
|
|
|
await async_setup_component(hass, tplink.DOMAIN, config)
|
|
await hass.async_block_till_done()
|
|
|
|
entities = hass.states.async_entity_ids(SWITCH_DOMAIN)
|
|
assert len(entities) == 1
|
|
|
|
entities = hass.states.async_entity_ids(SENSOR_DOMAIN)
|
|
assert len(entities) == 0
|
|
|
|
|
|
async def test_smartstrip_device(hass: HomeAssistant):
|
|
"""Test discover a SmartStrip devices."""
|
|
config = {
|
|
tplink.DOMAIN: {
|
|
CONF_DISCOVERY: True,
|
|
}
|
|
}
|
|
|
|
class SmartStrip(smartstrip.SmartStrip):
|
|
"""Moked SmartStrip class."""
|
|
|
|
def get_sysinfo(self):
|
|
return SMARTSTRIP_KP303_DATA["sysinfo"]
|
|
|
|
with patch(
|
|
"homeassistant.components.tplink.common.Discover.discover"
|
|
) as discover, patch(
|
|
"homeassistant.components.tplink.common.SmartDevice._query_helper"
|
|
), patch(
|
|
"homeassistant.components.tplink.common.SmartPlug.get_sysinfo",
|
|
return_value=SMARTSTRIP_KP303_DATA["sysinfo"],
|
|
):
|
|
|
|
strip = SmartStrip("123.123.123.123")
|
|
discover.return_value = {"123.123.123.123": strip}
|
|
|
|
assert await async_setup_component(hass, tplink.DOMAIN, config)
|
|
await hass.async_block_till_done()
|
|
|
|
entities = hass.states.async_entity_ids(SWITCH_DOMAIN)
|
|
assert len(entities) == 3
|
|
|
|
|
|
async def test_no_config_creates_no_entry(hass):
|
|
"""Test for when there is no tplink in config."""
|
|
with patch(
|
|
"homeassistant.components.tplink.async_setup_entry",
|
|
return_value=mock_coro(True),
|
|
) as mock_setup:
|
|
await async_setup_component(hass, tplink.DOMAIN, {})
|
|
await hass.async_block_till_done()
|
|
|
|
assert mock_setup.call_count == 0
|
|
|
|
|
|
async def test_not_available_at_startup(hass: HomeAssistant):
|
|
"""Test when configured devices are not available."""
|
|
config = {
|
|
tplink.DOMAIN: {
|
|
CONF_DISCOVERY: False,
|
|
CONF_SWITCH: [{CONF_HOST: "321.321.321.321"}],
|
|
}
|
|
}
|
|
|
|
with patch("homeassistant.components.tplink.common.Discover.discover"), patch(
|
|
"homeassistant.components.tplink.get_static_devices"
|
|
) as get_static_devices, patch(
|
|
"homeassistant.components.tplink.common.SmartDevice._query_helper"
|
|
), patch(
|
|
"homeassistant.components.tplink.light.async_setup_entry",
|
|
return_value=mock_coro(True),
|
|
), patch(
|
|
"homeassistant.components.tplink.common.SmartPlug.is_dimmable", False
|
|
):
|
|
|
|
switch = SmartPlug("321.321.321.321")
|
|
switch.get_sysinfo = MagicMock(side_effect=SmartDeviceException())
|
|
get_static_devices.return_value = SmartDevices([], [switch])
|
|
|
|
# run setup while device unreachable
|
|
await async_setup_component(hass, tplink.DOMAIN, config)
|
|
await hass.async_block_till_done()
|
|
|
|
entries = hass.config_entries.async_entries(tplink.DOMAIN)
|
|
assert len(entries) == 1
|
|
assert entries[0].state is config_entries.ConfigEntryState.LOADED
|
|
|
|
entities = hass.states.async_entity_ids(SWITCH_DOMAIN)
|
|
assert len(entities) == 0
|
|
|
|
# retrying with still unreachable device
|
|
async_fire_time_changed(hass, dt.utcnow() + UNAVAILABLE_RETRY_DELAY)
|
|
await hass.async_block_till_done()
|
|
|
|
entries = hass.config_entries.async_entries(tplink.DOMAIN)
|
|
assert len(entries) == 1
|
|
assert entries[0].state is config_entries.ConfigEntryState.LOADED
|
|
|
|
entities = hass.states.async_entity_ids(SWITCH_DOMAIN)
|
|
assert len(entities) == 0
|
|
|
|
# retrying with now reachable device
|
|
switch.get_sysinfo = MagicMock(return_value=SMARTPLUG_HS100_DATA["sysinfo"])
|
|
async_fire_time_changed(hass, dt.utcnow() + UNAVAILABLE_RETRY_DELAY)
|
|
await hass.async_block_till_done()
|
|
|
|
entries = hass.config_entries.async_entries(tplink.DOMAIN)
|
|
assert len(entries) == 1
|
|
assert entries[0].state is config_entries.ConfigEntryState.LOADED
|
|
|
|
entities = hass.states.async_entity_ids(SWITCH_DOMAIN)
|
|
assert len(entities) == 1
|
|
|
|
|
|
@pytest.mark.parametrize("platform", ["switch", "light"])
|
|
async def test_unload(hass, platform):
|
|
"""Test that the async_unload_entry works."""
|
|
# As we have currently no configuration, we just to pass the domain here.
|
|
entry = MockConfigEntry(domain=tplink.DOMAIN)
|
|
entry.add_to_hass(hass)
|
|
|
|
with patch(
|
|
"homeassistant.components.tplink.get_static_devices"
|
|
) as get_static_devices, patch(
|
|
"homeassistant.components.tplink.common.SmartDevice._query_helper"
|
|
), patch(
|
|
f"homeassistant.components.tplink.{platform}.async_setup_entry",
|
|
return_value=mock_coro(True),
|
|
) as async_setup_entry:
|
|
config = {
|
|
tplink.DOMAIN: {
|
|
platform: [{CONF_HOST: "123.123.123.123"}],
|
|
CONF_DISCOVERY: False,
|
|
}
|
|
}
|
|
|
|
light = SmartBulb("123.123.123.123")
|
|
switch = SmartPlug("321.321.321.321")
|
|
switch.get_sysinfo = MagicMock(return_value=SMARTPLUG_HS110_DATA["sysinfo"])
|
|
switch.get_emeter_realtime = MagicMock(
|
|
return_value=EmeterStatus(SMARTPLUG_HS110_DATA["realtime"])
|
|
)
|
|
if platform == "light":
|
|
get_static_devices.return_value = SmartDevices([light], [])
|
|
elif platform == "switch":
|
|
get_static_devices.return_value = SmartDevices([], [switch])
|
|
|
|
assert await async_setup_component(hass, tplink.DOMAIN, config)
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(async_setup_entry.mock_calls) == 1
|
|
assert tplink.DOMAIN in hass.data
|
|
|
|
assert await tplink.async_unload_entry(hass, entry)
|
|
assert not hass.data[tplink.DOMAIN]
|