Add upnp binary sensor for connectivity status (#54489)

* New binary sensor for connectivity

* Add binary_sensor

* New binary sensor for connectivity

* Add binary_sensor

* Handle values returned as None

* Small text update for Uptime

* Update homeassistant/components/upnp/binary_sensor.py

Co-authored-by: Joakim Sørensen <hi@ludeeus.dev>

* Update homeassistant/components/upnp/binary_sensor.py

Co-authored-by: Joakim Sørensen <hi@ludeeus.dev>

* Update homeassistant/components/upnp/binary_sensor.py

Co-authored-by: Joakim Sørensen <hi@ludeeus.dev>

* Update homeassistant/components/upnp/binary_sensor.py

Co-authored-by: Joakim Sørensen <hi@ludeeus.dev>

* Update homeassistant/components/upnp/binary_sensor.py

Co-authored-by: Joakim Sørensen <hi@ludeeus.dev>

* Update homeassistant/components/upnp/binary_sensor.py

Co-authored-by: Joakim Sørensen <hi@ludeeus.dev>

* Update homeassistant/components/upnp/binary_sensor.py

Co-authored-by: Joakim Sørensen <hi@ludeeus.dev>

* Update homeassistant/components/upnp/binary_sensor.py

Co-authored-by: Joakim Sørensen <hi@ludeeus.dev>

* Updates based on review

* Update homeassistant/components/upnp/binary_sensor.py

Co-authored-by: Joakim Sørensen <hi@ludeeus.dev>

* Further updates based on review

* Set device_class as a class atribute

* Create 1 combined data coordinator
and UpnpEntity class

* Updates on coordinator

* Update comment

* Fix in async_step_init for coordinator

* Add async_get_status to mocked device
and set times polled for each call seperately

* Updated to get device through coordinator
Check polling for each status call seperately

* Use collections.abc instead of Typing for Mapping

* Remove adding device to hass.data as coordinator
is now saved

* Removed setting _coordinator

* Added myself as codeowner

* Update type in __init__

* Removed attributes from binary sensor

* Fix async_unload_entry

* Add expected return value to is_on

Co-authored-by: Joakim Sørensen <hi@ludeeus.dev>
This commit is contained in:
ehendrix23 2021-08-17 12:23:41 -06:00 committed by GitHub
parent 5b75c8254b
commit 8bf79d61ee
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 222 additions and 130 deletions

View file

@ -545,7 +545,7 @@ homeassistant/components/upb/* @gwww
homeassistant/components/upc_connect/* @pvizeli @fabaff
homeassistant/components/upcloud/* @scop
homeassistant/components/updater/* @home-assistant/core
homeassistant/components/upnp/* @StevenLooman
homeassistant/components/upnp/* @StevenLooman @ehendrix23
homeassistant/components/uptimerobot/* @ludeeus
homeassistant/components/usgs_earthquakes_feed/* @exxamalte
homeassistant/components/utility_meter/* @dgomes

View file

@ -3,6 +3,7 @@ from __future__ import annotations
import asyncio
from collections.abc import Mapping
from datetime import timedelta
from ipaddress import ip_address
from typing import Any
@ -17,24 +18,30 @@ from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import config_validation as cv, device_registry as dr
from homeassistant.helpers.typing import ConfigType
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
)
from .const import (
CONF_LOCAL_IP,
CONFIG_ENTRY_HOSTNAME,
CONFIG_ENTRY_SCAN_INTERVAL,
CONFIG_ENTRY_ST,
CONFIG_ENTRY_UDN,
DEFAULT_SCAN_INTERVAL,
DOMAIN,
DOMAIN_CONFIG,
DOMAIN_DEVICES,
DOMAIN_LOCAL_IP,
LOGGER as _LOGGER,
LOGGER,
)
from .device import Device
NOTIFICATION_ID = "upnp_notification"
NOTIFICATION_TITLE = "UPnP/IGD Setup"
PLATFORMS = ["sensor"]
PLATFORMS = ["binary_sensor", "sensor"]
CONFIG_SCHEMA = vol.Schema(
{
@ -50,7 +57,7 @@ CONFIG_SCHEMA = vol.Schema(
async def async_setup(hass: HomeAssistant, config: ConfigType):
"""Set up UPnP component."""
_LOGGER.debug("async_setup, config: %s", config)
LOGGER.debug("async_setup, config: %s", config)
conf_default = CONFIG_SCHEMA({DOMAIN: {}})[DOMAIN]
conf = config.get(DOMAIN, conf_default)
local_ip = await async_get_source_ip(hass, PUBLIC_TARGET_IP)
@ -73,7 +80,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType):
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up UPnP/IGD device from a config entry."""
_LOGGER.debug("Setting up config entry: %s", entry.unique_id)
LOGGER.debug("Setting up config entry: %s", entry.unique_id)
udn = entry.data[CONFIG_ENTRY_UDN]
st = entry.data[CONFIG_ENTRY_ST] # pylint: disable=invalid-name
@ -86,7 +93,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
@callback
def device_discovered(info: Mapping[str, Any]) -> None:
nonlocal discovery_info
_LOGGER.debug(
LOGGER.debug(
"Device discovered: %s, at: %s", usn, info[ssdp.ATTR_SSDP_LOCATION]
)
discovery_info = info
@ -103,7 +110,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
try:
await asyncio.wait_for(device_discovered_event.wait(), timeout=10)
except asyncio.TimeoutError as err:
_LOGGER.debug("Device not discovered: %s", usn)
LOGGER.debug("Device not discovered: %s", usn)
raise ConfigEntryNotReady from err
finally:
cancel_discovered_callback()
@ -114,12 +121,9 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
]
device = await Device.async_create_device(hass, location)
# Save device.
hass.data[DOMAIN][DOMAIN_DEVICES][udn] = device
# Ensure entry has a unique_id.
if not entry.unique_id:
_LOGGER.debug(
LOGGER.debug(
"Setting unique_id: %s, for config_entry: %s",
device.unique_id,
entry,
@ -150,8 +154,24 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
model=device.model_name,
)
update_interval_sec = entry.options.get(
CONFIG_ENTRY_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
)
update_interval = timedelta(seconds=update_interval_sec)
LOGGER.debug("update_interval: %s", update_interval)
coordinator = UpnpDataUpdateCoordinator(
hass,
device=device,
update_interval=update_interval,
)
# Save coordinator.
hass.data[DOMAIN][entry.entry_id] = coordinator
await coordinator.async_config_entry_first_refresh()
# Create sensors.
_LOGGER.debug("Enabling sensors")
LOGGER.debug("Enabling sensors")
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
# Start device updater.
@ -162,14 +182,53 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Unload a UPnP/IGD device from a config entry."""
_LOGGER.debug("Unloading config entry: %s", config_entry.unique_id)
LOGGER.debug("Unloading config entry: %s", config_entry.unique_id)
udn = config_entry.data.get(CONFIG_ENTRY_UDN)
if udn in hass.data[DOMAIN][DOMAIN_DEVICES]:
device = hass.data[DOMAIN][DOMAIN_DEVICES][udn]
await device.async_stop()
if coordinator := hass.data[DOMAIN].pop(config_entry.entry_id, None):
await coordinator.device.async_stop()
del hass.data[DOMAIN][DOMAIN_DEVICES][udn]
_LOGGER.debug("Deleting sensors")
LOGGER.debug("Deleting sensors")
return await hass.config_entries.async_unload_platforms(config_entry, PLATFORMS)
class UpnpDataUpdateCoordinator(DataUpdateCoordinator):
"""Define an object to update data from UPNP device."""
def __init__(
self, hass: HomeAssistant, device: Device, update_interval: timedelta
) -> None:
"""Initialize."""
self.device = device
super().__init__(
hass, LOGGER, name=device.name, update_interval=update_interval
)
async def _async_update_data(self) -> Mapping[str, Any]:
"""Update data."""
update_values = await asyncio.gather(
self.device.async_get_traffic_data(),
self.device.async_get_status(),
)
data = dict(update_values[0])
data.update(update_values[1])
return data
class UpnpEntity(CoordinatorEntity):
"""Base class for UPnP/IGD entities."""
coordinator: UpnpDataUpdateCoordinator
def __init__(self, coordinator: UpnpDataUpdateCoordinator) -> None:
"""Initialize the base entities."""
super().__init__(coordinator)
self._device = coordinator.device
self._attr_device_info = {
"connections": {(dr.CONNECTION_UPNP, coordinator.device.udn)},
"name": coordinator.device.name,
"manufacturer": coordinator.device.manufacturer,
"model": coordinator.device.model_name,
}

View file

@ -0,0 +1,54 @@
"""Support for UPnP/IGD Binary Sensors."""
from __future__ import annotations
from homeassistant.components.binary_sensor import (
DEVICE_CLASS_CONNECTIVITY,
BinarySensorEntity,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import UpnpDataUpdateCoordinator, UpnpEntity
from .const import DOMAIN, LOGGER, WANSTATUS
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the UPnP/IGD sensors."""
coordinator = hass.data[DOMAIN][config_entry.entry_id]
LOGGER.debug("Adding binary sensor")
sensors = [
UpnpStatusBinarySensor(coordinator),
]
async_add_entities(sensors)
class UpnpStatusBinarySensor(UpnpEntity, BinarySensorEntity):
"""Class for UPnP/IGD binary sensors."""
_attr_device_class = DEVICE_CLASS_CONNECTIVITY
def __init__(
self,
coordinator: UpnpDataUpdateCoordinator,
) -> None:
"""Initialize the base sensor."""
super().__init__(coordinator)
self._attr_name = f"{coordinator.device.name} wan status"
self._attr_unique_id = f"{coordinator.device.udn}_wanstatus"
@property
def available(self) -> bool:
"""Return if entity is available."""
return super().available and self.coordinator.data.get(WANSTATUS)
@property
def is_on(self) -> bool:
"""Return true if the binary sensor is on."""
return self.coordinator.data[WANSTATUS] == "Connected"

View file

@ -20,8 +20,7 @@ from .const import (
CONFIG_ENTRY_UDN,
DEFAULT_SCAN_INTERVAL,
DOMAIN,
DOMAIN_DEVICES,
LOGGER as _LOGGER,
LOGGER,
SSDP_SEARCH_TIMEOUT,
ST_IGD_V1,
ST_IGD_V2,
@ -43,7 +42,7 @@ async def _async_wait_for_discoveries(hass: HomeAssistant) -> bool:
@callback
def device_discovered(info: Mapping[str, Any]) -> None:
_LOGGER.info(
LOGGER.info(
"Device discovered: %s, at: %s",
info[ssdp.ATTR_SSDP_USN],
info[ssdp.ATTR_SSDP_LOCATION],
@ -103,7 +102,7 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
self, user_input: Mapping | None = None
) -> Mapping[str, Any]:
"""Handle a flow start."""
_LOGGER.debug("async_step_user: user_input: %s", user_input)
LOGGER.debug("async_step_user: user_input: %s", user_input)
if user_input is not None:
# Ensure wanted device was discovered.
@ -162,12 +161,12 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
configured before, find any device and create a config_entry for it.
Otherwise, do nothing.
"""
_LOGGER.debug("async_step_import: import_info: %s", import_info)
LOGGER.debug("async_step_import: import_info: %s", import_info)
# Landed here via configuration.yaml entry.
# Any device already added, then abort.
if self._async_current_entries():
_LOGGER.debug("Already configured, aborting")
LOGGER.debug("Already configured, aborting")
return self.async_abort(reason="already_configured")
# Discover devices.
@ -176,7 +175,7 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
# Ensure anything to add. If not, silently abort.
if not discoveries:
_LOGGER.info("No UPnP devices discovered, aborting")
LOGGER.info("No UPnP devices discovered, aborting")
return self.async_abort(reason="no_devices_found")
# Ensure complete discovery.
@ -187,7 +186,7 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
or ssdp.ATTR_SSDP_LOCATION not in discovery
or ssdp.ATTR_SSDP_USN not in discovery
):
_LOGGER.debug("Incomplete discovery, ignoring")
LOGGER.debug("Incomplete discovery, ignoring")
return self.async_abort(reason="incomplete_discovery")
# Ensure not already configuring/configured.
@ -202,7 +201,7 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
This flow is triggered by the SSDP component. It will check if the
host is already configured and delegate to the import step if not.
"""
_LOGGER.debug("async_step_ssdp: discovery_info: %s", discovery_info)
LOGGER.debug("async_step_ssdp: discovery_info: %s", discovery_info)
# Ensure complete discovery.
if (
@ -211,7 +210,7 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
or ssdp.ATTR_SSDP_LOCATION not in discovery_info
or ssdp.ATTR_SSDP_USN not in discovery_info
):
_LOGGER.debug("Incomplete discovery, ignoring")
LOGGER.debug("Incomplete discovery, ignoring")
return self.async_abort(reason="incomplete_discovery")
# Ensure not already configuring/configured.
@ -225,7 +224,7 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
for config_entry in existing_entries:
entry_hostname = config_entry.data.get(CONFIG_ENTRY_HOSTNAME)
if entry_hostname == hostname:
_LOGGER.debug(
LOGGER.debug(
"Found existing config_entry with same hostname, discovery ignored"
)
return self.async_abort(reason="discovery_ignored")
@ -244,7 +243,7 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
self, user_input: Mapping | None = None
) -> Mapping[str, Any]:
"""Confirm integration via SSDP."""
_LOGGER.debug("async_step_ssdp_confirm: user_input: %s", user_input)
LOGGER.debug("async_step_ssdp_confirm: user_input: %s", user_input)
if user_input is None:
return self.async_show_form(step_id="ssdp_confirm")
@ -264,7 +263,7 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
discovery: Mapping,
) -> Mapping[str, Any]:
"""Create an entry from discovery."""
_LOGGER.debug(
LOGGER.debug(
"_async_create_entry_from_discovery: discovery: %s",
discovery,
)
@ -288,13 +287,12 @@ class UpnpOptionsFlowHandler(config_entries.OptionsFlow):
async def async_step_init(self, user_input: Mapping = None) -> None:
"""Manage the options."""
if user_input is not None:
udn = self.config_entry.data[CONFIG_ENTRY_UDN]
coordinator = self.hass.data[DOMAIN][DOMAIN_DEVICES][udn].coordinator
coordinator = self.hass.data[DOMAIN][self.config_entry.entry_id]
update_interval_sec = user_input.get(
CONFIG_ENTRY_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
)
update_interval = timedelta(seconds=update_interval_sec)
_LOGGER.debug("Updating coordinator, update_interval: %s", update_interval)
LOGGER.debug("Updating coordinator, update_interval: %s", update_interval)
coordinator.update_interval = update_interval
return self.async_create_entry(title="", data=user_input)

View file

@ -18,6 +18,9 @@ PACKETS_SENT = "packets_sent"
TIMESTAMP = "timestamp"
DATA_PACKETS = "packets"
DATA_RATE_PACKETS_PER_SECOND = f"{DATA_PACKETS}/{TIME_SECONDS}"
WANSTATUS = "wan_status"
WANIP = "wan_ip"
UPTIME = "uptime"
KIBIBYTE = 1024
UPDATE_INTERVAL = timedelta(seconds=30)
CONFIG_ENTRY_SCAN_INTERVAL = "scan_interval"

View file

@ -27,6 +27,9 @@ from .const import (
PACKETS_RECEIVED,
PACKETS_SENT,
TIMESTAMP,
UPTIME,
WANIP,
WANSTATUS,
)
@ -154,3 +157,18 @@ class Device:
PACKETS_RECEIVED: values[2],
PACKETS_SENT: values[3],
}
async def async_get_status(self) -> Mapping[str, Any]:
"""Get connection status, uptime, and external IP."""
_LOGGER.debug("Getting status for device: %s", self)
values = await asyncio.gather(
self._igd_device.async_get_status_info(),
self._igd_device.async_get_external_ip_address(),
)
return {
WANSTATUS: values[0][0] if values[0] is not None else None,
UPTIME: values[0][2] if values[0] is not None else None,
WANIP: values[1],
}

View file

@ -5,7 +5,7 @@
"documentation": "https://www.home-assistant.io/integrations/upnp",
"requirements": ["async-upnp-client==0.19.2"],
"dependencies": ["network", "ssdp"],
"codeowners": ["@StevenLooman"],
"codeowners": ["@StevenLooman","@ehendrix23"],
"ssdp": [
{
"st": "urn:schemas-upnp-org:device:InternetGatewayDevice:1"

View file

@ -1,38 +1,25 @@
"""Support for UPnP/IGD Sensors."""
from __future__ import annotations
from datetime import timedelta
from typing import Any, Mapping
from homeassistant.components.sensor import SensorEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import DATA_BYTES, DATA_RATE_KIBIBYTES_PER_SECOND
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
)
from . import UpnpDataUpdateCoordinator, UpnpEntity
from .const import (
BYTES_RECEIVED,
BYTES_SENT,
CONFIG_ENTRY_SCAN_INTERVAL,
CONFIG_ENTRY_UDN,
DATA_PACKETS,
DATA_RATE_PACKETS_PER_SECOND,
DEFAULT_SCAN_INTERVAL,
DOMAIN,
DOMAIN_DEVICES,
KIBIBYTE,
LOGGER as _LOGGER,
LOGGER,
PACKETS_RECEIVED,
PACKETS_SENT,
TIMESTAMP,
)
from .device import Device
SENSOR_TYPES = {
BYTES_RECEIVED: {
@ -78,7 +65,7 @@ async def async_setup_platform(
hass: HomeAssistant, config, async_add_entities, discovery_info=None
) -> None:
"""Old way of setting up UPnP/IGD sensors."""
_LOGGER.debug(
LOGGER.debug(
"async_setup_platform: config: %s, discovery: %s", config, discovery_info
)
@ -89,52 +76,36 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the UPnP/IGD sensors."""
udn = config_entry.data[CONFIG_ENTRY_UDN]
device: Device = hass.data[DOMAIN][DOMAIN_DEVICES][udn]
coordinator = hass.data[DOMAIN][config_entry.entry_id]
update_interval_sec = config_entry.options.get(
CONFIG_ENTRY_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
)
update_interval = timedelta(seconds=update_interval_sec)
_LOGGER.debug("update_interval: %s", update_interval)
_LOGGER.debug("Adding sensors")
coordinator = DataUpdateCoordinator[Mapping[str, Any]](
hass,
_LOGGER,
name=device.name,
update_method=device.async_get_traffic_data,
update_interval=update_interval,
)
device.coordinator = coordinator
await coordinator.async_refresh()
LOGGER.debug("Adding sensors")
sensors = [
RawUpnpSensor(coordinator, device, SENSOR_TYPES[BYTES_RECEIVED]),
RawUpnpSensor(coordinator, device, SENSOR_TYPES[BYTES_SENT]),
RawUpnpSensor(coordinator, device, SENSOR_TYPES[PACKETS_RECEIVED]),
RawUpnpSensor(coordinator, device, SENSOR_TYPES[PACKETS_SENT]),
DerivedUpnpSensor(coordinator, device, SENSOR_TYPES[BYTES_RECEIVED]),
DerivedUpnpSensor(coordinator, device, SENSOR_TYPES[BYTES_SENT]),
DerivedUpnpSensor(coordinator, device, SENSOR_TYPES[PACKETS_RECEIVED]),
DerivedUpnpSensor(coordinator, device, SENSOR_TYPES[PACKETS_SENT]),
RawUpnpSensor(coordinator, SENSOR_TYPES[BYTES_RECEIVED]),
RawUpnpSensor(coordinator, SENSOR_TYPES[BYTES_SENT]),
RawUpnpSensor(coordinator, SENSOR_TYPES[PACKETS_RECEIVED]),
RawUpnpSensor(coordinator, SENSOR_TYPES[PACKETS_SENT]),
DerivedUpnpSensor(coordinator, SENSOR_TYPES[BYTES_RECEIVED]),
DerivedUpnpSensor(coordinator, SENSOR_TYPES[BYTES_SENT]),
DerivedUpnpSensor(coordinator, SENSOR_TYPES[PACKETS_RECEIVED]),
DerivedUpnpSensor(coordinator, SENSOR_TYPES[PACKETS_SENT]),
]
async_add_entities(sensors, True)
async_add_entities(sensors)
class UpnpSensor(CoordinatorEntity, SensorEntity):
class UpnpSensor(UpnpEntity, SensorEntity):
"""Base class for UPnP/IGD sensors."""
def __init__(
self,
coordinator: DataUpdateCoordinator[Mapping[str, Any]],
device: Device,
sensor_type: Mapping[str, str],
coordinator: UpnpDataUpdateCoordinator,
sensor_type: dict[str, str],
) -> None:
"""Initialize the base sensor."""
super().__init__(coordinator)
self._device = device
self._sensor_type = sensor_type
self._attr_name = f"{coordinator.device.name} {sensor_type['name']}"
self._attr_unique_id = f"{coordinator.device.udn}_{sensor_type['unique_id']}"
@property
def icon(self) -> str:
@ -144,37 +115,15 @@ class UpnpSensor(CoordinatorEntity, SensorEntity):
@property
def available(self) -> bool:
"""Return if entity is available."""
device_value_key = self._sensor_type["device_value_key"]
return (
self.coordinator.last_update_success
and device_value_key in self.coordinator.data
return super().available and self.coordinator.data.get(
self._sensor_type["device_value_key"]
)
@property
def name(self) -> str:
"""Return the name of the sensor."""
return f"{self._device.name} {self._sensor_type['name']}"
@property
def unique_id(self) -> str:
"""Return an unique ID."""
return f"{self._device.udn}_{self._sensor_type['unique_id']}"
@property
def native_unit_of_measurement(self) -> str:
"""Return the unit of measurement of this entity, if any."""
return self._sensor_type["unit"]
@property
def device_info(self) -> DeviceInfo:
"""Get device info."""
return {
"connections": {(dr.CONNECTION_UPNP, self._device.udn)},
"name": self._device.name,
"manufacturer": self._device.manufacturer,
"model": self._device.model_name,
}
class RawUpnpSensor(UpnpSensor):
"""Representation of a UPnP/IGD sensor."""
@ -192,21 +141,15 @@ class RawUpnpSensor(UpnpSensor):
class DerivedUpnpSensor(UpnpSensor):
"""Representation of a UNIT Sent/Received per second sensor."""
def __init__(self, coordinator, device, sensor_type) -> None:
def __init__(self, coordinator: UpnpDataUpdateCoordinator, sensor_type) -> None:
"""Initialize sensor."""
super().__init__(coordinator, device, sensor_type)
super().__init__(coordinator, sensor_type)
self._last_value = None
self._last_timestamp = None
@property
def name(self) -> str:
"""Return the name of the sensor."""
return f"{self._device.name} {self._sensor_type['derived_name']}"
@property
def unique_id(self) -> str:
"""Return an unique ID."""
return f"{self._device.udn}_{self._sensor_type['derived_unique_id']}"
self._attr_name = f"{coordinator.device.name} {sensor_type['derived_name']}"
self._attr_unique_id = (
f"{coordinator.device.udn}_{sensor_type['derived_unique_id']}"
)
@property
def native_unit_of_measurement(self) -> str:

View file

@ -11,6 +11,9 @@ from homeassistant.components.upnp.const import (
PACKETS_RECEIVED,
PACKETS_SENT,
TIMESTAMP,
UPTIME,
WANIP,
WANSTATUS,
)
from homeassistant.components.upnp.device import Device
from homeassistant.util import dt
@ -27,7 +30,8 @@ class MockDevice(Device):
mock_device_updater = AsyncMock()
super().__init__(igd_device, mock_device_updater)
self._udn = udn
self.times_polled = 0
self.traffic_times_polled = 0
self.status_times_polled = 0
@classmethod
async def async_create_device(cls, hass, ssdp_location) -> "MockDevice":
@ -66,7 +70,7 @@ class MockDevice(Device):
async def async_get_traffic_data(self) -> Mapping[str, Any]:
"""Get traffic data."""
self.times_polled += 1
self.traffic_times_polled += 1
return {
TIMESTAMP: dt.utcnow(),
BYTES_RECEIVED: 0,
@ -75,6 +79,15 @@ class MockDevice(Device):
PACKETS_SENT: 0,
}
async def async_get_status(self) -> Mapping[str, Any]:
"""Get connection status, uptime, and external IP."""
self.status_times_polled += 1
return {
WANSTATUS: "Connected",
UPTIME: 0,
WANIP: "192.168.0.1",
}
async def async_start(self) -> None:
"""Start the device updater."""

View file

@ -14,7 +14,6 @@ from homeassistant.components.upnp.const import (
CONFIG_ENTRY_UDN,
DEFAULT_SCAN_INTERVAL,
DOMAIN,
DOMAIN_DEVICES,
)
from homeassistant.core import CoreState, HomeAssistant
from homeassistant.setup import async_setup_component
@ -238,15 +237,17 @@ async def test_options_flow(hass: HomeAssistant):
config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id) is True
await hass.async_block_till_done()
mock_device = hass.data[DOMAIN][DOMAIN_DEVICES][TEST_UDN]
mock_device = hass.data[DOMAIN][config_entry.entry_id].device
# Reset.
mock_device.times_polled = 0
mock_device.traffic_times_polled = 0
mock_device.status_times_polled = 0
# Forward time, ensure single poll after 30 (default) seconds.
async_fire_time_changed(hass, dt.utcnow() + timedelta(seconds=31))
await hass.async_block_till_done()
assert mock_device.times_polled == 1
assert mock_device.traffic_times_polled == 1
assert mock_device.status_times_polled == 1
# Options flow with no input results in form.
result = await hass.config_entries.options.async_init(
@ -267,15 +268,18 @@ async def test_options_flow(hass: HomeAssistant):
# Forward time, ensure single poll after 60 seconds, still from original setting.
async_fire_time_changed(hass, dt.utcnow() + timedelta(seconds=61))
await hass.async_block_till_done()
assert mock_device.times_polled == 2
assert mock_device.traffic_times_polled == 2
assert mock_device.status_times_polled == 2
# Now the updated interval takes effect.
# Forward time, ensure single poll after 120 seconds.
async_fire_time_changed(hass, dt.utcnow() + timedelta(seconds=121))
await hass.async_block_till_done()
assert mock_device.times_polled == 3
assert mock_device.traffic_times_polled == 3
assert mock_device.status_times_polled == 3
# Forward time, ensure single poll after 180 seconds.
async_fire_time_changed(hass, dt.utcnow() + timedelta(seconds=181))
await hass.async_block_till_done()
assert mock_device.times_polled == 4
assert mock_device.traffic_times_polled == 4
assert mock_device.status_times_polled == 4