Move AsusWrt sensors update logic in router module (#46606)
This commit is contained in:
parent
a12b98e30e
commit
292f4262aa
6 changed files with 346 additions and 252 deletions
|
@ -31,7 +31,6 @@ from .const import (
|
|||
MODE_ROUTER,
|
||||
PROTOCOL_SSH,
|
||||
PROTOCOL_TELNET,
|
||||
SENSOR_TYPES,
|
||||
)
|
||||
from .router import AsusWrtRouter
|
||||
|
||||
|
@ -39,6 +38,7 @@ PLATFORMS = ["device_tracker", "sensor"]
|
|||
|
||||
CONF_PUB_KEY = "pub_key"
|
||||
SECRET_GROUP = "Password or SSH Key"
|
||||
SENSOR_TYPES = ["devices", "upload_speed", "download_speed", "download", "upload"]
|
||||
|
||||
CONFIG_SCHEMA = vol.Schema(
|
||||
vol.All(
|
||||
|
|
|
@ -21,7 +21,6 @@ from homeassistant.const import (
|
|||
from homeassistant.core import callback
|
||||
from homeassistant.helpers import config_validation as cv
|
||||
|
||||
# pylint:disable=unused-import
|
||||
from .const import (
|
||||
CONF_DNSMASQ,
|
||||
CONF_INTERFACE,
|
||||
|
@ -32,12 +31,12 @@ from .const import (
|
|||
DEFAULT_INTERFACE,
|
||||
DEFAULT_SSH_PORT,
|
||||
DEFAULT_TRACK_UNKNOWN,
|
||||
DOMAIN,
|
||||
MODE_AP,
|
||||
MODE_ROUTER,
|
||||
PROTOCOL_SSH,
|
||||
PROTOCOL_TELNET,
|
||||
)
|
||||
from .const import DOMAIN # pylint:disable=unused-import
|
||||
from .router import get_api
|
||||
|
||||
RESULT_CONN_ERROR = "cannot_connect"
|
||||
|
|
|
@ -20,5 +20,9 @@ MODE_ROUTER = "router"
|
|||
PROTOCOL_SSH = "ssh"
|
||||
PROTOCOL_TELNET = "telnet"
|
||||
|
||||
# Sensor
|
||||
SENSOR_TYPES = ["devices", "upload_speed", "download_speed", "download", "upload"]
|
||||
# Sensors
|
||||
SENSOR_CONNECTED_DEVICE = "sensor_connected_device"
|
||||
SENSOR_RX_BYTES = "sensor_rx_bytes"
|
||||
SENSOR_TX_BYTES = "sensor_tx_bytes"
|
||||
SENSOR_RX_RATES = "sensor_rx_rates"
|
||||
SENSOR_TX_RATES = "sensor_tx_rates"
|
||||
|
|
|
@ -24,6 +24,7 @@ from homeassistant.exceptions import ConfigEntryNotReady
|
|||
from homeassistant.helpers.dispatcher import async_dispatcher_send
|
||||
from homeassistant.helpers.event import async_track_time_interval
|
||||
from homeassistant.helpers.typing import HomeAssistantType
|
||||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
||||
from homeassistant.util import dt as dt_util
|
||||
|
||||
from .const import (
|
||||
|
@ -37,14 +38,97 @@ from .const import (
|
|||
DEFAULT_TRACK_UNKNOWN,
|
||||
DOMAIN,
|
||||
PROTOCOL_TELNET,
|
||||
SENSOR_CONNECTED_DEVICE,
|
||||
SENSOR_RX_BYTES,
|
||||
SENSOR_RX_RATES,
|
||||
SENSOR_TX_BYTES,
|
||||
SENSOR_TX_RATES,
|
||||
)
|
||||
|
||||
CONF_REQ_RELOAD = [CONF_DNSMASQ, CONF_INTERFACE, CONF_REQUIRE_IP]
|
||||
|
||||
KEY_COORDINATOR = "coordinator"
|
||||
KEY_SENSORS = "sensors"
|
||||
|
||||
SCAN_INTERVAL = timedelta(seconds=30)
|
||||
|
||||
SENSORS_TYPE_BYTES = "sensors_bytes"
|
||||
SENSORS_TYPE_COUNT = "sensors_count"
|
||||
SENSORS_TYPE_RATES = "sensors_rates"
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AsusWrtSensorDataHandler:
|
||||
"""Data handler for AsusWrt sensor."""
|
||||
|
||||
def __init__(self, hass, api):
|
||||
"""Initialize a AsusWrt sensor data handler."""
|
||||
self._hass = hass
|
||||
self._api = api
|
||||
self._connected_devices = 0
|
||||
|
||||
async def _get_connected_devices(self):
|
||||
"""Return number of connected devices."""
|
||||
return {SENSOR_CONNECTED_DEVICE: self._connected_devices}
|
||||
|
||||
async def _get_bytes(self):
|
||||
"""Fetch byte information from the router."""
|
||||
ret_dict: Dict[str, Any] = {}
|
||||
try:
|
||||
datas = await self._api.async_get_bytes_total()
|
||||
except OSError as exc:
|
||||
raise UpdateFailed from exc
|
||||
|
||||
ret_dict[SENSOR_RX_BYTES] = datas[0]
|
||||
ret_dict[SENSOR_TX_BYTES] = datas[1]
|
||||
|
||||
return ret_dict
|
||||
|
||||
async def _get_rates(self):
|
||||
"""Fetch rates information from the router."""
|
||||
ret_dict: Dict[str, Any] = {}
|
||||
try:
|
||||
rates = await self._api.async_get_current_transfer_rates()
|
||||
except OSError as exc:
|
||||
raise UpdateFailed from exc
|
||||
|
||||
ret_dict[SENSOR_RX_RATES] = rates[0]
|
||||
ret_dict[SENSOR_TX_RATES] = rates[1]
|
||||
|
||||
return ret_dict
|
||||
|
||||
def update_device_count(self, conn_devices: int):
|
||||
"""Update connected devices attribute."""
|
||||
if self._connected_devices == conn_devices:
|
||||
return False
|
||||
self._connected_devices = conn_devices
|
||||
return True
|
||||
|
||||
async def get_coordinator(self, sensor_type: str, should_poll=True):
|
||||
"""Get the coordinator for a specific sensor type."""
|
||||
if sensor_type == SENSORS_TYPE_COUNT:
|
||||
method = self._get_connected_devices
|
||||
elif sensor_type == SENSORS_TYPE_BYTES:
|
||||
method = self._get_bytes
|
||||
elif sensor_type == SENSORS_TYPE_RATES:
|
||||
method = self._get_rates
|
||||
else:
|
||||
raise RuntimeError(f"Invalid sensor type: {sensor_type}")
|
||||
|
||||
coordinator = DataUpdateCoordinator(
|
||||
self._hass,
|
||||
_LOGGER,
|
||||
name=sensor_type,
|
||||
update_method=method,
|
||||
# Polling interval. Will only be polled if there are subscribers.
|
||||
update_interval=SCAN_INTERVAL if should_poll else None,
|
||||
)
|
||||
await coordinator.async_refresh()
|
||||
|
||||
return coordinator
|
||||
|
||||
|
||||
class AsusWrtDevInfo:
|
||||
"""Representation of a AsusWrt device info."""
|
||||
|
||||
|
@ -111,8 +195,12 @@ class AsusWrtRouter:
|
|||
self._host = entry.data[CONF_HOST]
|
||||
|
||||
self._devices: Dict[str, Any] = {}
|
||||
self._connected_devices = 0
|
||||
self._connect_error = False
|
||||
|
||||
self._sensors_data_handler: AsusWrtSensorDataHandler = None
|
||||
self._sensors_coordinator: Dict[str, Any] = {}
|
||||
|
||||
self._on_close = []
|
||||
|
||||
self._options = {
|
||||
|
@ -150,6 +238,9 @@ class AsusWrtRouter:
|
|||
# Update devices
|
||||
await self.update_devices()
|
||||
|
||||
# Init Sensors
|
||||
await self.init_sensors_coordinator()
|
||||
|
||||
self.async_on_close(
|
||||
async_track_time_interval(self.hass, self.update_all, SCAN_INTERVAL)
|
||||
)
|
||||
|
@ -201,6 +292,51 @@ class AsusWrtRouter:
|
|||
if new_device:
|
||||
async_dispatcher_send(self.hass, self.signal_device_new)
|
||||
|
||||
self._connected_devices = len(wrt_devices)
|
||||
await self._update_unpolled_sensors()
|
||||
|
||||
async def init_sensors_coordinator(self) -> None:
|
||||
"""Init AsusWrt sensors coordinators."""
|
||||
if self._sensors_data_handler:
|
||||
return
|
||||
|
||||
self._sensors_data_handler = AsusWrtSensorDataHandler(self.hass, self._api)
|
||||
self._sensors_data_handler.update_device_count(self._connected_devices)
|
||||
|
||||
conn_dev_coordinator = await self._sensors_data_handler.get_coordinator(
|
||||
SENSORS_TYPE_COUNT, False
|
||||
)
|
||||
self._sensors_coordinator[SENSORS_TYPE_COUNT] = {
|
||||
KEY_COORDINATOR: conn_dev_coordinator,
|
||||
KEY_SENSORS: [SENSOR_CONNECTED_DEVICE],
|
||||
}
|
||||
|
||||
bytes_coordinator = await self._sensors_data_handler.get_coordinator(
|
||||
SENSORS_TYPE_BYTES
|
||||
)
|
||||
self._sensors_coordinator[SENSORS_TYPE_BYTES] = {
|
||||
KEY_COORDINATOR: bytes_coordinator,
|
||||
KEY_SENSORS: [SENSOR_RX_BYTES, SENSOR_TX_BYTES],
|
||||
}
|
||||
|
||||
rates_coordinator = await self._sensors_data_handler.get_coordinator(
|
||||
SENSORS_TYPE_RATES
|
||||
)
|
||||
self._sensors_coordinator[SENSORS_TYPE_RATES] = {
|
||||
KEY_COORDINATOR: rates_coordinator,
|
||||
KEY_SENSORS: [SENSOR_RX_RATES, SENSOR_TX_RATES],
|
||||
}
|
||||
|
||||
async def _update_unpolled_sensors(self) -> None:
|
||||
"""Request refresh for AsusWrt unpolled sensors."""
|
||||
if not self._sensors_data_handler:
|
||||
return
|
||||
|
||||
if SENSORS_TYPE_COUNT in self._sensors_coordinator:
|
||||
coordinator = self._sensors_coordinator[SENSORS_TYPE_COUNT][KEY_COORDINATOR]
|
||||
if self._sensors_data_handler.update_device_count(self._connected_devices):
|
||||
await coordinator.async_refresh()
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Close the connection."""
|
||||
if self._api is not None:
|
||||
|
@ -230,6 +366,16 @@ class AsusWrtRouter:
|
|||
self._options.update(new_options)
|
||||
return req_reload
|
||||
|
||||
@property
|
||||
def device_info(self) -> Dict[str, Any]:
|
||||
"""Return the device information."""
|
||||
return {
|
||||
"identifiers": {(DOMAIN, "AsusWRT")},
|
||||
"name": self._host,
|
||||
"model": "Asus Router",
|
||||
"manufacturer": "Asus",
|
||||
}
|
||||
|
||||
@property
|
||||
def signal_device_new(self) -> str:
|
||||
"""Event specific per AsusWrt entry to signal new device."""
|
||||
|
@ -250,6 +396,11 @@ class AsusWrtRouter:
|
|||
"""Return devices."""
|
||||
return self._devices
|
||||
|
||||
@property
|
||||
def sensors_coordinator(self) -> Dict[str, Any]:
|
||||
"""Return sensors coordinators."""
|
||||
return self._sensors_coordinator
|
||||
|
||||
@property
|
||||
def api(self) -> AsusWrt:
|
||||
"""Return router API."""
|
||||
|
|
|
@ -1,236 +1,169 @@
|
|||
"""Asuswrt status sensors."""
|
||||
from datetime import timedelta
|
||||
import enum
|
||||
import logging
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from aioasuswrt.asuswrt import AsusWrt
|
||||
from numbers import Number
|
||||
from typing import Dict
|
||||
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import CONF_NAME, DATA_GIGABYTES, DATA_RATE_MEGABITS_PER_SECOND
|
||||
from homeassistant.const import DATA_GIGABYTES, DATA_RATE_MEGABITS_PER_SECOND
|
||||
from homeassistant.helpers.typing import HomeAssistantType
|
||||
from homeassistant.helpers.update_coordinator import (
|
||||
CoordinatorEntity,
|
||||
DataUpdateCoordinator,
|
||||
)
|
||||
|
||||
from .const import DATA_ASUSWRT, DOMAIN, SENSOR_TYPES
|
||||
from .const import (
|
||||
DATA_ASUSWRT,
|
||||
DOMAIN,
|
||||
SENSOR_CONNECTED_DEVICE,
|
||||
SENSOR_RX_BYTES,
|
||||
SENSOR_RX_RATES,
|
||||
SENSOR_TX_BYTES,
|
||||
SENSOR_TX_RATES,
|
||||
)
|
||||
from .router import KEY_COORDINATOR, KEY_SENSORS, AsusWrtRouter
|
||||
|
||||
UPLOAD_ICON = "mdi:upload-network"
|
||||
DOWNLOAD_ICON = "mdi:download-network"
|
||||
DEFAULT_PREFIX = "Asuswrt"
|
||||
|
||||
SENSOR_DEVICE_CLASS = "device_class"
|
||||
SENSOR_ICON = "icon"
|
||||
SENSOR_NAME = "name"
|
||||
SENSOR_UNIT = "unit"
|
||||
SENSOR_FACTOR = "factor"
|
||||
SENSOR_DEFAULT_ENABLED = "default_enabled"
|
||||
|
||||
UNIT_DEVICES = "Devices"
|
||||
|
||||
CONNECTION_SENSORS = {
|
||||
SENSOR_CONNECTED_DEVICE: {
|
||||
SENSOR_NAME: "Devices Connected",
|
||||
SENSOR_UNIT: UNIT_DEVICES,
|
||||
SENSOR_FACTOR: 0,
|
||||
SENSOR_ICON: "mdi:router-network",
|
||||
SENSOR_DEVICE_CLASS: None,
|
||||
SENSOR_DEFAULT_ENABLED: True,
|
||||
},
|
||||
SENSOR_RX_RATES: {
|
||||
SENSOR_NAME: "Download Speed",
|
||||
SENSOR_UNIT: DATA_RATE_MEGABITS_PER_SECOND,
|
||||
SENSOR_FACTOR: 125000,
|
||||
SENSOR_ICON: "mdi:download-network",
|
||||
SENSOR_DEVICE_CLASS: None,
|
||||
},
|
||||
SENSOR_TX_RATES: {
|
||||
SENSOR_NAME: "Upload Speed",
|
||||
SENSOR_UNIT: DATA_RATE_MEGABITS_PER_SECOND,
|
||||
SENSOR_FACTOR: 125000,
|
||||
SENSOR_ICON: "mdi:upload-network",
|
||||
SENSOR_DEVICE_CLASS: None,
|
||||
},
|
||||
SENSOR_RX_BYTES: {
|
||||
SENSOR_NAME: "Download",
|
||||
SENSOR_UNIT: DATA_GIGABYTES,
|
||||
SENSOR_FACTOR: 1000000000,
|
||||
SENSOR_ICON: "mdi:download",
|
||||
SENSOR_DEVICE_CLASS: None,
|
||||
},
|
||||
SENSOR_TX_BYTES: {
|
||||
SENSOR_NAME: "Upload",
|
||||
SENSOR_UNIT: DATA_GIGABYTES,
|
||||
SENSOR_FACTOR: 1000000000,
|
||||
SENSOR_ICON: "mdi:upload",
|
||||
SENSOR_DEVICE_CLASS: None,
|
||||
},
|
||||
}
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@enum.unique
|
||||
class _SensorTypes(enum.Enum):
|
||||
DEVICES = "devices"
|
||||
UPLOAD = "upload"
|
||||
DOWNLOAD = "download"
|
||||
DOWNLOAD_SPEED = "download_speed"
|
||||
UPLOAD_SPEED = "upload_speed"
|
||||
|
||||
@property
|
||||
def unit_of_measurement(self) -> Optional[str]:
|
||||
"""Return a string with the unit of the sensortype."""
|
||||
if self in (_SensorTypes.UPLOAD, _SensorTypes.DOWNLOAD):
|
||||
return DATA_GIGABYTES
|
||||
if self in (_SensorTypes.UPLOAD_SPEED, _SensorTypes.DOWNLOAD_SPEED):
|
||||
return DATA_RATE_MEGABITS_PER_SECOND
|
||||
if self == _SensorTypes.DEVICES:
|
||||
return "devices"
|
||||
return None
|
||||
|
||||
@property
|
||||
def icon(self) -> Optional[str]:
|
||||
"""Return the expected icon for the sensortype."""
|
||||
if self in (_SensorTypes.UPLOAD, _SensorTypes.UPLOAD_SPEED):
|
||||
return UPLOAD_ICON
|
||||
if self in (_SensorTypes.DOWNLOAD, _SensorTypes.DOWNLOAD_SPEED):
|
||||
return DOWNLOAD_ICON
|
||||
return None
|
||||
|
||||
@property
|
||||
def sensor_name(self) -> Optional[str]:
|
||||
"""Return the name of the sensor."""
|
||||
if self is _SensorTypes.DEVICES:
|
||||
return "Asuswrt Devices Connected"
|
||||
if self is _SensorTypes.UPLOAD:
|
||||
return "Asuswrt Upload"
|
||||
if self is _SensorTypes.DOWNLOAD:
|
||||
return "Asuswrt Download"
|
||||
if self is _SensorTypes.UPLOAD_SPEED:
|
||||
return "Asuswrt Upload Speed"
|
||||
if self is _SensorTypes.DOWNLOAD_SPEED:
|
||||
return "Asuswrt Download Speed"
|
||||
return None
|
||||
|
||||
@property
|
||||
def is_speed(self) -> bool:
|
||||
"""Return True if the type is an upload/download speed."""
|
||||
return self in (_SensorTypes.UPLOAD_SPEED, _SensorTypes.DOWNLOAD_SPEED)
|
||||
|
||||
@property
|
||||
def is_size(self) -> bool:
|
||||
"""Return True if the type is the total upload/download size."""
|
||||
return self in (_SensorTypes.UPLOAD, _SensorTypes.DOWNLOAD)
|
||||
|
||||
|
||||
class _SensorInfo:
|
||||
"""Class handling sensor information."""
|
||||
|
||||
def __init__(self, sensor_type: _SensorTypes):
|
||||
"""Initialize the handler class."""
|
||||
self.type = sensor_type
|
||||
self.enabled = False
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
hass: HomeAssistantType, entry: ConfigEntry, async_add_entities
|
||||
) -> None:
|
||||
"""Set up the asuswrt sensors."""
|
||||
"""Set up the sensors."""
|
||||
router: AsusWrtRouter = hass.data[DOMAIN][entry.entry_id][DATA_ASUSWRT]
|
||||
entities = []
|
||||
|
||||
router = hass.data[DOMAIN][entry.entry_id][DATA_ASUSWRT]
|
||||
api: AsusWrt = router.api
|
||||
device_name = entry.data.get(CONF_NAME, "AsusWRT")
|
||||
for sensor_data in router.sensors_coordinator.values():
|
||||
coordinator = sensor_data[KEY_COORDINATOR]
|
||||
sensors = sensor_data[KEY_SENSORS]
|
||||
for sensor_key in sensors:
|
||||
if sensor_key in CONNECTION_SENSORS:
|
||||
entities.append(
|
||||
AsusWrtSensor(
|
||||
coordinator, router, sensor_key, CONNECTION_SENSORS[sensor_key]
|
||||
)
|
||||
)
|
||||
|
||||
# Let's discover the valid sensor types.
|
||||
sensors = [_SensorInfo(_SensorTypes(x)) for x in SENSOR_TYPES]
|
||||
|
||||
data_handler = AsuswrtDataHandler(sensors, api)
|
||||
coordinator = DataUpdateCoordinator(
|
||||
hass,
|
||||
_LOGGER,
|
||||
name="sensor",
|
||||
update_method=data_handler.update_data,
|
||||
# Polling interval. Will only be polled if there are subscribers.
|
||||
update_interval=timedelta(seconds=30),
|
||||
)
|
||||
|
||||
await coordinator.async_refresh()
|
||||
async_add_entities(
|
||||
[AsuswrtSensor(coordinator, data_handler, device_name, x.type) for x in sensors]
|
||||
)
|
||||
async_add_entities(entities, True)
|
||||
|
||||
|
||||
class AsuswrtDataHandler:
|
||||
"""Class handling the API updates."""
|
||||
|
||||
def __init__(self, sensors: List[_SensorInfo], api: AsusWrt):
|
||||
"""Initialize the handler class."""
|
||||
self._api = api
|
||||
self._sensors = sensors
|
||||
self._connected = True
|
||||
|
||||
def enable_sensor(self, sensor_type: _SensorTypes):
|
||||
"""Enable a specific sensor type."""
|
||||
for index, sensor in enumerate(self._sensors):
|
||||
if sensor.type == sensor_type:
|
||||
self._sensors[index].enabled = True
|
||||
return
|
||||
|
||||
def disable_sensor(self, sensor_type: _SensorTypes):
|
||||
"""Disable a specific sensor type."""
|
||||
for index, sensor in enumerate(self._sensors):
|
||||
if sensor.type == sensor_type:
|
||||
self._sensors[index].enabled = False
|
||||
return
|
||||
|
||||
async def update_data(self) -> Dict[_SensorTypes, Any]:
|
||||
"""Fetch the relevant data from the router."""
|
||||
ret_dict: Dict[_SensorTypes, Any] = {}
|
||||
try:
|
||||
if _SensorTypes.DEVICES in [x.type for x in self._sensors if x.enabled]:
|
||||
# Let's check the nr of devices.
|
||||
devices = await self._api.async_get_connected_devices()
|
||||
ret_dict[_SensorTypes.DEVICES] = len(devices)
|
||||
|
||||
if any(x.type.is_speed for x in self._sensors if x.enabled):
|
||||
# Let's check the upload and download speed
|
||||
speed = await self._api.async_get_current_transfer_rates()
|
||||
ret_dict[_SensorTypes.DOWNLOAD_SPEED] = round(speed[0] / 125000, 2)
|
||||
ret_dict[_SensorTypes.UPLOAD_SPEED] = round(speed[1] / 125000, 2)
|
||||
|
||||
if any(x.type.is_size for x in self._sensors if x.enabled):
|
||||
rates = await self._api.async_get_bytes_total()
|
||||
ret_dict[_SensorTypes.DOWNLOAD] = round(rates[0] / 1000000000, 1)
|
||||
ret_dict[_SensorTypes.UPLOAD] = round(rates[1] / 1000000000, 1)
|
||||
|
||||
if not self._connected:
|
||||
# Log a successful reconnect
|
||||
self._connected = True
|
||||
_LOGGER.warning("Successfully reconnected to ASUS router")
|
||||
|
||||
except OSError as err:
|
||||
if self._connected:
|
||||
# Log the first time connection was lost
|
||||
_LOGGER.warning("Lost connection to router error due to: '%s'", err)
|
||||
self._connected = False
|
||||
|
||||
return ret_dict
|
||||
|
||||
|
||||
class AsuswrtSensor(CoordinatorEntity):
|
||||
"""The asuswrt specific sensor class."""
|
||||
class AsusWrtSensor(CoordinatorEntity):
|
||||
"""Representation of a AsusWrt sensor."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
coordinator: DataUpdateCoordinator,
|
||||
data_handler: AsuswrtDataHandler,
|
||||
device_name: str,
|
||||
sensor_type: _SensorTypes,
|
||||
):
|
||||
"""Initialize the sensor class."""
|
||||
router: AsusWrtRouter,
|
||||
sensor_type: str,
|
||||
sensor: Dict[str, any],
|
||||
) -> None:
|
||||
"""Initialize a AsusWrt sensor."""
|
||||
super().__init__(coordinator)
|
||||
self._handler = data_handler
|
||||
self._device_name = device_name
|
||||
self._type = sensor_type
|
||||
|
||||
@property
|
||||
def state(self):
|
||||
"""Return the state of the sensor."""
|
||||
return self.coordinator.data.get(self._type)
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
"""Return the name of the sensor."""
|
||||
return self._type.sensor_name
|
||||
|
||||
@property
|
||||
def icon(self) -> Optional[str]:
|
||||
"""Return the icon to use in the frontend."""
|
||||
return self._type.icon
|
||||
|
||||
@property
|
||||
def unit_of_measurement(self) -> Optional[str]:
|
||||
"""Return the unit."""
|
||||
return self._type.unit_of_measurement
|
||||
|
||||
@property
|
||||
def unique_id(self) -> str:
|
||||
"""Return the unique_id of the sensor."""
|
||||
return f"{DOMAIN} {self._type.sensor_name}"
|
||||
|
||||
@property
|
||||
def device_info(self) -> Dict[str, any]:
|
||||
"""Return the device information."""
|
||||
return {
|
||||
"identifiers": {(DOMAIN, "AsusWRT")},
|
||||
"name": self._device_name,
|
||||
"model": "Asus Router",
|
||||
"manufacturer": "Asus",
|
||||
}
|
||||
self._router = router
|
||||
self._sensor_type = sensor_type
|
||||
self._name = f"{DEFAULT_PREFIX} {sensor[SENSOR_NAME]}"
|
||||
self._unique_id = f"{DOMAIN} {self._name}"
|
||||
self._unit = sensor[SENSOR_UNIT]
|
||||
self._factor = sensor[SENSOR_FACTOR]
|
||||
self._icon = sensor[SENSOR_ICON]
|
||||
self._device_class = sensor[SENSOR_DEVICE_CLASS]
|
||||
self._default_enabled = sensor.get(SENSOR_DEFAULT_ENABLED, False)
|
||||
|
||||
@property
|
||||
def entity_registry_enabled_default(self) -> bool:
|
||||
"""Return if the entity should be enabled when first added to the entity registry."""
|
||||
return False
|
||||
return self._default_enabled
|
||||
|
||||
async def async_added_to_hass(self) -> None:
|
||||
"""When entity is added to hass."""
|
||||
self._handler.enable_sensor(self._type)
|
||||
await super().async_added_to_hass()
|
||||
@property
|
||||
def state(self) -> str:
|
||||
"""Return current state."""
|
||||
state = self.coordinator.data.get(self._sensor_type)
|
||||
if state is None:
|
||||
return None
|
||||
if self._factor and isinstance(state, Number):
|
||||
return round(state / self._factor, 2)
|
||||
return state
|
||||
|
||||
async def async_will_remove_from_hass(self):
|
||||
"""Call when entity is removed from hass."""
|
||||
self._handler.disable_sensor(self._type)
|
||||
@property
|
||||
def unique_id(self) -> str:
|
||||
"""Return a unique ID."""
|
||||
return self._unique_id
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
"""Return the name."""
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def unit_of_measurement(self) -> str:
|
||||
"""Return the unit."""
|
||||
return self._unit
|
||||
|
||||
@property
|
||||
def icon(self) -> str:
|
||||
"""Return the icon."""
|
||||
return self._icon
|
||||
|
||||
@property
|
||||
def device_class(self) -> str:
|
||||
"""Return the device_class."""
|
||||
return self._device_class
|
||||
|
||||
@property
|
||||
def device_state_attributes(self) -> Dict[str, any]:
|
||||
"""Return the attributes."""
|
||||
return {"hostname": self._router.host}
|
||||
|
||||
@property
|
||||
def device_info(self) -> Dict[str, any]:
|
||||
"""Return the device information."""
|
||||
return self._router.device_info
|
||||
|
|
|
@ -7,7 +7,7 @@ import pytest
|
|||
|
||||
from homeassistant.components import device_tracker, sensor
|
||||
from homeassistant.components.asuswrt.const import DOMAIN
|
||||
from homeassistant.components.asuswrt.sensor import _SensorTypes
|
||||
from homeassistant.components.asuswrt.sensor import DEFAULT_PREFIX
|
||||
from homeassistant.components.device_tracker.const import CONF_CONSIDER_HOME
|
||||
from homeassistant.const import (
|
||||
CONF_HOST,
|
||||
|
@ -66,49 +66,56 @@ async def test_sensors(hass, connect):
|
|||
"""Test creating an AsusWRT sensor."""
|
||||
entity_reg = await hass.helpers.entity_registry.async_get_registry()
|
||||
|
||||
# Pre-enable the status sensor
|
||||
entity_reg.async_get_or_create(
|
||||
sensor.DOMAIN,
|
||||
DOMAIN,
|
||||
f"{DOMAIN} {_SensorTypes(_SensorTypes.DEVICES).sensor_name}",
|
||||
suggested_object_id="asuswrt_connected_devices",
|
||||
disabled_by=None,
|
||||
)
|
||||
entity_reg.async_get_or_create(
|
||||
sensor.DOMAIN,
|
||||
DOMAIN,
|
||||
f"{DOMAIN} {_SensorTypes(_SensorTypes.DOWNLOAD_SPEED).sensor_name}",
|
||||
suggested_object_id="asuswrt_download_speed",
|
||||
disabled_by=None,
|
||||
)
|
||||
entity_reg.async_get_or_create(
|
||||
sensor.DOMAIN,
|
||||
DOMAIN,
|
||||
f"{DOMAIN} {_SensorTypes(_SensorTypes.DOWNLOAD).sensor_name}",
|
||||
suggested_object_id="asuswrt_download",
|
||||
disabled_by=None,
|
||||
)
|
||||
entity_reg.async_get_or_create(
|
||||
sensor.DOMAIN,
|
||||
DOMAIN,
|
||||
f"{DOMAIN} {_SensorTypes(_SensorTypes.UPLOAD_SPEED).sensor_name}",
|
||||
suggested_object_id="asuswrt_upload_speed",
|
||||
disabled_by=None,
|
||||
)
|
||||
entity_reg.async_get_or_create(
|
||||
sensor.DOMAIN,
|
||||
DOMAIN,
|
||||
f"{DOMAIN} {_SensorTypes(_SensorTypes.UPLOAD).sensor_name}",
|
||||
suggested_object_id="asuswrt_upload",
|
||||
disabled_by=None,
|
||||
)
|
||||
|
||||
# init config entry
|
||||
config_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data=CONFIG_DATA,
|
||||
options={CONF_CONSIDER_HOME: 60},
|
||||
)
|
||||
|
||||
# init variable
|
||||
unique_id = DOMAIN
|
||||
name_prefix = DEFAULT_PREFIX
|
||||
obj_prefix = name_prefix.lower()
|
||||
sensor_prefix = f"{sensor.DOMAIN}.{obj_prefix}"
|
||||
|
||||
# Pre-enable the status sensor
|
||||
entity_reg.async_get_or_create(
|
||||
sensor.DOMAIN,
|
||||
DOMAIN,
|
||||
f"{unique_id} {name_prefix} Devices Connected",
|
||||
suggested_object_id=f"{obj_prefix}_devices_connected",
|
||||
disabled_by=None,
|
||||
)
|
||||
entity_reg.async_get_or_create(
|
||||
sensor.DOMAIN,
|
||||
DOMAIN,
|
||||
f"{unique_id} {name_prefix} Download Speed",
|
||||
suggested_object_id=f"{obj_prefix}_download_speed",
|
||||
disabled_by=None,
|
||||
)
|
||||
entity_reg.async_get_or_create(
|
||||
sensor.DOMAIN,
|
||||
DOMAIN,
|
||||
f"{unique_id} {name_prefix} Download",
|
||||
suggested_object_id=f"{obj_prefix}_download",
|
||||
disabled_by=None,
|
||||
)
|
||||
entity_reg.async_get_or_create(
|
||||
sensor.DOMAIN,
|
||||
DOMAIN,
|
||||
f"{unique_id} {name_prefix} Upload Speed",
|
||||
suggested_object_id=f"{obj_prefix}_upload_speed",
|
||||
disabled_by=None,
|
||||
)
|
||||
entity_reg.async_get_or_create(
|
||||
sensor.DOMAIN,
|
||||
DOMAIN,
|
||||
f"{unique_id} {name_prefix} Upload",
|
||||
suggested_object_id=f"{obj_prefix}_upload",
|
||||
disabled_by=None,
|
||||
)
|
||||
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
# initial devices setup
|
||||
|
@ -119,11 +126,11 @@ async def test_sensors(hass, connect):
|
|||
|
||||
assert hass.states.get(f"{device_tracker.DOMAIN}.test").state == STATE_HOME
|
||||
assert hass.states.get(f"{device_tracker.DOMAIN}.testtwo").state == STATE_HOME
|
||||
assert hass.states.get(f"{sensor.DOMAIN}.asuswrt_connected_devices").state == "2"
|
||||
assert hass.states.get(f"{sensor.DOMAIN}.asuswrt_download_speed").state == "160.0"
|
||||
assert hass.states.get(f"{sensor.DOMAIN}.asuswrt_download").state == "60.0"
|
||||
assert hass.states.get(f"{sensor.DOMAIN}.asuswrt_upload_speed").state == "80.0"
|
||||
assert hass.states.get(f"{sensor.DOMAIN}.asuswrt_upload").state == "50.0"
|
||||
assert hass.states.get(f"{sensor_prefix}_download_speed").state == "160.0"
|
||||
assert hass.states.get(f"{sensor_prefix}_download").state == "60.0"
|
||||
assert hass.states.get(f"{sensor_prefix}_upload_speed").state == "80.0"
|
||||
assert hass.states.get(f"{sensor_prefix}_upload").state == "50.0"
|
||||
assert hass.states.get(f"{sensor_prefix}_devices_connected").state == "2"
|
||||
|
||||
# add one device and remove another
|
||||
MOCK_DEVICES.pop("a1:b1:c1:d1:e1:f1")
|
||||
|
@ -137,7 +144,7 @@ async def test_sensors(hass, connect):
|
|||
assert hass.states.get(f"{device_tracker.DOMAIN}.test").state == STATE_HOME
|
||||
assert hass.states.get(f"{device_tracker.DOMAIN}.testtwo").state == STATE_HOME
|
||||
assert hass.states.get(f"{device_tracker.DOMAIN}.testthree").state == STATE_HOME
|
||||
assert hass.states.get(f"{sensor.DOMAIN}.asuswrt_connected_devices").state == "2"
|
||||
assert hass.states.get(f"{sensor_prefix}_devices_connected").state == "2"
|
||||
|
||||
hass.config_entries.async_update_entry(
|
||||
config_entry, options={CONF_CONSIDER_HOME: 0}
|
||||
|
|
Loading…
Add table
Reference in a new issue