ubus: switch to pypi library (#46690)

Signed-off-by: Álvaro Fernández Rojas <noltari@gmail.com>
This commit is contained in:
Álvaro Fernández Rojas 2021-02-18 12:33:21 +01:00 committed by GitHub
parent 0181cbb312
commit 4083b90138
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 19 additions and 77 deletions

View file

@ -491,6 +491,7 @@ homeassistant/components/tts/* @pvizeli
homeassistant/components/tuya/* @ollo69
homeassistant/components/twentemilieu/* @frenck
homeassistant/components/twinkly/* @dr1rrb
homeassistant/components/ubus/* @noltari
homeassistant/components/unifi/* @Kane610
homeassistant/components/unifiled/* @florisvdk
homeassistant/components/upb/* @gwww

View file

@ -1,9 +1,9 @@
"""Support for OpenWRT (ubus) routers."""
import json
import logging
import re
import requests
from openwrt.ubus import Ubus
import voluptuous as vol
from homeassistant.components.device_tracker import (
@ -11,8 +11,7 @@ from homeassistant.components.device_tracker import (
PLATFORM_SCHEMA,
DeviceScanner,
)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME, HTTP_OK
from homeassistant.exceptions import HomeAssistantError
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
@ -58,7 +57,7 @@ def _refresh_on_access_denied(func):
"Invalid session detected."
" Trying to refresh session_id and re-run RPC"
)
self.session_id = _get_session_id(self.url, self.username, self.password)
self.ubus.connect()
return func(self, *args, **kwargs)
@ -82,10 +81,10 @@ class UbusDeviceScanner(DeviceScanner):
self.last_results = {}
self.url = f"http://{host}/ubus"
self.session_id = _get_session_id(self.url, self.username, self.password)
self.ubus = Ubus(self.url, self.username, self.password)
self.hostapd = []
self.mac2name = None
self.success_init = self.session_id is not None
self.success_init = self.ubus.connect() is not None
def scan_devices(self):
"""Scan for new devices and return a list with found device IDs."""
@ -119,16 +118,14 @@ class UbusDeviceScanner(DeviceScanner):
_LOGGER.info("Checking hostapd")
if not self.hostapd:
hostapd = _req_json_rpc(self.url, self.session_id, "list", "hostapd.*", "")
hostapd = self.ubus.get_hostapd()
self.hostapd.extend(hostapd.keys())
self.last_results = []
results = 0
# for each access point
for hostapd in self.hostapd:
result = _req_json_rpc(
self.url, self.session_id, "call", hostapd, "get_clients"
)
result = self.ubus.get_hostapd_clients(hostapd)
if result:
results = results + 1
@ -151,31 +148,21 @@ class DnsmasqUbusDeviceScanner(UbusDeviceScanner):
def _generate_mac2name(self):
if self.leasefile is None:
result = _req_json_rpc(
self.url,
self.session_id,
"call",
"uci",
"get",
config="dhcp",
type="dnsmasq",
)
result = self.ubus.get_uci_config("dhcp", "dnsmasq")
if result:
values = result["values"].values()
self.leasefile = next(iter(values))["leasefile"]
else:
return
result = _req_json_rpc(
self.url, self.session_id, "call", "file", "read", path=self.leasefile
)
result = self.ubus.file_read(self.leasefile)
if result:
self.mac2name = {}
for line in result["data"].splitlines():
hosts = line.split(" ")
self.mac2name[hosts[1].upper()] = hosts[3]
else:
# Error, handled in the _req_json_rpc
# Error, handled in the ubus.file_read()
return
@ -183,7 +170,7 @@ class OdhcpdUbusDeviceScanner(UbusDeviceScanner):
"""Implement the Ubus device scanning for the odhcp DHCP server."""
def _generate_mac2name(self):
result = _req_json_rpc(self.url, self.session_id, "call", "dhcp", "ipv4leases")
result = self.ubus.get_dhcp_method("ipv4leases")
if result:
self.mac2name = {}
for device in result["device"].values():
@ -193,55 +180,5 @@ class OdhcpdUbusDeviceScanner(UbusDeviceScanner):
mac = ":".join(mac[i : i + 2] for i in range(0, len(mac), 2))
self.mac2name[mac.upper()] = lease["hostname"]
else:
# Error, handled in the _req_json_rpc
# Error, handled in the ubus.get_dhcp_method()
return
def _req_json_rpc(url, session_id, rpcmethod, subsystem, method, **params):
"""Perform one JSON RPC operation."""
data = json.dumps(
{
"jsonrpc": "2.0",
"id": 1,
"method": rpcmethod,
"params": [session_id, subsystem, method, params],
}
)
try:
res = requests.post(url, data=data, timeout=5)
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
return
if res.status_code == HTTP_OK:
response = res.json()
if "error" in response:
if (
"message" in response["error"]
and response["error"]["message"] == "Access denied"
):
raise PermissionError(response["error"]["message"])
raise HomeAssistantError(response["error"]["message"])
if rpcmethod == "call":
try:
return response["result"][1]
except IndexError:
return
else:
return response["result"]
def _get_session_id(url, username, password):
"""Get the authentication token for the given host+username+password."""
res = _req_json_rpc(
url,
"00000000000000000000000000000000",
"call",
"session",
"login",
username=username,
password=password,
)
return res["ubus_rpc_session"]

View file

@ -2,5 +2,6 @@
"domain": "ubus",
"name": "OpenWrt (ubus)",
"documentation": "https://www.home-assistant.io/integrations/ubus",
"codeowners": []
"requirements": ["openwrt-ubus-rpc==0.0.2"],
"codeowners": ["@noltari"]
}

View file

@ -1070,6 +1070,9 @@ openwebifpy==3.2.7
# homeassistant.components.luci
openwrt-luci-rpc==1.1.6
# homeassistant.components.ubus
openwrt-ubus-rpc==0.0.2
# homeassistant.components.oru
oru==0.1.11